def test_put_nowait_unlock(self):
    """Exercise ``put_nowait`` on a ``Queue(0)`` that has a pending ``get``.

    A greenthread calling ``get`` is spawned on a queue created with
    size 0, then ``put_nowait(10)`` is scheduled on the hub through the
    ``store_result`` helper.  At every checkpoint the queue must report
    itself as both empty and full.
    """
    active_hub = hubs.get_hub()
    zero_queue = eventlet.Queue(0)
    eventlet.spawn(zero_queue.get)

    # State right after spawning, before the getter has had a chance to run.
    assert zero_queue.empty(), zero_queue
    assert zero_queue.full(), zero_queue

    # Yield once so the spawned getter can start; the state must not change.
    eventlet.sleep(0)
    assert zero_queue.empty(), zero_queue
    assert zero_queue.full(), zero_queue

    outcomes = []
    active_hub.schedule_call_global(
        0, store_result, outcomes, zero_queue.put_nowait, 10
    )
    # TODO: once greenthreads expose a ready() method, keep a reference to
    # the spawned getter and assert here that it is not ready yet.

    # Yield again so the scheduled call gets to run.
    eventlet.sleep(0)
    # store_result is defined elsewhere; presumably it appends the return
    # value of put_nowait (None) to the list -- confirm against the helper.
    assert outcomes == [None], outcomes
    # TODO: with a ready() method available, assert the getter is ready now.

    assert zero_queue.full(), zero_queue
    assert zero_queue.empty(), zero_queue
评论列表
文章目录