parse_replication_stream_internal_test.py 文件源码

python
阅读 36 收藏 0 点赞 0 评论 0

项目:mysql_streamer 作者: Yelp 项目源码 文件源码
def test_register_signal_handler(
        self,
        patch_config,
        patch_db_connections,
        patch_restarter,
        patch_signal,
        patch_running,
        patch_producer,
        patch_exit,
    ):
        patch_running.return_value = False
        replication_stream = self._init_and_run_batch()
        # ZKLock also calls patch_signal, so we have to work around it
        assert [
            mock.call(signal.SIGINT, replication_stream._handle_shutdown_signal),
            mock.call(signal.SIGTERM, replication_stream._handle_shutdown_signal),
            mock.call(signal.SIGUSR2, replication_stream._handle_profiler_signal),
        ] in patch_signal.call_args_list
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号