def _testBlockingCallFromThread(self, reactorFunc):
    """
    Utility method to test L{threads.blockingCallFromThread}.

    C{reactorFunc} is handed to L{threads.blockingCallFromThread} from a
    function started with C{reactor.callInThread}; the outcome of that call
    is recorded and reported through the returned callback chain.

    @param reactorFunc: the callable passed, with no extra arguments, to
        L{threads.blockingCallFromThread}.

    @return: the object returned by C{self._waitForThread()} (presumably a
        Deferred; that helper is defined outside this block) with two
        callbacks added. The last callback returns a C{(results, errors)}
        tuple of lists: C{results} holds the value returned by the blocking
        call if it succeeded, C{errors} holds the L{Exception} it raised
        otherwise.
    """
    waiter = threading.Event()
    results = []
    errors = []

    def cb1(ign):
        def threadedFunc():
            try:
                r = threads.blockingCallFromThread(reactor, reactorFunc)
            except Exception as e:
                errors.append(e)
            else:
                results.append(r)
            # Signal completion whether the call returned or raised an
            # Exception, so the waiting side does not run into the timeout.
            waiter.set()

        reactor.callInThread(threadedFunc)
        # Wait for the event in another thread, for at most
        # self.getTimeout() seconds.
        return threads.deferToThread(waiter.wait, self.getTimeout())

    def cb2(ign):
        # The incoming value is ignored: the event state decides whether
        # the threaded call finished. Event.is_set() replaces the
        # deprecated camelCase alias isSet().
        if not waiter.is_set():
            self.fail("Timed out waiting for event")
        return results, errors

    return self._waitForThread().addCallback(cb1).addBoth(cb2)
评论列表
文章目录