def test_register_signal_handler(
    self,
    patch_config,
    patch_db_connections,
    patch_restarter,
    patch_signal,
    patch_running,
    patch_producer,
    patch_exit,
):
    """Check that a batch run registers the shutdown and profiler handlers.

    Runs one batch with ``patch_running`` returning False, then verifies
    that ``patch_signal`` was called with SIGINT and SIGTERM bound to the
    stream's ``_handle_shutdown_signal`` and SIGUSR2 bound to its
    ``_handle_profiler_signal``.

    The remaining ``patch_*`` arguments are not referenced in the body;
    presumably they are fixtures requested only for their patching side
    effects -- confirm against the fixture definitions.
    """
    patch_running.return_value = False
    stream = self._init_and_run_batch()

    shutdown_handler = stream._handle_shutdown_signal
    profiler_handler = stream._handle_profiler_signal
    expected_registrations = [
        mock.call(signal.SIGINT, shutdown_handler),
        mock.call(signal.SIGTERM, shutdown_handler),
        mock.call(signal.SIGUSR2, profiler_handler),
    ]

    # ZKLock also calls patch_signal, so an exact comparison of the whole
    # call list would fail. Assuming patch_signal is a mock, a list on the
    # left of `in` is matched against call_args_list as a consecutive run
    # of calls, which tolerates the extra ZKLock registrations around it.
    assert expected_registrations in patch_signal.call_args_list
parse_replication_stream_internal_test.py 文件源码
python
阅读 36
收藏 0
点赞 0
评论 0
评论列表
文章目录