def _send(self, command, payload=None):
# send rpc request
if self.correlation_id:
raise Exception("Can't send second request while already waiting.")
self.response = None
self.correlation_id = uuid.uuid4().hex
routing_key = '%s.%s' % (self.key, command)
msg = amqp.Message(json.dumps(payload),
correlation_id=self.correlation_id,
reply_to=self.queue,
content_type='application/json')
log.debug("Sending AMQP msg with routing key '%s' and body %r.",
routing_key, msg.body)
self.chan.basic_publish(msg, self.exchange, routing_key)
log.info("Sent RPC request, will wait for response.")
# wait for rpc response
try:
while self.correlation_id:
log.debug("Waiting for RPC response.")
self.chan.wait()
except BaseException as exc:
log.error("Amqp consumer received %r while waiting for RPC "
"response. Stopping.", exc)
log.info("Finished waiting for RPC response.")
response = self.response
self.response = None
return response
评论列表
文章目录