128 lines
4.2 KiB
Python
128 lines
4.2 KiB
Python
import time
|
|
import gevent
|
|
import logging
|
|
|
|
log = logging.getLogger("RateLimit")
|
|
|
|
called_db = {} # Holds events last call time
|
|
queue_db = {} # Commands queued to run
|
|
|
|
# Register event as called
|
|
# Return: None
|
|
|
|
|
|
def called(event, penalty=0):
|
|
called_db[event] = time.time() + penalty
|
|
|
|
|
|
# Check if calling event is allowed
|
|
# Return: True if allowed False if not
|
|
def isAllowed(event, allowed_again=10):
|
|
last_called = called_db.get(event)
|
|
if not last_called: # Its not called before
|
|
return True
|
|
elif time.time() - last_called >= allowed_again:
|
|
del called_db[event] # Delete last call time to save memory
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def delayLeft(event, allowed_again=10):
|
|
last_called = called_db.get(event)
|
|
if not last_called: # Its not called before
|
|
return 0
|
|
else:
|
|
return allowed_again - (time.time() - last_called)
|
|
|
|
def callQueue(event):
|
|
func, args, kwargs, thread = queue_db[event]
|
|
log.debug("Calling: %s" % event)
|
|
called(event)
|
|
del queue_db[event]
|
|
return func(*args, **kwargs)
|
|
|
|
|
|
# Rate limit and delay function call if necessary
|
|
# If the function called again within the rate limit interval then previous queued call will be dropped
|
|
# Return: Immediately gevent thread
|
|
def callAsync(event, allowed_again=10, func=None, *args, **kwargs):
|
|
if isAllowed(event, allowed_again): # Not called recently, call it now
|
|
called(event)
|
|
# print "Calling now"
|
|
return gevent.spawn(func, *args, **kwargs)
|
|
else: # Called recently, schedule it for later
|
|
time_left = allowed_again - max(0, time.time() - called_db[event])
|
|
log.debug("Added to queue (%.2fs left): %s " % (time_left, event))
|
|
if not queue_db.get(event): # Function call not queued yet
|
|
thread = gevent.spawn_later(time_left, lambda: callQueue(event)) # Call this function later
|
|
queue_db[event] = (func, args, kwargs, thread)
|
|
return thread
|
|
else: # Function call already queued, just update the parameters
|
|
thread = queue_db[event][3]
|
|
queue_db[event] = (func, args, kwargs, thread)
|
|
return thread
|
|
|
|
|
|
# Rate limit and delay function call if needed
|
|
# Return: Wait for execution/delay then return value
|
|
def call(event, allowed_again=10, func=None, *args, **kwargs):
|
|
if isAllowed(event): # Not called recently, call it now
|
|
called(event)
|
|
# print "Calling now", allowed_again
|
|
return func(*args, **kwargs)
|
|
|
|
else: # Called recently, schedule it for later
|
|
time_left = max(0, allowed_again - (time.time() - called_db[event]))
|
|
# print "Time left: %s" % time_left, args, kwargs
|
|
log.debug("Calling sync (%.2fs left): %s" % (time_left, event))
|
|
called(event, time_left)
|
|
time.sleep(time_left)
|
|
back = func(*args, **kwargs)
|
|
called(event)
|
|
return back
|
|
|
|
|
|
# Cleanup expired events every 3 minutes
|
|
def rateLimitCleanup():
|
|
while 1:
|
|
expired = time.time() - 60 * 2 # Cleanup if older than 2 minutes
|
|
for event in list(called_db.keys()):
|
|
if called_db[event] < expired:
|
|
del called_db[event]
|
|
time.sleep(60 * 3) # Every 3 minutes
|
|
gevent.spawn(rateLimitCleanup)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
from gevent import monkey
|
|
monkey.patch_all()
|
|
import random
|
|
|
|
def publish(inner_path):
|
|
print("Publishing %s..." % inner_path)
|
|
return 1
|
|
|
|
def cb(thread):
|
|
print("Value:", thread.value)
|
|
|
|
print("Testing async spam requests rate limit to 1/sec...")
|
|
for i in range(3000):
|
|
thread = callAsync("publish content.json", 1, publish, "content.json %s" % i)
|
|
time.sleep(float(random.randint(1, 20)) / 100000)
|
|
print(thread.link(cb))
|
|
print("Done")
|
|
|
|
time.sleep(2)
|
|
|
|
print("Testing sync spam requests rate limit to 1/sec...")
|
|
for i in range(5):
|
|
call("publish data.json", 1, publish, "data.json %s" % i)
|
|
time.sleep(float(random.randint(1, 100)) / 100)
|
|
print("Done")
|
|
|
|
print("Testing cleanup")
|
|
thread = callAsync("publish content.json single", 1, publish, "content.json single")
|
|
print("Needs to cleanup:", called_db, queue_db)
|
|
print("Waiting 3min for cleanup process...")
|
|
time.sleep(60 * 3)
|
|
print("Cleaned up:", called_db, queue_db)
|