Rev4104, Don't start blocking Noparallel calls in separate greenlet to be able to catch exceptions.

This commit is contained in:
shortcutme 2019-06-11 17:04:37 +02:00
parent eeef6fe65f
commit 862e19a263
No known key found for this signature in database
GPG key ID: 5B63BAE6CB9613AE
3 changed files with 62 additions and 10 deletions

View file

@ -13,7 +13,7 @@ class Config(object):
def __init__(self, argv):
self.version = "0.7.0"
self.rev = 4102
self.rev = 4104
self.argv = argv
self.action = None
self.pending_changes = {}

View file

@ -1,8 +1,9 @@
import time
import util
import gevent
import pytest
import util
class ExampleClass(object):
def __init__(self):
@ -120,3 +121,12 @@ class TestNoparallel:
taken = time.time() - s
assert 1.2 > taken >= 1.0 # 2 * 0.5s count = ~1s
def testException(self):
@util.Noparallel()
def raiseException():
raise Exception("Test error!")
with pytest.raises(Exception) as err:
raiseException()
assert str(err) == "Test error!"

View file

@ -1,5 +1,6 @@
import gevent
import time
from gevent.event import AsyncResult
class Noparallel(object): # Only allow function running once in same time
@ -25,12 +26,14 @@ class Noparallel(object): # Only allow function running once in same time
self.queued = True
thread = self.threads[key]
if self.blocking:
thread.join() # Blocking until its finished
if self.queued:
res = thread.get() # Blocking until its finished
if key in self.threads:
return self.threads[key].get() # Queue finished since started running
self.queued = False
return wrapper(*args, **kwargs) # Run again after the end
else:
return thread.value # Return the value
return thread.get() # Return the value
else: # No blocking
if thread.ready(): # Its finished, create a new
@ -40,14 +43,22 @@ class Noparallel(object): # Only allow function running once in same time
else: # Still running
return thread
else: # Thread not running
thread = gevent.spawn(func, *args, **kwargs) # Spawning new thread
thread.link(lambda thread: self.cleanup(key, thread))
self.threads[key] = thread
if self.blocking: # Wait for finish
thread.join()
ret = thread.value
return ret
asyncres = AsyncResult()
self.threads[key] = asyncres
try:
res = func(*args, **kwargs)
asyncres.set(res)
self.cleanup(key, asyncres)
return res
except Exception as err:
asyncres.set_exception(err)
self.cleanup(key, asyncres)
raise(err)
else: # No blocking just return the thread
thread = gevent.spawn(func, *args, **kwargs) # Spawning new thread
thread.link(lambda thread: self.cleanup(key, thread))
self.threads[key] = thread
return thread
wrapper.__name__ = func.__name__
@ -60,6 +71,8 @@ class Noparallel(object): # Only allow function running once in same time
if __name__ == "__main__":
class Test():
@Noparallel()
@ -145,11 +158,40 @@ if __name__ == "__main__":
print("Created in %.3fs" % (time.time() - s))
printThreadNum()
time.sleep(5)
def testException():
import time
@Noparallel(blocking=True, queue=True)
def count(self, num=5):
s = time.time()
# raise Exception("err")
for i in range(num):
print(self, i)
time.sleep(1)
return "%s return:%s" % (s, i)
def caller():
try:
print("Ret:", count(5))
except Exception as err:
print("Raised:", repr(err))
gevent.joinall([
gevent.spawn(caller),
gevent.spawn(caller),
gevent.spawn(caller),
gevent.spawn(caller)
])
from gevent import monkey
monkey.patch_all()
testException()
"""
testBenchmark()
print("Testing blocking mode...")
testBlocking()
print("Testing noblocking mode...")
testNoblocking()
"""