producer_test.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:data_pipeline 作者: Yelp 项目源码 文件源码
def test_sensu_process_called_once_inside_window(
        self,
        create_message,
        registered_schema,
        producer,
        message_count
    ):
        with mock.patch.object(
            data_pipeline.tools.sensu_ttl_alerter.SensuTTLAlerter,
            'process',
            autospec=True,
            return_value=None
        ) as mock_sensu_ttl_process:
            producer.enable_sensu = True
            m1 = create_message(registered_schema, timeslot=1.0)
            for i in range(message_count):
                producer.publish(m1)
            assert mock_sensu_ttl_process.call_count == 1
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号