tailer.py 文件源码

python
阅读 28 收藏 0 点赞 0 评论 0

项目:data_pipeline 作者: Yelp 项目源码 文件源码
def run(self):
    """Consume messages from the configured topics and print each one.

    Opens a ``Consumer`` over ``self.topic_to_offsets_map`` and polls it
    for as long as ``self.keep_running(message_count)`` returns a truthy
    value, where ``message_count`` is the number of messages printed so
    far. Each message whose timestamp is below
    ``self.options.end_timestamp`` (or every message, when that option is
    ``None``) is rendered with ``self._format_message`` and written to
    stdout. The first message at or past the end timestamp is not
    printed; instead ``self._running`` is set to ``False`` and a log line
    is emitted.

    NOTE(review): the loop only stops through ``keep_running``; presumably
    it consults ``self._running`` -- confirm against its definition.
    """
    logger.info(
        "Starting to consume from {}".format(self.topic_to_offsets_map)
    )

    # The tailer name should be unique - if it's not, partitions will
    # be split between multiple tailer instances
    consumer_name = 'data_pipeline_tailer-{}'.format(
        str(UUID(bytes=FastUUID().uuid4()).hex)
    )

    with Consumer(
        consumer_name,
        # NOTE(review): meaning of 'bam' is not visible here (looks like
        # an owning-team identifier) -- confirm against Consumer's signature.
        'bam',
        ExpectedFrequency.constantly,
        self.topic_to_offsets_map,
        auto_offset_reset=self.options.offset_reset_location,
        cluster_name=self.options.cluster_name
    ) as consumer:
        message_count = 0
        while self.keep_running(message_count):
            # Short timeout so the keep_running condition is re-evaluated
            # regularly even when no messages arrive.
            message = consumer.get_message(blocking=True, timeout=0.1)
            if message is None:
                continue

            end_timestamp = self.options.end_timestamp
            if end_timestamp is None or message.timestamp < end_timestamp:
                # Parenthesised single-argument form: identical output
                # under the Python 2 print statement, and valid syntax
                # under the Python 3 print function.
                print(self._format_message(message))
                message_count += 1
                continue

            # This message is at or beyond the requested end timestamp:
            # flag the tailer as no longer running and do not print it.
            self._running = False
            logger.info(
                "Latest message surpasses --end-timestamp. Stopping tailer..."
            )
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号