BaseConsumer.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:kafkatos3 作者: ConnectedHomes 项目源码 文件源码
def run(self):
        '''Main class entrypoint'''
        nice_level = self.config.get("consumer", "consumer_nice_level")
        process = psutil.Process(os.getpid())
        process.nice(int(nice_level))

        setproctitle("[consumer" + self.consumer_id + "] " + getproctitle())

        while not self.shutting_down:
            try:
                self.run_consumer()

            except Exception as exe:
                self.logger.error(
                    "Unexpected error with kafka consumer: " + str(exe))
                self.logger.error(traceback.format_exc())
                self.logger.error(
                    "Sleeping for 30 seconds before trying again")
                if self.consumer != None:
                    self.consumer.commit()
                    self.consumer.close()

                for part in self.partitions:
                    self.partitions[part].writer.close()

                time.sleep(30)
        # save all our offsets
        self.consumer.commit()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号