Multi-process and gevent loop friendly lock
This commit is contained in:
parent
b7c6b84826
commit
66a1c4d242
2 changed files with 67 additions and 1 deletions
|
@ -1,3 +1,5 @@
|
||||||
|
import time
|
||||||
|
|
||||||
import gevent
|
import gevent
|
||||||
|
|
||||||
from util import ThreadPool
|
from util import ThreadPool
|
||||||
|
@ -27,3 +29,47 @@ class TestThreadPool:
|
||||||
|
|
||||||
res = blocker()
|
res = blocker()
|
||||||
assert res == 10000000
|
assert res == 10000000
|
||||||
|
|
||||||
|
def testLockBlockingSameThread(self):
|
||||||
|
from gevent.lock import Semaphore
|
||||||
|
|
||||||
|
lock = Semaphore()
|
||||||
|
|
||||||
|
s = time.time()
|
||||||
|
|
||||||
|
def unlocker():
|
||||||
|
time.sleep(1)
|
||||||
|
lock.release()
|
||||||
|
|
||||||
|
gevent.spawn(unlocker)
|
||||||
|
lock.acquire(True)
|
||||||
|
lock.acquire(True, timeout=2)
|
||||||
|
|
||||||
|
unlock_taken = time.time() - s
|
||||||
|
|
||||||
|
assert 1.0 < unlock_taken < 1.5
|
||||||
|
|
||||||
|
def testLockBlockingDifferentThread(self):
|
||||||
|
lock = ThreadPool.Lock()
|
||||||
|
|
||||||
|
s = time.time()
|
||||||
|
|
||||||
|
def locker():
|
||||||
|
lock.acquire(True)
|
||||||
|
time.sleep(1)
|
||||||
|
lock.release()
|
||||||
|
|
||||||
|
pool = gevent.threadpool.ThreadPool(10)
|
||||||
|
pool.spawn(locker)
|
||||||
|
threads = [
|
||||||
|
pool.spawn(locker),
|
||||||
|
]
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
lock.acquire(True, 5.0)
|
||||||
|
|
||||||
|
unlock_taken = time.time() - s
|
||||||
|
|
||||||
|
assert 2.0 < unlock_taken < 2.5
|
||||||
|
|
||||||
|
gevent.joinall(threads)
|
||||||
|
|
|
@ -1,4 +1,6 @@
|
||||||
import gevent.threadpool
|
import gevent.threadpool
|
||||||
|
import gevent._threading
|
||||||
|
import threading
|
||||||
|
|
||||||
|
|
||||||
class ThreadPool:
|
class ThreadPool:
|
||||||
|
@ -17,6 +19,24 @@ class ThreadPool:
|
||||||
return func
|
return func
|
||||||
|
|
||||||
def wrapper(*args, **kwargs):
|
def wrapper(*args, **kwargs):
|
||||||
return self.pool.apply(func, args, kwargs)
|
res = self.pool.apply(func, args, kwargs)
|
||||||
|
return res
|
||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
main_thread_id = threading.current_thread().ident
|
||||||
|
lock_pool = gevent.threadpool.ThreadPool(10)
|
||||||
|
|
||||||
|
|
||||||
|
class Lock:
|
||||||
|
def __init__(self):
|
||||||
|
self.lock = gevent._threading.Lock()
|
||||||
|
self.locked = self.lock.locked
|
||||||
|
self.release = self.lock.release
|
||||||
|
|
||||||
|
def acquire(self, *args, **kwargs):
|
||||||
|
if self.locked() and threading.current_thread().ident == main_thread_id:
|
||||||
|
return lock_pool.apply(self.lock.acquire, args, kwargs)
|
||||||
|
else:
|
||||||
|
return self.lock.acquire(*args, **kwargs)
|
||||||
|
|
Loading…
Reference in a new issue