def run(self):
    """Main class entrypoint.

    Applies the configured ``consumer_nice_level`` to the current process,
    prefixes the process title with this consumer's id, then calls
    ``run_consumer`` repeatedly until ``self.shutting_down`` is set.

    When ``run_consumer`` raises, the error and traceback are logged, the
    consumer (if any) is committed and closed, every partition writer is
    closed, and the loop retries after 30 seconds.  Once the loop exits,
    the consumer's offsets are committed one last time.
    """
    nice_level = self.config.get("consumer", "consumer_nice_level")
    process = psutil.Process(os.getpid())
    process.nice(int(nice_level))
    setproctitle("[consumer" + self.consumer_id + "] " + getproctitle())

    while not self.shutting_down:
        try:
            self.run_consumer()
        except Exception as exe:
            self.logger.error(
                "Unexpected error with kafka consumer: " + str(exe))
            self.logger.error(traceback.format_exc())
            self.logger.error(
                "Sleeping for 30 seconds before trying again")
            if self.consumer is not None:
                # Attempt commit and close independently: whatever broke
                # run_consumer may make commit fail too, and that must not
                # skip close() or abort the retry loop.
                for cleanup_step in (self.consumer.commit,
                                     self.consumer.close):
                    try:
                        cleanup_step()
                    except Exception as cleanup_exe:
                        self.logger.error(
                            "Error while cleaning up kafka consumer: "
                            + str(cleanup_exe))
            for part in self.partitions:
                self.partitions[part].writer.close()
            time.sleep(30)

    # save all our offsets (there is nothing to commit if no consumer
    # was ever created before shutdown was requested)
    if self.consumer is not None:
        self.consumer.commit()
评论列表
文章目录