python类Queue()的实例源码

test_weibo_harvester.py 文件源码 项目:sfm-weibo-harvester 作者: gwu-libraries 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUp(self):
        self.exchange = Exchange(EXCHANGE, type="topic")
        self.result_queue = Queue(name="result_queue", routing_key="harvest.status.weibo.*", exchange=self.exchange,
                                  durable=True)
        self.web_harvest_queue = Queue(name="web_harvest_queue", routing_key="harvest.start.web",
                                       exchange=self.exchange)
        self.warc_created_queue = Queue(name="warc_created_queue", routing_key="warc_created", exchange=self.exchange)
        weibo_harvester_queue = Queue(name="weibo_harvester", exchange=self.exchange)
        with self._create_connection() as connection:
            self.result_queue(connection).declare()
            self.result_queue(connection).purge()
            self.web_harvest_queue(connection).declare()
            self.web_harvest_queue(connection).purge()
            self.warc_created_queue(connection).declare()
            self.warc_created_queue(connection).purge()
            # avoid raise NOT_FOUND error 404
            weibo_harvester_queue(connection).declare()
            weibo_harvester_queue(connection).purge()

        self.path = None
rabbit.py 文件源码 项目:fuel-ccp-tests 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def check_queue_message(self, message):
        q = kombu.Queue(message['queue'], channel=self.ch)
        msg = q.get(True)
        assert msg.body in message['id'],\
            "Message body is {}, expected {}".format(msg.body, message['id'])
client.py 文件源码 项目:microservices 作者: viatoriche 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def declare_exchange(self, name, type='direct', queues=None, **options):
        """Create or update exchange

        :param name: name of exchange
        :type name: str
        :param type: type of exchange - direct, fanout, topic, match
        :type type: str
        :param queues: list of queues with routing keys: [[queue_name, routing_key], [queue_name, routing_key], ...]
        :type queues: list, None or tuple
        :param options: additional options for Exchange creation
        """
        if queues is None:
            queues = []  # pragma: no cover

        with self.connections[self.connection].acquire() as conn:
            exchange = Exchange(name, type=type, channel=conn, **options)
            exchange.declare()
            self.exchanges[name] = exchange
            for q_name, routing_key in queues:
                queue = Queue(name=q_name, channel=conn)
                queue.declare()
                queue.bind_to(exchange=name, routing_key=routing_key)
                self.logger.debug('Queue "%s" with routing_key "%s" was bond to exchange "%s"', q_name,
                                  routing_key if routing_key else q_name, name)
service.py 文件源码 项目:microservices 作者: viatoriche 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def add_queue_rule(self, handler, name, autoack=True, prefetch_size=0,
                       prefetch_count=0, **kwargs):
        """Add queue rule to Microservice

        :param prefetch_count: count of messages for getting from mq
        :param prefetch_size: size in bytes for getting data from mq
        :param handler: function for handling messages
        :param autoack: if True message.ack() after callback
        :type handler: callable object
        :param name: name of queue
        :type name: str
        """

        rule = Rule(name, handler, self.logger, autoack=autoack, **kwargs)
        consumer = Consumer(self.connection, queues=[Queue(rule.name)],
                            callbacks=[rule.callback], auto_declare=True)
        consumer.qos(prefetch_count=prefetch_count, prefetch_size=prefetch_size)
        self.consumers.append(consumer)
        self.logger.debug('Rule "%s" added!', rule.name)
bots.py 文件源码 项目:koslab.messengerbot 作者: koslab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def consume(self):
        def worker(event, message):
            page_id = event['recipient']['id']
            bot_class, bot_args = self.get_bot(page_id)
            p = Process(target=spawn_bot_amqp, args=(bot_class, bot_args, 
                        self.transport, self.send_exchange, 
                        self.send_queue, event, message))
            p.start()
            def stop_worker(signum, frame):
                p.terminate()
                p.join()
            signal.signal(signal.SIGTERM, stop_worker)

        exchange = Exchange(self.exchange, 'direct', durable=True)
        queue = Queue(self.queue, exchange=exchange, routing_key=self.queue)
        with Connection(self.transport) as conn:
            with conn.Consumer(queue, callbacks=[worker]) as consumer:
                while True:
                    conn.drain_events()
bots.py 文件源码 项目:koslab.messengerbot 作者: koslab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def sender(self):
        def worker(event, message):
            p = Process(target=spawn_send_message_worker, args=(event, message))
            p.start()
            def stop_worker(signum, frame):
                p.terminate()
                p.join()
            signal.signal(signal.SIGTERM, stop_worker)


        exchange = Exchange(self.send_exchange, 'direct', durable=True)
        queue = Queue(self.send_queue, exchange=exchange, 
                        routing_key=self.send_queue)
        with Connection(self.send_transport) as conn:
            with conn.Consumer(queue, callbacks=[worker]) as consumer:
                while True:
                    conn.drain_events()
notify.py 文件源码 项目:commissaire 作者: projectatomic 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def connect(self, exchange, channel):  # pragma: no cover
        """
        Readies the StorageNotify for publishing notification messages by
        setting up a kombu.Producer.

        :param exchange: The exchange for publishing notifications.
        :type exchange: kombu.Exchange
        :param channel: The channel to bind to.
        :type channel: kombu.transport.base.StdChannel
        """
        name = self.__class__.__name__
        self.logger.debug('Connecting {}'.format(name))

        self._queue = kombu.Queue(exchange=exchange, channel=channel)
        self._queue.declare()

        self._producer = kombu.Producer(channel, exchange)
retry_adapter.py 文件源码 项目:almanach 作者: internap 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _configure_dead_exchange(self, connection):
        def declare_dead_queue():
            channel = connection.channel()
            dead_exchange = Exchange(name=config.rabbitmq_dead_exchange(),
                                     type='direct',
                                     channel=channel)
            dead_queue = Queue(name=config.rabbitmq_dead_queue(),
                               routing_key=config.rabbitmq_routing_key(),
                               exchange=dead_exchange,
                               channel=channel)

            dead_queue.declare()

            return dead_exchange

        def error_callback(exception, interval):
            logging.error('Failed to declare dead queue and exchange, retrying in %d seconds. %r' %
                          (interval, exception))

        declare_dead_queue = connection.ensure(connection, declare_dead_queue, errback=error_callback,
                                               interval_start=0, interval_step=5, interval_max=30)
        return declare_dead_queue()
openstack-dns-updater.py 文件源码 项目:fuel-plugin-dns-updater 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_consumers(self, consumer, channel):
        consumers = []
        exchanges = DNS_CONF["exchanges"]
        exchanges = exchanges.split(",")
        for exch in exchanges:
            exchange = Exchange(exch, type="topic", durable=False)
            queue = Queue(DNS_CONF["queue_name"], exchange,
                routing_key=DNS_CONF["routing_key"],
                durable=False, auto_delete=True, no_ack=True)
            consumers.append(consumer(queue, callbacks=[self.on_message]))
        return consumers
gateway.py 文件源码 项目:Brightside 作者: BrighterCommand 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, connection: Connection, configuration: BrightsideConsumerConfiguration, logger: logging.Logger=None) -> None:
        self._exchange = Exchange(connection.exchange, type=connection.exchange_type, durable=connection.is_durable)
        self._routing_key = configuration.routing_key
        self._amqp_uri = connection.amqp_uri
        self._queue_name = configuration.queue_name
        self._routing_key = configuration.routing_key
        self._prefetch_count = configuration.prefetch_count
        self._is_durable = configuration.is_durable
        self._message_factory = ArameMessageFactory()
        self._logger = logger or logging.getLogger(__name__)
        self._queue = Queue(self._queue_name, exchange=self._exchange, routing_key=self._routing_key)
        self._msg = None  # Kombu Message
        self._message = None  # Brightside Message

        # TODO: Need to fix the argument types with default types issue
carbonblack.py 文件源码 项目:pyefflux 作者: effluxsystems 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _create_queues(self, queue_list):
        for topic in queue_list:
            self.queues.append(
                Queue(
                    name=self.queue_name,
                    exchange=self.exchange,
                    routing_key=topic,
                    durable=False,
                    exclusive=True,
                    no_ack=True
                )
            )
rabbit.py 文件源码 项目:fuel-ccp-tests 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def create_queue(self):
        test_queue = 'test-rabbit-{}'.format(utils.rand_name())
        q = kombu.Queue(test_queue, channel=self.ch, durable=False,
                        queue_arguments={"x-expires": 15 * 60 * 1000})
        q.declare()
        return test_queue
rabbit.py 文件源码 项目:fuel-ccp-tests 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def delete_queue(self, queue):
        q = kombu.Queue(queue, channel=self.ch)
        q.delete()
client.py 文件源码 项目:microservices 作者: viatoriche 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, client, name, logger=None):
        """Initialization

        :param client: instance of client
        :type client: Client
        :param name: name of queue
        :type name: str
        """
        self.client = client
        self.name = name
        if logger is None:
            logger = _logger  # pragma: no cover
        self.logger = logger
        self.logger.debug('Queue "%s" built', self.name)
client.py 文件源码 项目:microservices 作者: viatoriche 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def purge_queue(self, name):
        """Remove all messages from queue

        :param name: name of queue
        :type name: str
        """
        connections = pools.Connections(self.limit)
        with connections[self.connection].acquire() as conn:
            Queue(name=name, channel=conn).purge()
            self.logger.debug('Queue "%s" was purged', name)
client.py 文件源码 项目:microservices 作者: viatoriche 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def delete_queue(self, name):
        """Delete queue by name

        :param name: name of queue
        :type name: str
        """
        with self.connections[self.connection].acquire() as conn:
            Queue(name=name, channel=conn).delete()
            self.logger.debug('Queue "%s" was deleted', name)
client.py 文件源码 项目:isc 作者: and3rson 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _create_callback_queue(self, channel, exchange):
        name = 'response-{}'.format(uuid.uuid4())
        callback_queue = kombu.Queue(
            name=name,
            exchange=exchange,
            routing_key=name,
            exclusive=True,
            channel=self._channel
        )
        callback_queue.declare()
        return callback_queue
client.py 文件源码 项目:isc 作者: and3rson 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, consumer):
        self.consumer = proxy(consumer)
        super(PublisherThread, self).__init__()
        self.daemon = True
        self._out_queue = Queue()
        self._is_running = False
server.py 文件源码 项目:isc 作者: and3rson 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _create_service_queues(self, services, Consumer, channel):
        """
        Creates necessary AMQP queues, one per service.
        """
        log.debug('Declaring exchange %s', self.exchange)
        exchange = kombu.Exchange(
            self.exchange,
            channel=channel,
            durable=False
        )
        exchange.declare()
        # channel.exchange_declare(exchange=self.exchange)
        queues = []
        for service in services.values():
            queue_name = '{}_service_{}'.format(self.exchange, service.name)
            log.debug('Declaring service queue %s', queue_name)
            queue = kombu.Queue(
                channel=channel,
                name=queue_name,
                exchange=exchange,
                routing_key=queue_name,
                exclusive=False,
                durable=False,
                # channel=channel
            )
            queue.declare()
            queues.append(queue)
            # channel.queue_delete(queue=queue)
            # channel.queue_declare(queue=queue, auto_delete=True)
            # channel.queue_bind(queue, self.exchange)
            # channel.basic_consume(self._on_message, queue=queue, no_ack=False)
        consumer = Consumer(
            # self.connection,
            queues=queues,
            on_message=self._on_message,
            no_ack=False
        )
        # consumer.consume(no_ack=False)
        return consumer
kombu_manager.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _queue(self):
        queue_name = 'flask-socketio.' + str(uuid.uuid4())
        return kombu.Queue(queue_name, self._exchange(),
                           queue_arguments={'x-expires': 300000})
keypair.py 文件源码 项目:omni 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_consumers(self, consumer, channel):
        exchange = Exchange(self.nova_exchange, type="topic", durable=False)
        queue = Queue(self.queue_name, exchange, routing_key=self.routing_key,
                      durable=False, auto_delete=True, no_ack=True)
        return [consumer(queue, callbacks=[self.handle_notification])]
bus.py 文件源码 项目:xivo-ctid-ng 作者: wazo-pbx 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, global_config):
        self._events_pubsub = Pubsub()

        self._bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**global_config['bus'])
        self._exchange = Exchange(global_config['bus']['exchange_name'],
                                  type=global_config['bus']['exchange_type'])
        self._queue = kombu.Queue(exclusive=True)
        self._is_running = False
bus.py 文件源码 项目:xivo-ctid-ng 作者: wazo-pbx 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def listen_events(self, routing_key, exchange=BUS_EXCHANGE_XIVO):
        with Connection(self._url) as conn:
            queue = Queue(BUS_QUEUE_NAME, exchange=exchange, routing_key=routing_key, channel=conn.channel())
            queue.declare()
            queue.purge()
            self.bus_queue = queue
KombuClient.py 文件源码 项目:PythonSkillTree 作者: w4n9H 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, hosts_conf, exchange_name='', exchange_type='', exchange_arguments=None,
                 queue_name='', routing_key='', queue_arguments=None, callback=None, no_ack=True):
        self.hosts_conf = hosts_conf
        self.hosts = self.create_hosts()
        self.connection = Connection(self.hosts)
        self.task_exchange = Exchange(name=exchange_name, type=exchange_type, arguments=exchange_arguments)
        self.task_queues = [Queue(name=queue_name, exchange=self.task_exchange, routing_key=routing_key,
                                  queue_arguments=queue_arguments)]
        self.callback = callback
        self.no_ack = no_ack
KombuClient.py 文件源码 项目:PythonSkillTree 作者: w4n9H 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def queue_size(self, queue_list, queue_arguments=None):
        result = dict()
        for i in queue_list:
            queue_size = self.connection.SimpleQueue(name=Queue(name=i, queue_arguments=queue_arguments)).qsize()
            result[i] = queue_size
        return result
bot.py 文件源码 项目:koslab.messengerbot 作者: koslab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def queue_send(self, recipient, message=None, sender_action=None):
        exchange = Exchange(self.send_exchange, 'direct', durable=True)
        queue = Queue(self.send_queue, exchange=exchange, 
                        routing_key=self.send_queue)
        with Connection(self.send_transport) as conn:
            producer = conn.Producer(serializer='json')
            event = {
                'recipient': recipient,
                'message': message,
                'sender_action': sender_action,
                'page_access_token': self.page_access_token
            }
            producer.publish(event, exchange=exchange, 
                                routing_key=queue.routing_key,
                                declare=[queue])
bots.py 文件源码 项目:koslab.messengerbot 作者: koslab 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def queue_events(self, events):
        exchange = Exchange(self.exchange, 'direct', durable=True)
        queue = Queue(self.queue, exchange=exchange, routing_key=self.queue)
        with Connection(self.transport) as conn:
            producer = conn.Producer(serializer='json')
            for event in events:
                producer.publish(event, exchange=exchange, 
                        routing_key=queue.routing_key,
                        declare=[queue])
kombu_manager.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _queue(self, conn=None):
        exchange = kombu.Exchange(self.channel, type='fanout', durable=False)
        queue = kombu.Queue(str(uuid.uuid4()), exchange)
        return queue
client.py 文件源码 项目:commissaire 作者: projectatomic 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_consumers(self, Consumer, channel):
        """
        Returns a list of kombu.Consumer instances to service all registered
        notification callbacks.

        If using the kombu.mixin.ConsumerMixin mixin class, these instances
        should be included in its get_consumers() method.

        :param Consumer: Message consumer class.
        :type Consumer: class
        :param channel: An open channel.
        :type channel: kombu.transport.*.Channel
        :returns: A list of consumer instances
        :rtype: [kombu.Consumer, ....]
        """
        consumer_list = []
        exchange = self.bus_mixin.producer.exchange
        for routing_key, callbacks in self.notify_callbacks.items():
            queue = kombu.Queue(
                exchange=exchange, routing_key=routing_key)
            consumer = Consumer(
                queues=queue, callbacks=callbacks)
            consumer_list.append(consumer)
            self.bus_mixin.logger.info(
                'Listening for "%s" notifications', routing_key)
        return consumer_list
__init__.py 文件源码 项目:commissaire-http 作者: projectatomic 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def connect(self):
        """
        'Connects' to the bus.

        :returns: The same instance.
        :rtype: commissaire_http.bus.Bus
        """
        if self.connection is not None:
            self.logger.warn('Bus already connected.')
            return self

        self.connection = Connection(self.connection_url)
        self._channel = self.connection.channel()
        self._exchange = Exchange(
            self.exchange_name, type='topic').bind(self._channel)
        self._exchange.declare()

        # Create queues
        self._queues = []
        for kwargs in self.qkwargs:
            queue = Queue(**kwargs)
            queue.exchange = self._exchange
            queue = queue.bind(self._channel)
            self._queues.append(queue)
            self.logger.debug('Created queue %s', queue.as_dict())

        # Create producer for publishing on topics
        self.producer = Producer(self._channel, self._exchange)
        self.logger.debug('Bus connection finished')
        return self


问题


面经


文章

微信
公众号

扫码关注公众号