Python和RabbitMQ-侦听来自多个渠道的消费事件的最佳方法?

发布于 2021-01-29 15:18:37

我有两个单独的RabbitMQ实例。我正在尝试找到聆听这两个事件的最佳方法。

例如,我可以通过以下方法使用一个事件:

credentials = pika.PlainCredentials(user, pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(Exclusive=True)
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
self.channel.start_consuming()

我还有第二个主机“
host2”,我也想听听。我曾考虑过创建两个单独的线程来执行此操作,但据我所读,pika并不是线程安全的。有没有更好的办法?还是创建两个单独的线程,每个线程侦听不同的Rabbit实例(host1和host2)是否足够?

关注者
0
被浏览
48
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    “什么是最好的方法”的答案在很大程度上取决于队列的使用模式以及“最好”的含义。由于我无法对问题发表评论,因此我将尝试提出一些可能的解决方案。

    在每个示例中,我将假定已声明交换。

    线程数

    您可以在单个进程中使用来自不同主机上两个队列的消息pika

    您是对的-
    正如它自己的FAQ所述pika它不是线程安全的,但是可以通过为每个线程创建与RabbitMQ主机的连接来以多线程方式使用它。使此示例使用threading模块在线程中运行如下所示:

    import pika
    import threading
    
    
    class ConsumerThread(threading.Thread):
        def __init__(self, host, *args, **kwargs):
            super(ConsumerThread, self).__init__(*args, **kwargs)
    
            self._host = host
    
        # Not necessarily a method.
        def callback_func(self, channel, method, properties, body):
            print("{} received '{}'".format(self.name, body))
    
        def run(self):
            credentials = pika.PlainCredentials("guest", "guest")
    
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=self._host,
                                          credentials=credentials))
    
            channel = connection.channel()
    
            result = channel.queue_declare(exclusive=True)
    
            channel.queue_bind(result.method.queue,
                               exchange="my-exchange",
                               routing_key="*.*.*.*.*")
    
            channel.basic_consume(self.callback_func,
                                  result.method.queue,
                                  no_ack=True)
    
            channel.start_consuming()
    
    
    if __name__ == "__main__":
        threads = [ConsumerThread("host1"), ConsumerThread("host2")]
        for thread in threads:
            thread.start()
    

    我已经声明callback_func为纯粹ConsumerThread.name在打印邮件正文时使用的方法。它也可能是ConsumerThread类之外的函数。

    工艺流程

    另外,您始终可以只对要使用事件的每个队列使用用户代码运行一个进程。

    import pika
    import sys
    
    
    def callback_func(channel, method, properties, body):
        print(body)
    
    
    if __name__ == "__main__":
        credentials = pika.PlainCredentials("guest", "guest")
    
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=sys.argv[1],
                                      credentials=credentials))
    
        channel = connection.channel()
    
        result = channel.queue_declare(exclusive=True)
    
        channel.queue_bind(result.method.queue,
                           exchange="my-exchange",
                           routing_key="*.*.*.*.*")
    
        channel.basic_consume(callback_func, result.method.queue, no_ack=True)
    
        channel.start_consuming()
    

    然后运行:

    $ python single_consume.py host1
    $ python single_consume.py host2  # e.g. on another console
    

    如果您要处理来自队列的消息的工作量很大,并且只要CPU中的核心数>
    =使用者数,通常最好使用这种方法-除非您的队列在大多数情况下是空的,并且消费者不会利用此CPU时间*。

    异步

    另一个选择是涉及一些异步框架(例如Twisted)并在单个线程中运行整个程序。

    您不能再BlockingConnection在异步代码中使用;幸运的是,pika有适配器Twisted

    from pika.adapters.twisted_connection import TwistedProtocolConnection
    from pika.connection import ConnectionParameters
    from twisted.internet import protocol, reactor, task
    from twisted.python import log
    
    
    class Consumer(object):
        def on_connected(self, connection):
            d = connection.channel()
            d.addCallback(self.got_channel)
            d.addCallback(self.queue_declared)
            d.addCallback(self.queue_bound)
            d.addCallback(self.handle_deliveries)
            d.addErrback(log.err)
    
        def got_channel(self, channel):
            self.channel = channel
    
            return self.channel.queue_declare(exclusive=True)
    
        def queue_declared(self, queue):
            self._queue_name = queue.method.queue
    
            self.channel.queue_bind(queue=self._queue_name,
                                    exchange="my-exchange",
                                    routing_key="*.*.*.*.*")
    
        def queue_bound(self, ignored):
            return self.channel.basic_consume(queue=self._queue_name)
    
        def handle_deliveries(self, queue_and_consumer_tag):
            queue, consumer_tag = queue_and_consumer_tag
            self.looping_call = task.LoopingCall(self.consume_from_queue, queue)
    
            return self.looping_call.start(0)
    
        def consume_from_queue(self, queue):
            d = queue.get()
    
            return d.addCallback(lambda result: self.handle_payload(*result))
    
        def handle_payload(self, channel, method, properties, body):
            print(body)
    
    
    if __name__ == "__main__":
        consumer1 = Consumer()
        consumer2 = Consumer()
    
        parameters = ConnectionParameters()
        cc = protocol.ClientCreator(reactor,
                                    TwistedProtocolConnection,
                                    parameters)
        d1 = cc.connectTCP("host1", 5672)
        d1.addCallback(lambda protocol: protocol.ready)
        d1.addCallback(consumer1.on_connected)
        d1.addErrback(log.err)
    
        d2 = cc.connectTCP("host2", 5672)
        d2.addCallback(lambda protocol: protocol.ready)
        d2.addCallback(consumer2.on_connected)
        d2.addErrback(log.err)
    
        reactor.run()
    

    如果您从中使用更多的队列,并且消费者执行的工作占用的CPU越少,该方法将更好。

    Python 3

    既然您已经提到pika过,由于pika尚未移植,我将自己局限于基于Python 2.x的解决方案。

    但是,如果您想转到> =
    3.3,则一种可能的选择是asyncio与AMQP协议(您在RabbitMQ中使用的协议)之一配合使用,例如asynqpaioamqp

    *-请注意,这些技巧很浅-在大多数情况下,选择并不那么明显;对您而言最合适的取决于队列“饱和”(消息/时间),在收到这些消息后您将做什么工作,在什么环境下运行消费者等?除了对所有实现进行基准测试之外,没有其他方法可以确保



知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看