def test_get_nowait_unlock(self):
hub = hubs.get_hub()
result = []
q = eventlet.Queue(0)
p = eventlet.spawn(q.put, 5)
assert q.empty(), q
assert q.full(), q
eventlet.sleep(0)
assert q.empty(), q
assert q.full(), q
hub.schedule_call_global(0, store_result, result, q.get_nowait)
eventlet.sleep(0)
assert q.empty(), q
assert q.full(), q
assert result == [5], result
# TODO add ready to greenthread
# assert p.ready(), p
assert p.dead, p
assert q.empty(), q
# put_nowait must work from the mainloop
评论列表
文章目录