def test_semaphore_contention():
    """Check that two workers contending for one semaphore share it evenly.

    Two workers are spawned with ``eventlet.spawn``. Each repeatedly acquires
    the same ``eventlet.Semaphore``, increments its own counter and sleeps
    briefly while still holding the semaphore. After the main flow has slept
    for 0.5 seconds both workers are killed, and the two counters must differ
    by less than 10% of the smaller one.
    """
    g_mutex = eventlet.Semaphore()
    # counts[0] is incremented by worker 1, counts[1] by worker 2.
    counts = [0, 0]

    def worker(no):
        # Loop until both counters have reached at least 200.
        while min(counts) < 200:
            with g_mutex:
                counts[no - 1] += 1
                # Sleep while still holding the semaphore so the other
                # worker has to wait for it to be released.
                eventlet.sleep(0.001)

    t1 = eventlet.spawn(worker, no=1)
    t2 = eventlet.spawn(worker, no=2)
    # Give the workers time to run before inspecting the counters.
    eventlet.sleep(0.5)
    # Stop any worker that is still running so the counters no longer change.
    t1.kill()
    t2.kill()

    # The failure message carries the raw counters for diagnosis.
    assert abs(counts[0] - counts[1]) < int(min(counts) * 0.1), counts
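# Comment list (web page navigation text, not part of the test)
# Article table of contents (web page navigation text, not part of the test)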