def amqp_consume(self):
    """Register with the Hub Server over RPC, then start the AMQP consumer.

    The method proceeds in four steps:

    1. Declare an exclusive callback queue, bind it to ``self.exchange``
       using the queue's own name as routing key, and register
       ``self.amqp_handle_msg`` as its consumer callback (``no_ack=True``).
    2. Publish a JSON encoded RPC request carrying ``self.worker_kwargs``
       to the routing key ``'<self.key>.worker.<self.worker_type>'``. The
       request is tagged with a fresh ``self.correlation_id`` and names the
       callback queue in ``reply_to``.
    3. Block on the channel until ``self.worker_id`` becomes truthy.
       NOTE(review): presumably ``amqp_handle_msg`` sets ``worker_id`` when
       the matching RPC response arrives - confirm against that handler.
    4. Delegate to the parent class' ``amqp_consume``.

    If anything is raised while waiting for the RPC response (including
    ``BaseException`` subclasses such as ``KeyboardInterrupt``), the error
    is logged and the method returns without starting the parent consumer.
    The exception itself is not propagated to the caller.
    """
    # Define the callback queue; the server picks its name.
    self.queue = self.chan.queue_declare(exclusive=True).queue
    self.chan.queue_bind(self.queue, self.exchange, self.queue)
    self.chan.basic_consume(self.queue, callback=self.amqp_handle_msg,
                            no_ack=True)
    log.debug("%s: Initialized amqp connection, channel, queue.", self.lbl)

    # Send the RPC request. worker_id is reset first so that the wait loop
    # below only ends once a new value has been assigned.
    self.worker_id = None
    self.correlation_id = uuid.uuid4().hex
    routing_key = '%s.worker.%s' % (self.key, self.worker_type)
    msg = amqp.Message(json.dumps(self.worker_kwargs),
                       correlation_id=self.correlation_id,
                       reply_to=self.queue,
                       content_type='application/json')
    self.amqp_send_msg(msg, routing_key)
    log.info("%s: sent RPC request, will wait for response.", self.lbl)

    # Wait for the RPC response.
    try:
        while not self.worker_id:
            log.debug("%s: Waiting for RPC response.", self.lbl)
            self.chan.wait()
    except BaseException as exc:
        log.error("%s: Amqp consumer received %r while waiting for RPC "
                  "response. Stopping.", self.lbl, exc)
        # Actually stop: without a worker_id there is nothing to consume
        # for, and falling through would swallow the interrupt/kill signal
        # and block again inside the parent's consumer.
        return
    log.info("%s: Finished waiting for RPC response.", self.lbl)

    super(HubClient, self).amqp_consume()
评论列表
文章目录