def run(self):
    """Consume messages from ``self.queue_name`` forever.

    Opens a blocking connection built from ``self.connection_params``,
    declares the queue as durable, limits prefetch to one message, and
    passes every delivery to ``self.callback(channel, method, properties,
    body)``. Any ``Exception`` raised while connecting or consuming is
    logged through ``self.log``; the worker then waits briefly and starts
    over with a fresh connection. The method never returns normally.
    """
    # Function-scope import: the file's import block is not visible here.
    import time

    # Pause between reconnect attempts so that an unreachable broker does
    # not turn the outer loop into a busy loop that floods the log.
    retry_delay_seconds = 5

    while True:
        try:
            self.log(logging.DEBUG, "Running the RabbitMQ worker: {}".format(os.getpid()))
            with pika.BlockingConnection(pika.ConnectionParameters(**self.connection_params)) as connection:
                channel = connection.channel()
                channel.queue_declare(queue=self.queue_name, durable=True)
                channel.basic_qos(prefetch_count=1)
                for message_object in channel.consume(queue=self.queue_name, inactivity_timeout=10):
                    # On inactivity timeout pika 1.x yields (None, None, None),
                    # while older releases yielded a bare None. Treat both as
                    # "no message" so the callback is never invoked with
                    # empty arguments.
                    if message_object is None or message_object[0] is None:
                        # Idle: let the connection service pending I/O for
                        # up to 5 seconds before polling the queue again.
                        connection.process_data_events(time_limit=5)
                    else:
                        self.callback(channel, *message_object)
        except Exception as exp:
            self.log(logging.ERROR, "Worker have issues while receiving: {}".format(exp))
            time.sleep(retry_delay_seconds)
评论列表
文章目录