def testCallInThread(self):
    """
    Check that a function scheduled with reactor.callInThread reports
    threadable.isInIOThread() as False.

    The scheduled function records the value of threadable.isInIOThread()
    and then sets an event. The test waits up to 120 seconds for that
    event, fails if it was never set, and otherwise asserts that exactly
    one value, False, was recorded.
    """
    waiter = threading.Event()
    result = []

    def threadedFunc():
        # Append before set(): once the event is observed as set, the
        # recorded value is guaranteed to already be in `result`.
        result.append(threadable.isInIOThread())
        waiter.set()

    reactor.callInThread(threadedFunc)

    # Bounded wait so a function that never runs cannot hang the test run.
    waiter.wait(120)
    if not waiter.is_set():
        self.fail("Timed out waiting for event.")
    self.assertEqual(result, [False])
评论列表
文章目录