def mock_consumer(self, KafkaConsumer, value, max_calls=1):
    """Make a patched ``KafkaConsumer`` class return a fake consumer.

    The fake consumer hands out the same ``ConsumerRecord`` each time
    ``__next__`` is called, up to ``max_calls`` times, and raises
    ``StopIteration`` on every call after that.

    Args:
        KafkaConsumer: Mock standing in for the consumer class. Its
            ``return_value`` is set to the fake consumer built here.
        value: Serialized message payload used as the record value. It is
            passed to ``binascii.crc32`` and ``len``, so it must be
            bytes-like.
        max_calls: Number of records handed out before ``StopIteration``.

    Returns:
        The ``MagicMock`` acting as the consumer instance.
    """
    # Mock a consumer object
    fake_kafka_consumer = MagicMock()

    # Should return a record when used as an iterator. Set up the mock to
    # return the record up to the limit of max_calls. Then raises
    # StopIteration.
    key = b'NY'
    record = ConsumerRecord(
        topic=TOPIC_STATES,
        partition=0,
        offset=42,
        timestamp=1467649216540,
        timestamp_type=0,
        key=key,
        value=value,
        checksum=binascii.crc32(value),
        # The size fields are byte counts, not the payloads themselves.
        serialized_key_size=len(key),
        serialized_value_size=len(value))

    # Mutable cell so the closure can count calls without ``nonlocal``.
    meta = {'i': 0}

    def _iter(*args, **kwargs):
        if meta['i'] >= max_calls:
            raise StopIteration()
        meta['i'] += 1
        return record

    # NOTE: only ``__next__`` is configured; ``__iter__`` keeps the
    # MagicMock default.
    fake_kafka_consumer.__next__.side_effect = _iter

    # Return some partitions
    fake_kafka_consumer.partitions_for_topic.return_value = {0, 1}

    # Make class instantiation return our mock
    KafkaConsumer.return_value = fake_kafka_consumer
    return fake_kafka_consumer
# Comment list
# Table of contents