def send(self, message: BrightsideMessage):
# we want to expose our logger to the functions defined in inner scope, so put it in their outer scope
logger = self._logger
def _build_message_header(msg: BrightsideMessage) -> Dict:
return KombuMessageFactory(msg).create_message_header()
def _publish(sender: Producer) -> None:
logger.debug("Send message {body} to broker {amqpuri} with routing key {routing_key}"
.format(body=message, amqpuri=self._amqp_uri, routing_key=message.header.topic))
sender.publish(message.body.bytes,
headers=_build_message_header(message),
exchange=self._exchange,
content_type="text/plain",
routing_key=message.header.topic,
declare=[self._exchange])
def _error_callback(e, interval) -> None:
logger.debug('Publishing error: {e}. Will retry in {interval} seconds', e, interval)
self._logger.debug("Connect to broker {amqpuri}".format(amqpuri=self._amqp_uri))
with connections[self._cnx].acquire(block=True) as conn:
with Producer(conn) as producer:
ensure_kwargs = self.RETRY_OPTIONS.copy()
ensure_kwargs['errback'] = _error_callback
safe_publish = conn.ensure(producer, _publish, **ensure_kwargs)
safe_publish(producer)
评论列表
文章目录