Test main loop caller
This commit is contained in:
parent
f01d335835
commit
61f1a741fc
1 changed files with 101 additions and 11 deletions
|
@ -1,6 +1,8 @@
|
||||||
import time
|
import time
|
||||||
|
import threading
|
||||||
|
|
||||||
import gevent
|
import gevent
|
||||||
|
import pytest
|
||||||
|
|
||||||
from util import ThreadPool
|
from util import ThreadPool
|
||||||
|
|
||||||
|
@ -23,19 +25,18 @@ class TestThreadPool:
|
||||||
return out
|
return out
|
||||||
|
|
||||||
threads = []
|
threads = []
|
||||||
for i in range(2):
|
for i in range(3):
|
||||||
threads.append(gevent.spawn(blocker))
|
threads.append(gevent.spawn(blocker))
|
||||||
gevent.joinall(threads)
|
gevent.joinall(threads)
|
||||||
|
|
||||||
assert events == ["S"] * 2 + ["M"] * 2 + ["D"] * 2
|
assert events == ["S"] * 3 + ["M"] * 3 + ["D"] * 3
|
||||||
|
|
||||||
res = blocker()
|
res = blocker()
|
||||||
assert res == 10000000
|
assert res == 10000000
|
||||||
|
pool.kill()
|
||||||
|
|
||||||
def testLockBlockingSameThread(self):
|
def testLockBlockingSameThread(self):
|
||||||
from gevent.lock import Semaphore
|
lock = ThreadPool.Lock()
|
||||||
|
|
||||||
lock = Semaphore()
|
|
||||||
|
|
||||||
s = time.time()
|
s = time.time()
|
||||||
|
|
||||||
|
@ -54,24 +55,113 @@ class TestThreadPool:
|
||||||
def testLockBlockingDifferentThread(self):
|
def testLockBlockingDifferentThread(self):
|
||||||
lock = ThreadPool.Lock()
|
lock = ThreadPool.Lock()
|
||||||
|
|
||||||
s = time.time()
|
|
||||||
|
|
||||||
def locker():
|
def locker():
|
||||||
lock.acquire(True)
|
lock.acquire(True)
|
||||||
time.sleep(1)
|
time.sleep(0.5)
|
||||||
lock.release()
|
lock.release()
|
||||||
|
|
||||||
pool = gevent.threadpool.ThreadPool(10)
|
pool = ThreadPool.ThreadPool(10)
|
||||||
pool.spawn(locker)
|
|
||||||
threads = [
|
threads = [
|
||||||
pool.spawn(locker),
|
pool.spawn(locker),
|
||||||
|
pool.spawn(locker),
|
||||||
|
gevent.spawn(locker),
|
||||||
|
pool.spawn(locker)
|
||||||
]
|
]
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
s = time.time()
|
||||||
|
|
||||||
lock.acquire(True, 5.0)
|
lock.acquire(True, 5.0)
|
||||||
|
|
||||||
unlock_taken = time.time() - s
|
unlock_taken = time.time() - s
|
||||||
|
|
||||||
assert 2.0 < unlock_taken < 2.5
|
assert 1.8 < unlock_taken < 2.2
|
||||||
|
|
||||||
gevent.joinall(threads)
|
gevent.joinall(threads)
|
||||||
|
|
||||||
|
def testMainLoopCallerThreadId(self):
|
||||||
|
main_thread_id = threading.current_thread().ident
|
||||||
|
pool = ThreadPool.ThreadPool(5)
|
||||||
|
|
||||||
|
def getThreadId(*args, **kwargs):
|
||||||
|
return threading.current_thread().ident
|
||||||
|
|
||||||
|
t = pool.spawn(getThreadId)
|
||||||
|
assert t.get() != main_thread_id
|
||||||
|
|
||||||
|
t = pool.spawn(lambda: ThreadPool.main_loop.call(getThreadId))
|
||||||
|
assert t.get() == main_thread_id
|
||||||
|
|
||||||
|
def testMainLoopCallerGeventSpawn(self):
|
||||||
|
main_thread_id = threading.current_thread().ident
|
||||||
|
pool = ThreadPool.ThreadPool(5)
|
||||||
|
def waiter():
|
||||||
|
time.sleep(1)
|
||||||
|
return threading.current_thread().ident
|
||||||
|
|
||||||
|
def geventSpawner():
|
||||||
|
event = ThreadPool.main_loop.call(gevent.spawn, waiter)
|
||||||
|
|
||||||
|
with pytest.raises(Exception) as greenlet_err:
|
||||||
|
event.get()
|
||||||
|
assert str(greenlet_err.value) == "cannot switch to a different thread"
|
||||||
|
|
||||||
|
waiter_thread_id = ThreadPool.main_loop.call(event.get)
|
||||||
|
return waiter_thread_id
|
||||||
|
|
||||||
|
s = time.time()
|
||||||
|
waiter_thread_id = pool.apply(geventSpawner)
|
||||||
|
assert main_thread_id == waiter_thread_id
|
||||||
|
time_taken = time.time() - s
|
||||||
|
assert 0.9 < time_taken < 1.2
|
||||||
|
|
||||||
|
def testEvent(self):
|
||||||
|
pool = ThreadPool.ThreadPool(5)
|
||||||
|
event = ThreadPool.Event()
|
||||||
|
|
||||||
|
def setter():
|
||||||
|
time.sleep(1)
|
||||||
|
event.set("done!")
|
||||||
|
|
||||||
|
def getter():
|
||||||
|
return event.get()
|
||||||
|
|
||||||
|
pool.spawn(setter)
|
||||||
|
t_gevent = gevent.spawn(getter)
|
||||||
|
t_pool = pool.spawn(getter)
|
||||||
|
s = time.time()
|
||||||
|
assert event.get() == "done!"
|
||||||
|
time_taken = time.time() - s
|
||||||
|
gevent.joinall([t_gevent, t_pool])
|
||||||
|
|
||||||
|
assert t_gevent.get() == "done!"
|
||||||
|
assert t_pool.get() == "done!"
|
||||||
|
|
||||||
|
assert 0.9 < time_taken < 1.2
|
||||||
|
|
||||||
|
with pytest.raises(Exception) as err:
|
||||||
|
event.set("another result")
|
||||||
|
|
||||||
|
assert "Event already has value" in str(err.value)
|
||||||
|
|
||||||
|
def testMemoryLeak(self):
|
||||||
|
import gc
|
||||||
|
thread_objs_before = [id(obj) for obj in gc.get_objects() if "threadpool" in str(type(obj))]
|
||||||
|
|
||||||
|
def worker():
|
||||||
|
time.sleep(0.1)
|
||||||
|
return "ok"
|
||||||
|
|
||||||
|
def poolTest():
|
||||||
|
pool = ThreadPool.ThreadPool(5)
|
||||||
|
for i in range(20):
|
||||||
|
pool.spawn(worker)
|
||||||
|
pool.kill()
|
||||||
|
|
||||||
|
for i in range(5):
|
||||||
|
poolTest()
|
||||||
|
new_thread_objs = [obj for obj in gc.get_objects() if "threadpool" in str(type(obj)) and id(obj) not in thread_objs_before]
|
||||||
|
#print("New objs:", new_thread_objs, "run:", num_run)
|
||||||
|
|
||||||
|
# Make sure no threadpool object left behind
|
||||||
|
assert not new_thread_objs
|
||||||
|
|
Loading…
Reference in a new issue