From 47dbdc08504b9f9d939c0bbfd76d0c138cb5a11a Mon Sep 17 00:00:00 2001 From: HelloZeroNet Date: Mon, 21 Sep 2015 02:36:23 +0200 Subject: [PATCH] Rev427, Ignore repr from coverage, Add RateLimit call penalty to, Event test, Noparallel test, RateLimit test, Remove falling Pypy test --- .travis.yml | 4 - src/Config.py | 2 +- src/Test/TestEvent.py | 65 ++++++++++++++ src/Test/TestNoparallel.py | 62 +++++++++++++ src/Test/TestRateLimit.py | 96 ++++++++++++++++++++ src/Test/coverage.ini | 8 +- src/util/Event.py | 54 ++++++------ src/util/Noparallel.py | 176 ++++++++++++++++++------------------- src/util/RateLimit.py | 8 +- 9 files changed, 342 insertions(+), 133 deletions(-) create mode 100644 src/Test/TestEvent.py create mode 100644 src/Test/TestNoparallel.py create mode 100644 src/Test/TestRateLimit.py diff --git a/.travis.yml b/.travis.yml index b8c21809..0a7c4b78 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,10 +2,6 @@ language: python cache: pip python: - 2.7 - - pypy -matrix: - allow_failures: - - pypy install: - pip install -r requirements.txt before_script: diff --git a/src/Config.py b/src/Config.py index 1216089d..8b53feea 100644 --- a/src/Config.py +++ b/src/Config.py @@ -8,7 +8,7 @@ class Config(object): def __init__(self, argv): self.version = "0.3.2" - self.rev = 426 + self.rev = 427 self.argv = argv self.action = None self.createParser() diff --git a/src/Test/TestEvent.py b/src/Test/TestEvent.py new file mode 100644 index 00000000..b05bca85 --- /dev/null +++ b/src/Test/TestEvent.py @@ -0,0 +1,65 @@ +import util + + +class TestClass(object): + def __init__(self): + self.called = [] + self.onChanged = util.Event() + + def increment(self, title): + self.called.append(title) + + +class TestEvent: + def testEvent(self): + test_obj = TestClass() + test_obj.onChanged.append(lambda: test_obj.increment("Called #1")) + test_obj.onChanged.append(lambda: test_obj.increment("Called #2")) + test_obj.onChanged.once(lambda: test_obj.increment("Once")) + + assert test_obj.called == [] + test_obj.onChanged() + assert test_obj.called == ["Called #1", "Called #2", "Once"] + test_obj.onChanged() + test_obj.onChanged() + assert test_obj.called == ["Called #1", "Called #2", "Once", "Called #1", "Called #2", "Called #1", "Called #2"] + + def testOnce(self): + test_obj = TestClass() + test_obj.onChanged.once(lambda: test_obj.increment("Once test #1")) + + # It should be called only once + assert test_obj.called == [] + test_obj.onChanged() + assert test_obj.called == ["Once test #1"] + test_obj.onChanged() + test_obj.onChanged() + assert test_obj.called == ["Once test #1"] + + def testOnceMultiple(self): + test_obj = TestClass() + # Allow queue more than once + test_obj.onChanged.once(lambda: test_obj.increment("Once test #1")) + test_obj.onChanged.once(lambda: test_obj.increment("Once test #2")) + test_obj.onChanged.once(lambda: test_obj.increment("Once test #3")) + + assert test_obj.called == [] + test_obj.onChanged() + assert test_obj.called == ["Once test #1", "Once test #2", "Once test #3"] + test_obj.onChanged() + test_obj.onChanged() + assert test_obj.called == ["Once test #1", "Once test #2", "Once test #3"] + + def testOnceNamed(self): + test_obj = TestClass() + # Dont store more that one from same type + test_obj.onChanged.once(lambda: test_obj.increment("Once test #1/1"), "type 1") + test_obj.onChanged.once(lambda: test_obj.increment("Once test #1/2"), "type 1") + test_obj.onChanged.once(lambda: test_obj.increment("Once test #2"), "type 2") + + assert test_obj.called == [] + test_obj.onChanged() + assert test_obj.called == ["Once test #1/1", "Once test #2"] + test_obj.onChanged() + test_obj.onChanged() + assert test_obj.called == ["Once test #1/1", "Once test #2"] diff --git a/src/Test/TestNoparallel.py b/src/Test/TestNoparallel.py new file mode 100644 index 00000000..71fcc3bd --- /dev/null +++ b/src/Test/TestNoparallel.py @@ -0,0 +1,62 @@ +import time + +import gevent +from gevent import monkey +monkey.patch_all() + +import util + +class TestClass(object): + def __init__(self): + self.counted = 0 + + @util.Noparallel() + def countBlocking(self, num=5): + for i in range(1, num+1): + time.sleep(0.01) + self.counted += 1 + return "counted:%s" % i + + @util.Noparallel(blocking=False) + def countNoblocking(self, num=5): + for i in range(1, num+1): + time.sleep(0.01) + self.counted += 1 + return "counted:%s" % i + + +class TestNoparallel: + def testBlocking(self): + obj1 = TestClass() + obj2 = TestClass() + + # Dont allow to call again until its running and wait until its running + threads = [ + gevent.spawn(obj1.countBlocking), + gevent.spawn(obj1.countBlocking), + gevent.spawn(obj1.countBlocking), + gevent.spawn(obj2.countBlocking) + ] + assert obj2.countBlocking() == "counted:5" # The call is ignored as obj2.countBlocking already counting, but block until its finishes + gevent.joinall(threads) + assert [thread.value for thread in threads] == ["counted:5","counted:5","counted:5","counted:5"] # Check the return value for every call + obj2.countBlocking() # Allow to call again as obj2.countBlocking finished + + assert obj1.counted == 5 + assert obj2.counted == 10 + + def testNoblocking(self): + obj1 = TestClass() + obj2 = TestClass() + + thread1 = obj1.countNoblocking() + thread2 = obj1.countNoblocking() # Ignored + + assert obj1.counted == 0 + time.sleep(0.1) + assert thread1.value == "counted:5" + assert thread2.value == "counted:5" + assert obj1.counted == 5 + + obj1.countNoblocking().join() # Allow again and wait until finishes + assert obj1.counted == 10 diff --git a/src/Test/TestRateLimit.py b/src/Test/TestRateLimit.py new file mode 100644 index 00000000..3ff779ad --- /dev/null +++ b/src/Test/TestRateLimit.py @@ -0,0 +1,96 @@ +import time + +import gevent +from gevent import monkey +monkey.patch_all() + +from util import RateLimit + + +# Time is around limit +/- 0.01 sec +def around(t, limit): + return t >= limit - 0.01 and t <= limit + 0.01 + + +class TestClass(object): + def __init__(self): + self.counted = 0 + self.last_called = None + + def count(self, back="counted"): + self.counted += 1 + self.last_called = back + return back + + +class TestRateLimit: + def testCall(self): + obj1 = TestClass() + obj2 = TestClass() + + s = time.time() + assert RateLimit.call("counting", allowed_again=0.1, func=obj1.count) == "counted" + assert around(time.time() - s, 0.0) # First allow to call instantly + assert obj1.counted == 1 + + # Call again + assert not RateLimit.isAllowed("counting", 0.1) + assert RateLimit.isAllowed("something else", 0.1) + assert RateLimit.call("counting", allowed_again=0.1, func=obj1.count) == "counted" + assert around(time.time() - s, 0.1) # Delays second call within interval + assert obj1.counted == 2 + + # Call 3 times async + s = time.time() + assert obj2.counted == 0 + threads = [ + gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)), # Instant + gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)), # 0.1s delay + gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)) # 0.2s delay + ] + gevent.joinall(threads) + assert [thread.value for thread in threads] == ["counted", "counted", "counted"] + assert around(time.time() - s, 0.2) + + # No queue = instant again + s = time.time() + assert RateLimit.isAllowed("counting", 0.1) + assert RateLimit.call("counting", allowed_again=0.1, func=obj2.count) == "counted" + assert around(time.time() - s, 0.0) + + assert obj2.counted == 4 + + def testCallAsync(self): + obj1 = TestClass() + obj2 = TestClass() + + s = time.time() + RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #1").join() + assert obj1.counted == 1 # First instant + assert around(time.time() - s, 0.0) + + # After that the calls delayed + s = time.time() + t1 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #2") # Dumped by the next call + time.sleep(0.03) + t2 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #3") # Dumped by the next call + time.sleep(0.03) + t3 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #4") # Will be called + assert obj1.counted == 1 # Delay still in progress: Not called yet + t3.join() + assert t3.value == "call #4" + assert around(time.time() - s, 0.1) + + # Only the last one called + assert obj1.counted == 2 + assert obj1.last_called == "call #4" + + # Allowed again instantly + assert RateLimit.isAllowed("counting async", 0.1) + s = time.time() + RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #5").join() + assert obj1.counted == 3 + assert around(time.time() - s, 0.0) + assert not RateLimit.isAllowed("counting async", 0.1) + time.sleep(0.11) + assert RateLimit.isAllowed("counting async", 0.1) diff --git a/src/Test/coverage.ini b/src/Test/coverage.ini index 28732e91..ec34d0fc 100644 --- a/src/Test/coverage.ini +++ b/src/Test/coverage.ini @@ -1,5 +1,6 @@ [run] branch = True +concurrency = gevent omit = src/lib/* src/Test/* @@ -7,11 +8,8 @@ omit = [report] exclude_lines = pragma: no cover - if __name__ == .__main__.: - if config.debug: - if config.debug_socket: - - if self.logging: \ No newline at end of file + if self.logging: + def __repr__ diff --git a/src/util/Event.py b/src/util/Event.py index 44c9837e..b9614795 100644 --- a/src/util/Event.py +++ b/src/util/Event.py @@ -25,33 +25,31 @@ class Event(list): return self -def testBenchmark(): - def say(pre, text): - print "%s Say: %s" % (pre, text) - - import time - s = time.time() - on_changed = Event() - for i in range(1000): - on_changed.once(lambda pre: say(pre, "once"), "once") - print "Created 1000 once in %.3fs" % (time.time() - s) - on_changed("#1") - - -def testUsage(): - def say(pre, text): - print "%s Say: %s" % (pre, text) - - on_changed = Event() - on_changed.once(lambda pre: say(pre, "once")) - on_changed.once(lambda pre: say(pre, "once")) - on_changed.once(lambda pre: say(pre, "namedonce"), "namedonce") - on_changed.once(lambda pre: say(pre, "namedonce"), "namedonce") - on_changed.append(lambda pre: say(pre, "always")) - on_changed("#1") - on_changed("#2") - on_changed("#3") - - if __name__ == "__main__": + def testBenchmark(): + def say(pre, text): + print "%s Say: %s" % (pre, text) + + import time + s = time.time() + on_changed = Event() + for i in range(1000): + on_changed.once(lambda pre: say(pre, "once"), "once") + print "Created 1000 once in %.3fs" % (time.time() - s) + on_changed("#1") + + def testUsage(): + def say(pre, text): + print "%s Say: %s" % (pre, text) + + on_changed = Event() + on_changed.once(lambda pre: say(pre, "once")) + on_changed.once(lambda pre: say(pre, "once")) + on_changed.once(lambda pre: say(pre, "namedonce"), "namedonce") + on_changed.once(lambda pre: say(pre, "namedonce"), "namedonce") + on_changed.append(lambda pre: say(pre, "always")) + on_changed("#1") + on_changed("#2") + on_changed("#3") + testBenchmark() diff --git a/src/util/Noparallel.py b/src/util/Noparallel.py index a4862953..49adddbb 100644 --- a/src/util/Noparallel.py +++ b/src/util/Noparallel.py @@ -43,98 +43,92 @@ class Noparallel(object): # Only allow function running once in same time del(self.threads[key]) -class Test(): - - @Noparallel() - def count(self, num=5): - for i in range(num): - print self, i - time.sleep(1) - return "%s return:%s" % (self, i) - - -class TestNoblock(): - - @Noparallel(blocking=False) - def count(self, num=5): - for i in range(num): - print self, i - time.sleep(1) - return "%s return:%s" % (self, i) - - -def testBlocking(): - test = Test() - test2 = Test() - print "Counting..." - print "Creating class1/thread1" - thread1 = gevent.spawn(test.count) - print "Creating class1/thread2 (ignored)" - thread2 = gevent.spawn(test.count) - print "Creating class2/thread3" - thread3 = gevent.spawn(test2.count) - - print "Joining class1/thread1" - thread1.join() - print "Joining class1/thread2" - thread2.join() - print "Joining class2/thread3" - thread3.join() - - print "Creating class1/thread4 (its finished, allowed again)" - thread4 = gevent.spawn(test.count) - print "Joining thread4" - thread4.join() - - print thread1.value, thread2.value, thread3.value, thread4.value - print "Done." - - -def testNoblocking(): - test = TestNoblock() - test2 = TestNoblock() - print "Creating class1/thread1" - thread1 = test.count() - print "Creating class1/thread2 (ignored)" - thread2 = test.count() - print "Creating class2/thread3" - thread3 = test2.count() - print "Joining class1/thread1" - thread1.join() - print "Joining class1/thread2" - thread2.join() - print "Joining class2/thread3" - thread3.join() - - print "Creating class1/thread4 (its finished, allowed again)" - thread4 = test.count() - print "Joining thread4" - thread4.join() - - print thread1.value, thread2.value, thread3.value, thread4.value - print "Done." - - -def testBenchmark(): - import time - - def printThreadNum(): - import gc - from greenlet import greenlet - objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)] - print "Greenlets: %s" % len(objs) - - printThreadNum() - test = TestNoblock() - s = time.time() - for i in range(3): - gevent.spawn(test.count, i + 1) - print "Created in %.3fs" % (time.time() - s) - printThreadNum() - time.sleep(5) - - if __name__ == "__main__": + class Test(): + + @Noparallel() + def count(self, num=5): + for i in range(num): + print self, i + time.sleep(1) + return "%s return:%s" % (self, i) + + class TestNoblock(): + + @Noparallel(blocking=False) + def count(self, num=5): + for i in range(num): + print self, i + time.sleep(1) + return "%s return:%s" % (self, i) + + def testBlocking(): + test = Test() + test2 = Test() + print "Counting..." + print "Creating class1/thread1" + thread1 = gevent.spawn(test.count) + print "Creating class1/thread2 (ignored)" + thread2 = gevent.spawn(test.count) + print "Creating class2/thread3" + thread3 = gevent.spawn(test2.count) + + print "Joining class1/thread1" + thread1.join() + print "Joining class1/thread2" + thread2.join() + print "Joining class2/thread3" + thread3.join() + + print "Creating class1/thread4 (its finished, allowed again)" + thread4 = gevent.spawn(test.count) + print "Joining thread4" + thread4.join() + + print thread1.value, thread2.value, thread3.value, thread4.value + print "Done." + + def testNoblocking(): + test = TestNoblock() + test2 = TestNoblock() + print "Creating class1/thread1" + thread1 = test.count() + print "Creating class1/thread2 (ignored)" + thread2 = test.count() + print "Creating class2/thread3" + thread3 = test2.count() + print "Joining class1/thread1" + thread1.join() + print "Joining class1/thread2" + thread2.join() + print "Joining class2/thread3" + thread3.join() + + print "Creating class1/thread4 (its finished, allowed again)" + thread4 = test.count() + print "Joining thread4" + thread4.join() + + print thread1.value, thread2.value, thread3.value, thread4.value + print "Done." + + def testBenchmark(): + import time + + def printThreadNum(): + import gc + from greenlet import greenlet + objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)] + print "Greenlets: %s" % len(objs) + + printThreadNum() + test = TestNoblock() + s = time.time() + for i in range(3): + gevent.spawn(test.count, i + 1) + print "Created in %.3fs" % (time.time() - s) + printThreadNum() + time.sleep(5) from gevent import monkey monkey.patch_all() diff --git a/src/util/RateLimit.py b/src/util/RateLimit.py index f87f58af..7beb94c5 100644 --- a/src/util/RateLimit.py +++ b/src/util/RateLimit.py @@ -11,8 +11,8 @@ queue_db = {} # Commands queued to run # Return: None -def called(event): - called_db[event] = time.time() +def called(event, penalty=0): + called_db[event] = time.time() + penalty # Check if calling event is allowed @@ -62,15 +62,15 @@ def callAsync(event, allowed_again=10, func=None, *args, **kwargs): def call(event, allowed_again=10, func=None, *args, **kwargs): if isAllowed(event): # Not called recently, call it now called(event) - # print "Calling now" + # print "Calling now", allowed_again return func(*args, **kwargs) else: # Called recently, schedule it for later time_left = max(0, allowed_again - (time.time() - called_db[event])) # print "Time left: %s" % time_left, args, kwargs log.debug("Calling sync (%.2fs left): %s" % (time_left, event)) + called(event, time_left) time.sleep(time_left) - called(event) back = func(*args, **kwargs) if event in called_db: del called_db[event]