def publish(self, message):
self._message_number_out += 1
amqp_message_update_meta(message, self.get_meta())
amqp_msg = amqp_message_encode(message)
log.debug("Publish message #%s, AMQP message: %s" % (self._message_number_out, amqp_msg))
properties = BasicProperties(
app_id=self.app_id,
content_type='application/json',
content_encoding='utf-8',
delivery_mode=2, # persistent
)
try:
yield self._channel.basic_publish(
self.exchange_name,
self.queue_out_routing_key,
amqp_msg,
properties=properties,
)
except ChannelClosed:
self.retry_channel()
self._cached_messages.append(message)
except AMQPError:
self.retry_connect()
self._cached_messages.append(message)
评论列表
文章目录