def test_configures_thread_pool(self):
# Patch and restore where it's visible because patching a running
# reactor is potentially fairly harmful.
patcher = monkey.MonkeyPatcher()
patcher.add_patch(reactor, "threadpool", None)
patcher.add_patch(reactor, "threadpoolForDatabase", None)
patcher.patch()
try:
service_maker = RegionServiceMaker("Harry", "Hill")
# Disable _ensureConnection() its not allowed in the reactor.
self.patch_autospec(service_maker, "_ensureConnection")
service_maker.makeService(Options())
threadpool = reactor.getThreadPool()
self.assertThat(threadpool, IsInstance(ThreadPool))
finally:
patcher.restore()
评论列表
文章目录