Rev427, Ignore repr from coverage, Add RateLimit call penalty to, Event test, Noparallel test, RateLimit test, Remove falling Pypy test

This commit is contained in:
HelloZeroNet 2015-09-21 02:36:23 +02:00
parent 8f7df0f7fb
commit 47dbdc0850
9 changed files with 342 additions and 133 deletions

View file

@ -2,10 +2,6 @@ language: python
cache: pip
python:
- 2.7
- pypy
matrix:
allow_failures:
- pypy
install:
- pip install -r requirements.txt
before_script:

View file

@ -8,7 +8,7 @@ class Config(object):
def __init__(self, argv):
self.version = "0.3.2"
self.rev = 426
self.rev = 427
self.argv = argv
self.action = None
self.createParser()

65
src/Test/TestEvent.py Normal file
View file

@ -0,0 +1,65 @@
import util
class TestClass(object):
def __init__(self):
self.called = []
self.onChanged = util.Event()
def increment(self, title):
self.called.append(title)
class TestEvent:
def testEvent(self):
test_obj = TestClass()
test_obj.onChanged.append(lambda: test_obj.increment("Called #1"))
test_obj.onChanged.append(lambda: test_obj.increment("Called #2"))
test_obj.onChanged.once(lambda: test_obj.increment("Once"))
assert test_obj.called == []
test_obj.onChanged()
assert test_obj.called == ["Called #1", "Called #2", "Once"]
test_obj.onChanged()
test_obj.onChanged()
assert test_obj.called == ["Called #1", "Called #2", "Once", "Called #1", "Called #2", "Called #1", "Called #2"]
def testOnce(self):
test_obj = TestClass()
test_obj.onChanged.once(lambda: test_obj.increment("Once test #1"))
# It should be called only once
assert test_obj.called == []
test_obj.onChanged()
assert test_obj.called == ["Once test #1"]
test_obj.onChanged()
test_obj.onChanged()
assert test_obj.called == ["Once test #1"]
def testOnceMultiple(self):
test_obj = TestClass()
# Allow queue more than once
test_obj.onChanged.once(lambda: test_obj.increment("Once test #1"))
test_obj.onChanged.once(lambda: test_obj.increment("Once test #2"))
test_obj.onChanged.once(lambda: test_obj.increment("Once test #3"))
assert test_obj.called == []
test_obj.onChanged()
assert test_obj.called == ["Once test #1", "Once test #2", "Once test #3"]
test_obj.onChanged()
test_obj.onChanged()
assert test_obj.called == ["Once test #1", "Once test #2", "Once test #3"]
def testOnceNamed(self):
test_obj = TestClass()
# Dont store more that one from same type
test_obj.onChanged.once(lambda: test_obj.increment("Once test #1/1"), "type 1")
test_obj.onChanged.once(lambda: test_obj.increment("Once test #1/2"), "type 1")
test_obj.onChanged.once(lambda: test_obj.increment("Once test #2"), "type 2")
assert test_obj.called == []
test_obj.onChanged()
assert test_obj.called == ["Once test #1/1", "Once test #2"]
test_obj.onChanged()
test_obj.onChanged()
assert test_obj.called == ["Once test #1/1", "Once test #2"]

View file

@ -0,0 +1,62 @@
import time
import gevent
from gevent import monkey
monkey.patch_all()
import util
class TestClass(object):
def __init__(self):
self.counted = 0
@util.Noparallel()
def countBlocking(self, num=5):
for i in range(1, num+1):
time.sleep(0.01)
self.counted += 1
return "counted:%s" % i
@util.Noparallel(blocking=False)
def countNoblocking(self, num=5):
for i in range(1, num+1):
time.sleep(0.01)
self.counted += 1
return "counted:%s" % i
class TestNoparallel:
def testBlocking(self):
obj1 = TestClass()
obj2 = TestClass()
# Dont allow to call again until its running and wait until its running
threads = [
gevent.spawn(obj1.countBlocking),
gevent.spawn(obj1.countBlocking),
gevent.spawn(obj1.countBlocking),
gevent.spawn(obj2.countBlocking)
]
assert obj2.countBlocking() == "counted:5" # The call is ignored as obj2.countBlocking already counting, but block until its finishes
gevent.joinall(threads)
assert [thread.value for thread in threads] == ["counted:5","counted:5","counted:5","counted:5"] # Check the return value for every call
obj2.countBlocking() # Allow to call again as obj2.countBlocking finished
assert obj1.counted == 5
assert obj2.counted == 10
def testNoblocking(self):
obj1 = TestClass()
obj2 = TestClass()
thread1 = obj1.countNoblocking()
thread2 = obj1.countNoblocking() # Ignored
assert obj1.counted == 0
time.sleep(0.1)
assert thread1.value == "counted:5"
assert thread2.value == "counted:5"
assert obj1.counted == 5
obj1.countNoblocking().join() # Allow again and wait until finishes
assert obj1.counted == 10

96
src/Test/TestRateLimit.py Normal file
View file

@ -0,0 +1,96 @@
import time
import gevent
from gevent import monkey
monkey.patch_all()
from util import RateLimit
# Time is around limit +/- 0.01 sec
def around(t, limit):
return t >= limit - 0.01 and t <= limit + 0.01
class TestClass(object):
def __init__(self):
self.counted = 0
self.last_called = None
def count(self, back="counted"):
self.counted += 1
self.last_called = back
return back
class TestRateLimit:
def testCall(self):
obj1 = TestClass()
obj2 = TestClass()
s = time.time()
assert RateLimit.call("counting", allowed_again=0.1, func=obj1.count) == "counted"
assert around(time.time() - s, 0.0) # First allow to call instantly
assert obj1.counted == 1
# Call again
assert not RateLimit.isAllowed("counting", 0.1)
assert RateLimit.isAllowed("something else", 0.1)
assert RateLimit.call("counting", allowed_again=0.1, func=obj1.count) == "counted"
assert around(time.time() - s, 0.1) # Delays second call within interval
assert obj1.counted == 2
# Call 3 times async
s = time.time()
assert obj2.counted == 0
threads = [
gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)), # Instant
gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)), # 0.1s delay
gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)) # 0.2s delay
]
gevent.joinall(threads)
assert [thread.value for thread in threads] == ["counted", "counted", "counted"]
assert around(time.time() - s, 0.2)
# No queue = instant again
s = time.time()
assert RateLimit.isAllowed("counting", 0.1)
assert RateLimit.call("counting", allowed_again=0.1, func=obj2.count) == "counted"
assert around(time.time() - s, 0.0)
assert obj2.counted == 4
def testCallAsync(self):
obj1 = TestClass()
obj2 = TestClass()
s = time.time()
RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #1").join()
assert obj1.counted == 1 # First instant
assert around(time.time() - s, 0.0)
# After that the calls delayed
s = time.time()
t1 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #2") # Dumped by the next call
time.sleep(0.03)
t2 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #3") # Dumped by the next call
time.sleep(0.03)
t3 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #4") # Will be called
assert obj1.counted == 1 # Delay still in progress: Not called yet
t3.join()
assert t3.value == "call #4"
assert around(time.time() - s, 0.1)
# Only the last one called
assert obj1.counted == 2
assert obj1.last_called == "call #4"
# Allowed again instantly
assert RateLimit.isAllowed("counting async", 0.1)
s = time.time()
RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #5").join()
assert obj1.counted == 3
assert around(time.time() - s, 0.0)
assert not RateLimit.isAllowed("counting async", 0.1)
time.sleep(0.11)
assert RateLimit.isAllowed("counting async", 0.1)

View file

@ -1,5 +1,6 @@
[run]
branch = True
concurrency = gevent
omit =
src/lib/*
src/Test/*
@ -7,11 +8,8 @@ omit =
[report]
exclude_lines =
pragma: no cover
if __name__ == .__main__.:
if config.debug:
if config.debug_socket:
if self.logging:
def __repr__

View file

@ -25,6 +25,7 @@ class Event(list):
return self
if __name__ == "__main__":
def testBenchmark():
def say(pre, text):
print "%s Say: %s" % (pre, text)
@ -37,7 +38,6 @@ def testBenchmark():
print "Created 1000 once in %.3fs" % (time.time() - s)
on_changed("#1")
def testUsage():
def say(pre, text):
print "%s Say: %s" % (pre, text)
@ -52,6 +52,4 @@ def testUsage():
on_changed("#2")
on_changed("#3")
if __name__ == "__main__":
testBenchmark()

View file

@ -43,6 +43,7 @@ class Noparallel(object): # Only allow function running once in same time
del(self.threads[key])
if __name__ == "__main__":
class Test():
@Noparallel()
@ -52,7 +53,6 @@ class Test():
time.sleep(1)
return "%s return:%s" % (self, i)
class TestNoblock():
@Noparallel(blocking=False)
@ -62,7 +62,6 @@ class TestNoblock():
time.sleep(1)
return "%s return:%s" % (self, i)
def testBlocking():
test = Test()
test2 = Test()
@ -89,7 +88,6 @@ def testBlocking():
print thread1.value, thread2.value, thread3.value, thread4.value
print "Done."
def testNoblocking():
test = TestNoblock()
test2 = TestNoblock()
@ -114,7 +112,6 @@ def testNoblocking():
print thread1.value, thread2.value, thread3.value, thread4.value
print "Done."
def testBenchmark():
import time
@ -132,9 +129,6 @@ def testBenchmark():
print "Created in %.3fs" % (time.time() - s)
printThreadNum()
time.sleep(5)
if __name__ == "__main__":
from gevent import monkey
monkey.patch_all()

View file

@ -11,8 +11,8 @@ queue_db = {} # Commands queued to run
# Return: None
def called(event):
called_db[event] = time.time()
def called(event, penalty=0):
called_db[event] = time.time() + penalty
# Check if calling event is allowed
@ -62,15 +62,15 @@ def callAsync(event, allowed_again=10, func=None, *args, **kwargs):
def call(event, allowed_again=10, func=None, *args, **kwargs):
if isAllowed(event): # Not called recently, call it now
called(event)
# print "Calling now"
# print "Calling now", allowed_again
return func(*args, **kwargs)
else: # Called recently, schedule it for later
time_left = max(0, allowed_again - (time.time() - called_db[event]))
# print "Time left: %s" % time_left, args, kwargs
log.debug("Calling sync (%.2fs left): %s" % (time_left, event))
called(event, time_left)
time.sleep(time_left)
called(event)
back = func(*args, **kwargs)
if event in called_db:
del called_db[event]