def test_call_later_tx(framework_tx):
'''
Wait for two Futures.
'''
from twisted.internet.task import Clock
new_loop = Clock()
calls = []
with replace_loop(new_loop) as fake_loop:
def foo(*args, **kw):
calls.append((args, kw))
delay = txaio.call_later(1, foo, 5, 6, 7, foo="bar")
assert len(calls) == 0
assert hasattr(delay, 'cancel')
fake_loop.advance(2)
assert len(calls) == 1
assert calls[0][0] == (5, 6, 7)
assert calls[0][1] == dict(foo="bar")
评论列表
文章目录