def test_transform_service_heartbeat(self, coordinator):
    """Check that a started TransformService calls the coordinator heartbeat.

    The coordinator mock is configured to return a ``KazooDriver``-specced
    ``MagicMock``; the service is started in-process and the test then
    verifies that ``heartbeat()`` was invoked on that mock with no arguments.

    Args:
        coordinator: mock whose ``return_value`` is replaced by the fake
            driver (presumably injected by a patch decorator outside this
            block -- confirm against the enclosing class).

    Raises:
        AssertionError: if ``heartbeat`` has not been called with no
            arguments within roughly two seconds of starting the service.
    """
    # Whatever the coordinator mock is called with, it hands back this
    # KazooDriver-shaped fake so heartbeat calls can be observed.
    fake_kazoo_driver = MagicMock(name="MagicKazooDriver",
                                  spec=KazooDriver)
    coordinator.return_value = fake_kazoo_driver

    # Run the service in-process. Spawning it as a separate process and
    # terminating it with SIGTERM was considered, but mocks did not seem
    # to work when the service was spawned that way.
    serv_thread = transform_service.TransformService()
    serv_thread.daemon = True
    serv_thread.start()

    # Poll instead of sleeping for a fixed two seconds: the test returns as
    # soon as the heartbeat is observed, while the worst-case wait
    # (max_polls * poll_interval) stays at the original two seconds.
    poll_interval = 0.1
    max_polls = 20
    for _ in range(max_polls):
        if fake_kazoo_driver.heartbeat.called:
            break
        time.sleep(poll_interval)

    fake_kazoo_driver.heartbeat.assert_called_with()
test_transform_service.py 文件源码
python
阅读 34
收藏 0
点赞 0
评论 0
评论列表
文章目录