producer_test.py 文件源码

python
阅读 28 收藏 0 点赞 0 评论 0

项目:data_pipeline 作者: Yelp 项目源码 文件源码
def test_meteorite_on_off(
        self,
        create_message,
        registered_schema,
        producer,
        enable_meteorite,
        expected_call_count
    ):
        with mock.patch.object(
            data_pipeline.tools.meteorite_wrappers.StatsCounter,
            'process',
            autospec=True
        ) as mock_stats_counter:
            producer.enable_meteorite = enable_meteorite
            m = create_message(registered_schema, timeslot=1.0)
            producer.publish(m)
            assert mock_stats_counter.call_count == expected_call_count
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号