def test_call_with_io_loop(self):
    # Exercises run_on_executor when it is given the keyword
    # io_loop='_io_loop', using a throwaway class defined inline.
    # NOTE(review): this function yields, so it is a generator; it presumably
    # relies on a coroutine-test decorator (e.g. tornado.testing.gen_test)
    # applied above this line, outside the visible chunk -- confirm.
    class Object(object):
        def __init__(self, io_loop):
            # Thread pool created with an argument of 1, kept as ``executor``.
            self.executor = futures.thread.ThreadPoolExecutor(1)
            # Kept under the same attribute name that is passed to the
            # decorator on ``f`` below.
            self._io_loop = io_loop

        # NOTE(review): '_io_loop' presumably tells the decorator which
        # instance attribute holds the loop -- confirm against the docs of
        # the Tornado version in use.
        @run_on_executor(io_loop='_io_loop')
        def f(self):
            return 42

    instance = Object(io_loop=self.io_loop)
    # Yield whatever the decorated call returns and compare the value that
    # comes back with the constant ``f`` returns.
    result = yield instance.f()
    self.assertEqual(result, 42)
concurrent_test.py 文件源码
python
阅读 15
收藏 0
点赞 0
评论 0
评论列表
文章目录