gateway.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:Brightside 作者: BrighterCommand 项目源码 文件源码
def send(self, message: BrightsideMessage):
        # we want to expose our logger to the functions defined in inner scope, so put it in their outer scope

        logger = self._logger

        def _build_message_header(msg: BrightsideMessage) -> Dict:
            return KombuMessageFactory(msg).create_message_header()

        def _publish(sender: Producer) -> None:
            logger.debug("Send message {body} to broker {amqpuri} with routing key {routing_key}"
                         .format(body=message, amqpuri=self._amqp_uri, routing_key=message.header.topic))
            sender.publish(message.body.bytes,
                           headers=_build_message_header(message),
                           exchange=self._exchange,
                           content_type="text/plain",
                           routing_key=message.header.topic,
                           declare=[self._exchange])

        def _error_callback(e, interval) -> None:
            logger.debug('Publishing error: {e}. Will retry in {interval} seconds', e, interval)

        self._logger.debug("Connect to broker {amqpuri}".format(amqpuri=self._amqp_uri))

        with connections[self._cnx].acquire(block=True) as conn:
            with Producer(conn) as producer:
                ensure_kwargs = self.RETRY_OPTIONS.copy()
                ensure_kwargs['errback'] = _error_callback
                safe_publish = conn.ensure(producer, _publish, **ensure_kwargs)
                safe_publish(producer)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号