Example usages of the Python class Exchange() (kombu), collected from open-source projects.

nova_noticatoin.py 文件源码 项目:Charlie 作者: nxintech 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_consumers(self, consumer, channel):
        """Build the consumer list for this worker.

        Binds a durable topic exchange and a durable queue (names come from
        module-level configuration) and routes every delivery to
        ``self.on_message``.
        """
        topic_exchange = Exchange(exchange_name, type="topic", durable=True)
        bound_queue = Queue(
            queue_name,
            topic_exchange,
            routing_key=routing_key,
            durable=True,
        )
        return [consumer(bound_queue, callbacks=[self.on_message])]
server.py 文件源码 项目:Sentry 作者: NetEaseGame 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_partitioned_queues(name, partitions=1):
    """Register partitioned Celery queues for *name*.

    Creates a direct exchange named *name* and appends one queue per
    partition -- named ``'<name>-0'``, ``'<name>-1'``, ... -- to the
    module-level ``CELERY_QUEUES`` list.

    :param name: base name for the exchange and its queues.
    :param partitions: number of partitions to create.  Defaults to 1,
        preserving the previous hard-coded behavior.
    """
    exchange = Exchange(name, type='direct')
    for num in range(partitions):
        CELERY_QUEUES.append(Queue(
            '{0}-{1}'.format(name, num),
            exchange=exchange,
        ))
__init__.py 文件源码 项目:idealoom 作者: conversence 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_celery_queues():
    """Lazily build and memoize the kombu Queue list for the Assembl apps.

    Each app name in ``ASSEMBL_CELERY_APPS`` gets a queue bound to a
    same-named exchange using the app name as routing key.  The result is
    cached in the module-level ``_celery_queues`` so the queues are only
    constructed once.
    """
    global _celery_queues
    if _celery_queues:
        return _celery_queues
    queues = []
    for app_name in ASSEMBL_CELERY_APPS:
        queues.append(Queue(app_name, Exchange(app_name), routing_key=app_name))
    _celery_queues = queues
    return _celery_queues
server.py 文件源码 项目:isc 作者: and3rson 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _create_fanout_exchange(self, Consumer, channel):
        """
        Create a fanout exchange for accepting broadcast notifications.

        Declares a non-durable fanout exchange named
        ``'<self.exchange>_fanout'`` plus an exclusive, uniquely named
        callback queue bound to it, and returns a Consumer that feeds
        broadcast messages to ``self._on_broadcast``.

        :param Consumer: kombu Consumer class (or factory) to instantiate.
        :param channel: open kombu channel used to declare the entities.
        :returns: the created (not yet started) consumer instance.
        """
        # NOTE(review): removed a large block of commented-out pika-era
        # code (exchange_declare/queue_bind/basic_consume) that duplicated
        # the kombu logic below.
        exchange_name = '{}_fanout'.format(self.exchange)
        log.debug('Declaring fanout exchange %s', exchange_name)
        exchange = kombu.Exchange(
            name=exchange_name,
            channel=channel,
            durable=False,
            type='fanout'
        )
        exchange.declare()

        # Exclusive + non-durable: the queue disappears with this
        # connection, and the uuid4 suffix keeps concurrent instances
        # from colliding on a queue name.
        queue_name = 'fanout_callback_{}'.format(uuid.uuid4())
        log.debug('Declaring fanout queue %s', queue_name)
        queue = kombu.Queue(
            name=queue_name,
            exchange=exchange,
            exclusive=True,
            durable=False,
            channel=channel
        )
        queue.declare()

        # no_ack: broadcast notifications are fire-and-forget, so the
        # broker does not wait for acknowledgements.
        consumer = Consumer(
            queues=[queue],
            on_message=self._on_broadcast,
            no_ack=True
        )
        return consumer
__init__.py 文件源码 项目:commissaire-service 作者: projectatomic 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(
            self, exchange_name, connection_url, qkwargs, config_file=None):
        """
        Initializes a new Service instance.

        :param exchange_name: Name of the topic exchange.
        :type exchange_name: str
        :param connection_url: Kombu connection url.
        :type connection_url: str
        :param qkwargs: One or more dicts keyword arguments for queue creation
        :type qkwargs: list
        :param config_file: Path to the configuration file location.
        :type config_file: str or None
        """
        name = self.__class__.__name__
        self.logger = logging.getLogger(name)
        self.logger.debug('Initializing {}'.format(name))

        # If we are given no default, use the global one
        # Read the configuration file
        self._config_data = read_config_file(
            config_file, self._default_config_file)

        if connection_url is None and 'bus_uri' in self._config_data:
            connection_url = self._config_data.get('bus_uri')
            self.logger.debug(
                'Using connection_url=%s from config file', connection_url)
        if exchange_name is None and 'bus_exchange' in self._config_data:
            # BUGFIX: the guard previously tested the key 'exchange_name'
            # while the value was read from 'bus_exchange', so the fallback
            # could never fire; the debug line also logged exchange_name
            # before it was assigned (always None).  Test and read the same
            # key and assign before logging, mirroring the connection_url
            # branch above.
            exchange_name = self._config_data.get('bus_exchange')
            self.logger.debug(
                'Using exchange_name=%s from config file', exchange_name)

        self.connection = Connection(connection_url)
        self._channel = self.connection.default_channel
        # Bind the topic exchange to our channel and ensure it exists on
        # the broker.
        self._exchange = Exchange(
            exchange_name, type='topic').bind(self._channel)
        self._exchange.declare()

        # Set up queues: each queue is attached to the topic exchange and
        # bound to the default channel.
        self._queues = []
        for kwargs in qkwargs:
            queue = Queue(**kwargs)
            queue.exchange = self._exchange
            queue = queue.bind(self._channel)
            self._queues.append(queue)
            self.logger.debug(queue.as_dict())

        # Create producer for publishing on topics
        self.producer = Producer(self._channel, self._exchange)
        self.logger.debug('Initializing of {} finished'.format(name))
conftest.py 文件源码 项目:invenio-stats 作者: inveniosoftware 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def base_app():
    """Flask application fixture without InvenioStats."""
    from invenio_stats.config import STATS_EVENTS
    instance_path = tempfile.mkdtemp()
    app_ = Flask('testapp', instance_path=instance_path)

    # One real event config plus five synthetic placeholder events.
    stats_events = {'file-download': deepcopy(STATS_EVENTS['file-download'])}
    for idx in range(5):
        stats_events['event_{}'.format(idx)] = {}

    app_.config.update({
        'CELERY_ALWAYS_EAGER': True,
        'CELERY_TASK_ALWAYS_EAGER': True,
        'CELERY_CACHE_BACKEND': 'memory',
        'CELERY_EAGER_PROPAGATES_EXCEPTIONS': True,
        'CELERY_TASK_EAGER_PROPAGATES': True,
        'CELERY_RESULT_BACKEND': 'cache',
        'SQLALCHEMY_DATABASE_URI': os.environ.get(
            'SQLALCHEMY_DATABASE_URI', 'sqlite://'),
        'SQLALCHEMY_TRACK_MODIFICATIONS': True,
        'TESTING': True,
        'OAUTH2SERVER_CLIENT_ID_SALT_LEN': 64,
        'OAUTH2SERVER_CLIENT_SECRET_SALT_LEN': 60,
        'OAUTH2SERVER_TOKEN_PERSONAL_SALT_LEN': 60,
        'STATS_MQ_EXCHANGE': Exchange(
            'test_events',
            type='direct',
            delivery_mode='transient',  # in-memory queue
            durable=True,
        ),
        'SECRET_KEY': 'asecretkey',
        'SERVER_NAME': 'localhost',
        'STATS_QUERIES': {'bucket-file-download-histogram': {},
                          'bucket-file-download-total': {},
                          'test-query': {},
                          'test-query2': {}},
        'STATS_EVENTS': stats_events,
        'STATS_AGGREGATIONS': {'file-download-agg': {}},
    })

    # Initialize every required extension on the app.
    for extension in (FlaskCeleryExt, InvenioAccounts, InvenioAccountsREST,
                      InvenioDB, InvenioRecords, InvenioFilesREST,
                      InvenioPIDStore, InvenioQueues, InvenioOAuth2Server,
                      InvenioOAuth2ServerREST):
        extension(app_)
    InvenioSearch(app_, entry_point_group=None)

    with app_.app_context():
        yield app_
    shutil.rmtree(instance_path)
orchestrator.py 文件源码 项目:EasyJobLite 作者: treebohotels 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setup_entities(self):
        """
        declare all required entities
        no advanced error handling yet (like error on declaration with altered properties etc)
        """
        # Idempotent: bail out if a previous call already finished.
        if self._service_inited:
            return

        mq_name = self._config.get_mq_config

        # setup exchange: durable topic exchange all queues hang off
        self._booking_exchange = Exchange(mq_name(constants.EXCHANGE),
                                          type='topic',
                                          durable=True)

        def durable_queue(key, routing_key):
            # Local helper: durable queue bound to the booking exchange.
            return Queue(mq_name(key),
                         exchange=self._booking_exchange,
                         routing_key=routing_key,
                         durable=True)

        # setup durable queues
        self.work_queue = durable_queue(constants.WORK_QUEUE,
                                        constants.WORK_QUEUE + ".#")
        self.retry_queue = durable_queue(constants.RETRY_QUEUE,
                                         constants.RETRY_QUEUE + ".#")
        self.dlq_queue = durable_queue(constants.DEAD_LETTER_QUEUE,
                                       constants.DEAD_LETTER_QUEUE + ".#")

        # a buffer queue is needed by error-queue-consumer to temp-buffer msgs for processing
        # this is to handle retry loop which may cause between retry-queue and work-queue.
        # todo: Need to implement an alternative as this has a copy overhead
        #  which can be significant when the error queue is large
        self.buffer_queue = durable_queue(constants.BUFFER_QUEUE, 'buffer.#')

        # todo: do we need to make confirm_publish configurable?
        self._conn = Connection(self.get_config().rabbitmq_url,
                                transport_options={'confirm_publish': True})

        # declare all the exchanges and queues needed (declare, not overwrite existing)
        entities = (self._booking_exchange, self.work_queue, self.retry_queue,
                    self.dlq_queue, self.buffer_queue)
        for entity in entities:
            entity.maybe_bind(self._conn)
            entity.declare()

        # setup producer to push to error and dlqs
        self._producer = Producer(channel=self._conn.channel(),
                                  exchange=self._booking_exchange)

        self._service_inited = True


问题


面经


文章

微信
公众号

扫码关注公众号