def message_worker(self):
    """Publish queued log records to the exchange in an endless loop.

    Each iteration takes one ``(record, routing_key)`` item from
    ``self.queue``, opens the connection when the connection or channel is
    missing or closed, and publishes the formatted record with
    ``delivery_mode=2``.

    Any ``Exception`` raised while unpacking or publishing is reported via
    ``self.handleError`` and the connection/channel references are dropped
    so that the next item triggers a reconnect. ``handleError`` receives
    ``None`` when the queued item could not be unpacked into a record.

    Every item taken from the queue is marked with ``task_done()`` exactly
    once; when ``self.close_after_emit`` is true the connection is closed
    after each item.
    """
    while True:
        # Fetch outside the ``try`` so ``task_done()`` only ever runs for an
        # item that was actually taken from the queue.
        item = self.queue.get()
        # Pre-bind so the error handler never hits an unbound name or blames
        # the record left over from the previous iteration.
        record = None
        try:
            record, routing_key = item
            if (
                not self.connection
                or self.connection.is_closed
                or not self.channel
                or self.channel.is_closed
            ):
                self.open_connection()
            self.channel.basic_publish(
                exchange=self.exchange,
                routing_key=routing_key,
                body=self.format(record),
                properties=pika.BasicProperties(
                    delivery_mode=2
                ),
            )
        except Exception:
            # Drop the references so the next iteration reopens the connection.
            self.channel, self.connection = None, None
            self.handleError(record)
        finally:
            self.queue.task_done()
            if self.close_after_emit:
                self.close_connection()
handlers_oneway.py 文件源码
python
阅读 21
收藏 0
点赞 0
评论 0
评论列表
文章目录