def consumer(self):
self.logger.info(f"{self.name}: consumer coroutine started!")
while True:
source, events, last_id = await self.queue.get()
self.logger.info(f"{self.name}: got group of events from queue")
try:
with self.sql.transaction() as trans:
if events is not None:
await self.write(trans, source, events)
await self.update(trans, source, last_id)
except SQLAlchemyError:
self.logger.error(f"{self.name}: error during event write", exc_info=1)
self.queue.task_done()
评论列表
文章目录