test_consumer.py 文件源码

python
阅读 31 收藏 0 点赞 0 评论 0

项目:django-logpipe 作者: thelabnyc 项目源码 文件源码
def mock_consumer(self, KafkaConsumer, value, max_calls=1):
    """Build a fake Kafka consumer and install it on ``KafkaConsumer``.

    The fake is a ``MagicMock`` whose ``__next__`` hands out the same
    ``ConsumerRecord`` up to ``max_calls`` times and raises
    ``StopIteration`` on every call after that.

    Args:
        KafkaConsumer: Object whose ``return_value`` is set to the fake
            consumer (presumably the patched ``KafkaConsumer`` class mock;
            confirm against the calling tests).
        value: Serialized message payload. It must be bytes-like, since it
            is passed to ``binascii.crc32`` and ``len``.
        max_calls: Number of records the fake yields before it raises
            ``StopIteration``.

    Returns:
        The ``MagicMock`` standing in for the consumer instance.
    """
    # Mock a consumer object
    fake_kafka_consumer = MagicMock()

    # Should return a record when used as an iterator. Set up the mock to
    # return the record up to the limit of max_calls. Then raises
    # StopIteration.
    key = b'NY'
    record = ConsumerRecord(
        topic=TOPIC_STATES,
        partition=0,
        offset=42,
        timestamp=1467649216540,
        timestamp_type=0,
        key=key,
        value=value,
        checksum=binascii.crc32(value),
        # The size fields are byte counts, not the payloads themselves.
        serialized_key_size=len(key),
        serialized_value_size=len(value))

    # Mutable cell so the closure below can count calls across invocations.
    meta = {'i': 0}

    def _iter(*args, **kwargs):
        if meta['i'] >= max_calls:
            raise StopIteration()
        meta['i'] += 1
        return record

    fake_kafka_consumer.__next__.side_effect = _iter

    # Return some partitions
    fake_kafka_consumer.partitions_for_topic.return_value = {0, 1}

    # Make class instantiation return our mock
    KafkaConsumer.return_value = fake_kafka_consumer

    return fake_kafka_consumer
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号