diff --git a/src/Config.py b/src/Config.py index bcd9d3c7..36b5c2f8 100644 --- a/src/Config.py +++ b/src/Config.py @@ -13,7 +13,7 @@ class Config(object): def __init__(self, argv): self.version = "0.7.0" - self.rev = 4102 + self.rev = 4104 self.argv = argv self.action = None self.pending_changes = {} diff --git a/src/Test/TestNoparallel.py b/src/Test/TestNoparallel.py index 6dfc6b53..79b62f63 100644 --- a/src/Test/TestNoparallel.py +++ b/src/Test/TestNoparallel.py @@ -1,8 +1,9 @@ import time -import util import gevent +import pytest +import util class ExampleClass(object): def __init__(self): @@ -120,3 +121,12 @@ class TestNoparallel: taken = time.time() - s assert 1.2 > taken >= 1.0 # 2 * 0.5s count = ~1s + + def testException(self): + @util.Noparallel() + def raiseException(): + raise Exception("Test error!") + + with pytest.raises(Exception) as err: + raiseException() + assert str(err) == "Test error!" diff --git a/src/util/Noparallel.py b/src/util/Noparallel.py index 2a5f974a..541c8699 100644 --- a/src/util/Noparallel.py +++ b/src/util/Noparallel.py @@ -1,5 +1,6 @@ import gevent import time +from gevent.event import AsyncResult class Noparallel(object): # Only allow function running once in same time @@ -25,12 +26,14 @@ class Noparallel(object): # Only allow function running once in same time self.queued = True thread = self.threads[key] if self.blocking: - thread.join() # Blocking until its finished if self.queued: + res = thread.get() # Blocking until its finished + if key in self.threads: + return self.threads[key].get() # Queue finished since started running self.queued = False return wrapper(*args, **kwargs) # Run again after the end else: - return thread.value # Return the value + return thread.get() # Return the value else: # No blocking if thread.ready(): # Its finished, create a new @@ -40,14 +43,22 @@ class Noparallel(object): # Only allow function running once in same time else: # Still running return thread else: # Thread not running - thread = gevent.spawn(func, *args, **kwargs) # Spawning new thread - thread.link(lambda thread: self.cleanup(key, thread)) - self.threads[key] = thread if self.blocking: # Wait for finish - thread.join() - ret = thread.value - return ret + asyncres = AsyncResult() + self.threads[key] = asyncres + try: + res = func(*args, **kwargs) + asyncres.set(res) + self.cleanup(key, asyncres) + return res + except Exception as err: + asyncres.set_exception(err) + self.cleanup(key, asyncres) + raise(err) else: # No blocking just return the thread + thread = gevent.spawn(func, *args, **kwargs) # Spawning new thread + thread.link(lambda thread: self.cleanup(key, thread)) + self.threads[key] = thread return thread wrapper.__name__ = func.__name__ @@ -60,6 +71,8 @@ class Noparallel(object): # Only allow function running once in same time if __name__ == "__main__": + + class Test(): @Noparallel() @@ -145,11 +158,40 @@ if __name__ == "__main__": print("Created in %.3fs" % (time.time() - s)) printThreadNum() time.sleep(5) + + def testException(): + import time + @Noparallel(blocking=True, queue=True) + def count(self, num=5): + s = time.time() + # raise Exception("err") + for i in range(num): + print(self, i) + time.sleep(1) + return "%s return:%s" % (s, i) + def caller(): + try: + print("Ret:", count(5)) + except Exception as err: + print("Raised:", repr(err)) + + gevent.joinall([ + gevent.spawn(caller), + gevent.spawn(caller), + gevent.spawn(caller), + gevent.spawn(caller) + ]) + + from gevent import monkey monkey.patch_all() + testException() + + """ testBenchmark() print("Testing blocking mode...") testBlocking() print("Testing noblocking mode...") testNoblocking() + """