def run(self):
    """Tail the configured topics, printing messages until told to stop.

    Opens a ``Consumer`` over ``self.topic_to_offsets_map`` and polls it
    for as long as ``self.keep_running(message_count)`` is truthy.  Every
    message received is rendered with ``self._format_message`` and
    printed, and the count of printed messages is incremented.

    When ``self.options.end_timestamp`` is set and a message's timestamp
    is not below it, that message is not printed; ``self._running`` is set
    to ``False`` and the stop is logged instead.
    """
    logger.info(
        "Starting to consume from {}".format(self.topic_to_offsets_map)
    )

    # The tailer name should be unique - if it's not, partitions will
    # be split between multiple tailer instances
    tailer_name = 'data_pipeline_tailer-{}'.format(
        str(UUID(bytes=FastUUID().uuid4()).hex)
    )

    with Consumer(
        tailer_name,
        'bam',
        ExpectedFrequency.constantly,
        self.topic_to_offsets_map,
        auto_offset_reset=self.options.offset_reset_location,
        cluster_name=self.options.cluster_name
    ) as consumer:
        message_count = 0
        while self.keep_running(message_count):
            message = consumer.get_message(blocking=True, timeout=0.1)
            if message is None:
                # Nothing to show this round (presumably the 0.1s poll
                # timed out - confirm against Consumer); re-check the
                # loop condition.
                continue

            end_timestamp = self.options.end_timestamp
            if end_timestamp is None or message.timestamp < end_timestamp:
                # Single-argument parenthesised form: prints the same
                # text under the Python 2 print statement.
                print(self._format_message(message))
                message_count += 1
                continue

            # The message is at or past the requested end of the window.
            # NOTE(review): keep_running presumably consults _running to
            # end the loop - confirm.
            self._running = False
            logger.info(
                "Latest message surpasses --end-timestamp. Stopping tailer..."
            )
评论列表
文章目录