python类Connection()的实例源码

test_retry.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def publisher(nameko_config, **kwargs):
    """Build a helper that publishes AMQP messages.

    The broker URI is looked up in ``nameko_config`` under
    ``AMQP_URI_CONFIG_KEY`` every time the returned helper is called, and
    ``kwargs`` are forwarded to each ``producer.publish`` call.
    """
    def publish(payload, routing_key, exchange=None):
        """Send ``payload`` with ``routing_key``, optionally via ``exchange``."""
        amqp_conn = Connection(nameko_config[AMQP_URI_CONFIG_KEY])

        with connections[amqp_conn].acquire(block=True) as live_connection:
            # Bind the exchange (when one is given) to the acquired
            # connection before publishing through it.
            if exchange is not None:  # pragma: no cover
                exchange.maybe_bind(live_connection)
            with producers[amqp_conn].acquire(block=True) as amqp_producer:
                amqp_producer.publish(
                    payload,
                    routing_key=routing_key,
                    exchange=exchange,
                    **kwargs
                )

    return publish
conftest.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 343 收藏 0 点赞 0 评论 0
def publish_message(rabbit_config):
    """Return a helper that publishes a message to a given exchange.

    The broker URI is read from ``rabbit_config`` under
    ``AMQP_URI_CONFIG_KEY`` on every call of the returned helper.
    """
    def publish(
        exchange, payload, routing_key=None, serializer="json", **kwargs
    ):
        """Bind ``exchange`` to a pooled connection and publish ``payload``."""
        amqp_conn = Connection(rabbit_config[AMQP_URI_CONFIG_KEY])

        with connections[amqp_conn].acquire(block=True) as live_connection:
            exchange.maybe_bind(live_connection)
            with producers[amqp_conn].acquire(block=True) as amqp_producer:
                amqp_producer.publish(
                    payload,
                    exchange=exchange,
                    routing_key=routing_key,
                    serializer=serializer,
                    **kwargs
                )

    return publish
test_retry.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def deadlettering_exchange(self, rabbit_config, exchange, queue):
        """Declare a "deadletter" topic exchange and queue, return the exchange.

        The queue is bound with routing key ``#`` and carries an
        ``x-dead-letter-exchange`` argument naming ``exchange``. Both
        entities are declared over a pooled connection to the broker
        configured in ``rabbit_config``.
        """
        amqp_conn = Connection(rabbit_config[AMQP_URI_CONFIG_KEY])

        with connections[amqp_conn].acquire(block=True) as live_connection:
            deadletter_exchange = Exchange(name="deadletter", type="topic")
            deadletter_exchange.maybe_bind(live_connection)
            deadletter_exchange.declare()

            holding_queue = Queue(
                name="deadletter",
                exchange=deadletter_exchange,
                routing_key="#",
                queue_arguments={'x-dead-letter-exchange': exchange.name},
            )
            holding_queue.maybe_bind(live_connection)
            holding_queue.declare()

        return deadletter_exchange
client.py 文件源码 项目:microservices 作者: viatoriche 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get_connection(self, connection):
        """Create connection strategy

        :param connection: connection for broker
        :type connection: str, None, kombu.connections.Connection, dict
        :return: instance of kombu.connections.Connection
        :rtype: Connection
        """

        if not connection:
            connection = self.default_connection  # pragma: no cover

        if isinstance(connection, str):
            connection = {'hostname': connection}

        if isinstance(connection, dict):
            connection = Connection(**connection)

        return connection
service.py 文件源码 项目:microservices 作者: viatoriche 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _get_connection(self, connection):
        """Create connection strategy

        :param connection: connection for broker
        :type connection: str, None, kombu.connections.Connection, dict
        :return: instance of kombu.connections.Connection
        :rtype: Connection
        """
        if not connection:
            connection = self.connection  # pragma: no cover

        if isinstance(connection, str):
            connection = {'hostname': connection}

        if isinstance(connection, dict):
            connection = Connection(**connection)

        return connection
client.py 文件源码 项目:isc 作者: and3rson 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _connect(self):
        """Create the connection, channel, exchange and callback queue.

        Stores each object on the instance (``_conn``, ``_channel``,
        ``_exchange``, ``_callback_queue``) in that order, since every step
        uses the result of the previous one.
        """
        self._conn = kombu.Connection(
            self._hostname, connect_timeout=self._connect_timeout)

        self._channel = self._conn.channel()

        # Non-durable exchange bound directly to the channel opened above.
        self._exchange = kombu.Exchange(
            name=self._exchange_name, channel=self._channel, durable=False)

        self._callback_queue = self._create_callback_queue(
            self._channel, self._exchange)
kombu_queue.py 文件源码 项目:Url 作者: beiruan 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, name, url="amqp://", maxsize=0, lazy_limit=True):
        """Create a KombuQueue backed by a kombu ``SimpleQueue``.

        name:       name of the queue opened on the connection.
        url:        broker URL, see
                    http://kombu.readthedocs.org/en/latest/userguide/connections.html#urls
        maxsize:    an integer that sets the upper bound on the number of
                    items that can be placed in the queue.
        lazy_limit: when true and ``maxsize`` is non-zero,
                    ``qsize_diff_limit`` is set to 10% of ``maxsize``;
                    otherwise it is 0.
        """
        self.name = name
        self.conn = Connection(url)
        self.queue = self.conn.SimpleQueue(
            self.name, no_ack=True, serializer='umsgpack')

        self.maxsize = maxsize
        self.lazy_limit = lazy_limit
        use_lazy_limit = self.lazy_limit and self.maxsize
        self.qsize_diff_limit = int(self.maxsize * 0.1) if use_lazy_limit else 0
        self.qsize_diff = 0
__init__.py 文件源码 项目:commissaire-service 作者: projectatomic 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def on_consume_ready(
            self, connection, channel, consumers):  # pragma: no cover
        """
        Called when the service is ready to consume messages.

        Logs an INFO line, and when DEBUG logging is enabled also logs the
        connection URI, the channel and the names of all consumed queues.

        :param connection: The current connection instance.
        :type connection: kombu.Connection
        :param channel: The current channel.
        :type channel: kombu.transport.*.Channel
        :param consumers: A list of consumers.
        :type consumers: list
        """
        self.logger.info('Ready to consume')
        # isEnabledFor() honours the effective (inherited) level and levels
        # below DEBUG; comparing ``logger.level`` to DEBUG misses both cases.
        if self.logger.isEnabledFor(logging.DEBUG):
            queue_names = [
                queue.name
                for consumer in consumers
                for queue in consumer.queues
            ]
            self.logger.debug(
                'Consuming via connection "{}" and channel "{}" on '
                'the following queues: "{}"'.format(
                    connection.as_uri(), channel, '", "'.join(queue_names)))
bots.py 文件源码 项目:koslab.messengerbot 作者: koslab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def consume(self):
        """Consume events forever, spawning one bot process per delivery.

        Declares a durable direct exchange and a queue routed by the queue
        name, then drains events from ``self.transport`` in an endless loop.
        """
        def worker(event, message):
            recipient_page = event['recipient']['id']
            bot_class, bot_args = self.get_bot(recipient_page)
            spawn_args = (bot_class, bot_args,
                          self.transport, self.send_exchange,
                          self.send_queue, event, message)
            child = Process(target=spawn_bot_amqp, args=spawn_args)
            child.start()

            def stop_worker(signum, frame):
                child.terminate()
                child.join()

            # Registered again on every delivery, so the SIGTERM handler
            # always refers to the most recently started child process.
            signal.signal(signal.SIGTERM, stop_worker)

        events_exchange = Exchange(self.exchange, 'direct', durable=True)
        events_queue = Queue(
            self.queue, exchange=events_exchange, routing_key=self.queue)
        with Connection(self.transport) as conn, \
                conn.Consumer(events_queue, callbacks=[worker]):
            while True:
                conn.drain_events()
bots.py 文件源码 项目:koslab.messengerbot 作者: koslab 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def sender(self):
        """Consume the send queue forever, one sender process per delivery.

        Declares a durable direct exchange and a queue routed by the queue
        name, then drains events from ``self.send_transport`` in an endless
        loop.
        """
        def worker(event, message):
            child = Process(
                target=spawn_send_message_worker, args=(event, message))
            child.start()

            def stop_worker(signum, frame):
                child.terminate()
                child.join()

            # Registered again on every delivery, so the SIGTERM handler
            # always refers to the most recently started child process.
            signal.signal(signal.SIGTERM, stop_worker)

        outgoing_exchange = Exchange(
            self.send_exchange, 'direct', durable=True)
        outgoing_queue = Queue(
            self.send_queue,
            exchange=outgoing_exchange,
            routing_key=self.send_queue,
        )
        with Connection(self.send_transport) as conn, \
                conn.Consumer(outgoing_queue, callbacks=[worker]):
            while True:
                conn.drain_events()
conftest.py 文件源码 项目:nameko-multi-region-example 作者: kooba 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def publish(config):
    """Return a helper that publishes JSON-serialised AMQP messages.

    A single ``Connection`` object is created up front from ``config`` and
    used as the key into the connection and producer pools on every call.
    """
    amqp_conn = Connection(config[AMQP_URI_CONFIG_KEY])

    def publish(payload, routing_key, exchange=None, **kwargs):
        """Publish an AMQP message."""
        with kombu_connections[amqp_conn].acquire(block=True) as live_conn:
            # Only bind when the caller supplied an exchange.
            if exchange is not None:
                exchange.maybe_bind(live_conn)
            with producers[amqp_conn].acquire(block=True) as amqp_producer:
                amqp_producer.publish(
                    payload,
                    exchange=exchange,
                    serializer='json',
                    routing_key=routing_key,
                    **kwargs
                )

    return publish
pulse.py 文件源码 项目:jx-sqlite 作者: mozilla 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def connect(self):
        """Create ``self.connection`` from ``self.settings`` if it is unset.

        Does nothing when ``self.connection`` is already truthy.
        """
        if self.connection:
            return

        self.connection = Connection(
            hostname=self.settings.host,
            port=self.settings.port,
            userid=self.settings.user,
            password=self.settings.password,
            virtual_host=self.settings.vhost,
            ssl=self.settings.ssl,
        )
backoff.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 52 收藏 0 点赞 0 评论 0
def republish(self, backoff_exc, message, target_queue):
        """Republish ``message`` to the backoff queue for its next retry.

        The expiration is obtained from ``backoff_exc.next`` and used both
        to build the backoff queue and, converted to seconds, as the
        expiration of the republished message. The original message
        properties are forwarded, with the ``backoff`` header added.

        :param backoff_exc: object whose ``next(message, exchange_name)``
            returns the expiration (divided by 1000 to obtain seconds)
        :param message: the message being retried; its ``properties`` and
            ``body`` are read
        :param target_queue: routing key used for the republished message
        """
        expiration = backoff_exc.next(message, self.exchange.name)
        queue = self.make_queue(expiration)

        # republish to appropriate backoff queue
        amqp_uri = self.container.config[AMQP_URI_CONFIG_KEY]
        with get_producer(amqp_uri) as producer:

            properties = message.properties.copy()
            headers = properties.pop('application_headers')

            headers['backoff'] = expiration
            expiration_seconds = float(expiration) / 1000

            # force redeclaration; the publisher will skip declaration if
            # the entity has previously been declared by the same connection.
            # The dedicated connection is needed only for this declaration,
            # so it is released as soon as the declaration is done instead
            # of being left open on every republish.
            with Connection(amqp_uri) as conn:
                maybe_declare(queue, conn, retry=True, **DEFAULT_RETRY_POLICY)

            producer.publish(
                message.body,
                headers=headers,
                exchange=self.exchange,
                routing_key=target_queue,
                expiration=expiration_seconds,
                retry=True,
                retry_policy=DEFAULT_RETRY_POLICY,
                declare=[queue.exchange, queue],
                **properties
            )
test_weibo_harvester.py 文件源码 项目:sfm-weibo-harvester 作者: gwu-libraries 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _create_connection():
        """Return a ``Connection`` to host "mq" using the test credentials."""
        return Connection(
            hostname="mq",
            userid=tests.mq_username,
            password=tests.mq_password,
        )
rabbit.py 文件源码 项目:fuel-ccp-tests 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, ip, port, user='rabbitmq', password='password'):
        """Connect to the broker at ``ip:port`` and keep a channel in ``self.ch``."""
        broker_url = "amqp://{0}:{1}@{2}:{3}//".format(user, password,
                                                       ip, port)
        broker = kombu.Connection(broker_url)
        # Establish the connection eagerly before opening the channel.
        broker.connect()
        self.ch = broker.channel()
client.py 文件源码 项目:microservices 作者: viatoriche 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, connection='amqp:///', name=None, logger=None, limit=None):
        """Initialization of Client instance

        :param connection: connection for broker
        :type connection: str, None, kombu.connections.Connection, dict
        :param name: display name of the client; when omitted it is derived
            from the connection URI, falling back to the transport class
        :type name: str, None
        :param logger: logger wrapped in ``InstanceLogger``; when omitted,
            ``get_logger(__name__)`` is used
        :param limit: size passed to ``pools.Connections``; when omitted,
            the global limit from ``pools.get_limit()`` is used
        """

        self.connection = self._get_connection(connection)
        self.exchanges = {}

        if name is None:
            try:
                name = '<client: {}>'.format(self.connection.as_uri())
            except Exception:  # pragma: no cover
                # Errors with filesystem transport. Catch ``Exception``
                # rather than everything so that KeyboardInterrupt and
                # SystemExit still propagate.
                name = '<client: {}>'.format(self.connection.transport_cls)

        if logger is None:
            logger = get_logger(__name__)

        self.logger = InstanceLogger(self, logger)

        self.name = name
        self.logger.debug('%s built', self.name)

        if limit is None:
            # Set limit as global kombu limit.
            limit = pools.get_limit()
        self.limit = limit
        self.connections = pools.Connections(self.limit)
service.py 文件源码 项目:microservices 作者: viatoriche 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, connection='amqp:///', logger=None, timeout=10, name=None):
        """Initialization

        :param connection: connection for queues broker
        :type connection: str, None, dict or Connection
        :param logger: logging instance; the module-level ``_logger`` is
            used when omitted
        :type logger: Logger
        :param timeout: timeout stored as ``self.timeout``, default = 10
        :type timeout: None, int or float
        :param name: display name of the service; when omitted it is
            derived from the connection URI, falling back to the transport
            class
        :type name: str, None
        """
        if logger is None:
            logger = _logger

        self.logger = InstanceLogger(self, logger)
        self.connection = self._get_connection(connection)
        self.timeout = timeout
        self.consumers = []

        if name is None:
            try:
                name = '<microservice: {}>'.format(self.connection.as_uri())
            except Exception:  # pragma no cover
                # Catch ``Exception`` rather than everything so that
                # KeyboardInterrupt and SystemExit still propagate.
                name = '<microservice: {}>'.format(
                    self.connection.transport_cls)  # pragma: no cover

        self.name = name
        self._stop = False
        self._stopped = False
service.py 文件源码 项目:microservices 作者: viatoriche 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def drain_events(self, infinity=True):
        """Drain events from the connection until stopped or interrupted.

        Enters every entry of ``self.consumers`` as a context manager, then
        repeatedly calls ``self.connection.drain_events`` with
        ``self.timeout`` for as long as ``self._stop`` is false.

        :param infinity: when false, the loop ends on the first
            ``socket.timeout``; when true, timeouts are ignored and draining
            continues
        :type infinity: bool
        """

        with nested(*self.consumers):
            while not self._stop:
                try:
                    self.connection.drain_events(timeout=self.timeout)
                except socket.timeout:
                    # Nothing arrived within the timeout; only leave the
                    # loop when the caller asked for a single pass.
                    if not infinity:
                        break
                except Exception as e:
                    if not self.connection.connected and not self._stop:
                        # Connection lost while still running: reconnect,
                        # revive and leave this loop.
                        self.logger.error(
                            'Connection to mq has broken off. Try to reconnect')
                        self.connect()
                        self.revive()
                        break
                    elif not self._stop and not isinstance(e, HandlerError):
                        # Unexpected error while still running: log the
                        # traceback, revive and leave this loop.
                        self.logger.exception(
                            'Something wrong! Try to restart the loop')
                        self.revive()
                        break
                    elif isinstance(e, HandlerError):
                        # HandlerError is ignored here; draining continues.
                        pass
                    else:  # pragma: no cover
                        # Remaining case: a non-HandlerError raised while
                        # a stop has been requested.
                        self.logger.exception('Unknown error')  # pragma: no cover
        # Record that draining finished because a stop was requested.
        if self._stop:
            self._stopped = True
            self.logger.info('Stopped draining events.')
kombu_manager.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _connection(self):
        """Return a new ``kombu.Connection`` for ``self.url``."""
        broker_url = self.url
        return kombu.Connection(broker_url)
__init__.py 文件源码 项目:commissaire-service 作者: projectatomic 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def onconnection_revived(self):  # pragma: no cover
        """
        Log that the connection has been established or re-established.

        NOTE(review): kombu's ConsumerMixin hook appears to be spelled
        ``on_connection_revived``; confirm this method name is intended.
        """
        service_log = self.logger
        service_log.info('Connection (re)established')
__init__.py 文件源码 项目:commissaire-service 作者: projectatomic 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def on_consume_end(self, connection, channel):  # pragma: no cover
        """
        Called when the service stops consuming.

        Emits a WARNING-level log line; the arguments are not used.

        :param connection: The current connection instance.
        :type connection: kombu.Connection
        :param channel: The current channel.
        :type channel: kombu.transport.*.Channel
        """
        # Logger.warn is a deprecated alias that newer Python versions no
        # longer provide; Logger.warning is the supported spelling.
        self.logger.warning('Consuming has ended')
keypair.py 文件源码 项目:omni 作者: openstack 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self, aws_connection, transport='amqp'):
        """Store the EC2 connection and open a broker ``Connection``.

        The broker URI is assembled from ``transport`` and the
        ``rabbit_*`` options read from ``CONF``.
        """
        self.ec2_conn = aws_connection
        uri_template = \
            "{transport}://{username}:{password}@{rabbit_host}:{rabbit_port}"
        self.broker_uri = uri_template.format(
            transport=transport,
            username=CONF.rabbit_userid,
            password=CONF.rabbit_password,
            rabbit_host=CONF.rabbit_host,
            rabbit_port=CONF.rabbit_port,
        )
        self.connection = Connection(self.broker_uri)
messaging.py 文件源码 项目:daenerys 作者: dongweiming 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def sync_get(name, interval=0.5):
    """Publish ``name`` and block until its stored status is SUCCESS.

    Polls ``r.get(name)`` every ``interval`` seconds with no upper bound on
    the number of attempts, then returns the ``result`` of
    ``Backend.get(name)``.
    """
    with Connection(BROKER_URI) as conn:
        publish(conn, name)
        while True:
            raw = r.get(name)
            # NOTE(review): the stored value is unpickled; this is only safe
            # if everything written to ``r`` is trusted.
            if raw and Backend.from_json(cPickle.loads(raw)).status == SUCCESS:
                break
            time.sleep(interval)
        return Backend.get(name).result
worker.py 文件源码 项目:daenerys 作者: dongweiming 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def main():
    """Reconnect to the 'zhihulive' database and start consuming tasks."""
    # Drop any existing connection before connecting again.
    disconnect()
    connect('zhihulive')
    with Connection(BROKER_URI) as broker:
        consumer(broker, [process_task])
tasks_remove.py 文件源码 项目:enteletaor 作者: cr0hn 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def action_proc_remove(config):
    """Drain every message from the 'celery' queue of the target broker.

    The broker URL is built from ``config.broker_type`` and
    ``config.target``.
    """
    log.warning("  - Trying to connect with server...")

    broker_url = '%s://%s' % (config.broker_type, config.target)

    with Connection(broker_url) as conn:
        celery_queue = conn.SimpleQueue('celery')

        # Exhaust the iterator; the messages themselves are not used here.
        for _unused in get_remote_messages(config, celery_queue, False):
            pass

        log.error("   - All tasks removed from '%s'" % config.target)
bus.py 文件源码 项目:xivo-ctid-ng 作者: wazo-pbx 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _make_publisher(self):
        """Build a ``Publisher`` from the bus settings in ``self.config``."""
        settings = self.config
        bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**settings)
        connection = Connection(bus_url)
        exchange = Exchange(
            settings['exchange_name'], type=settings['exchange_type'])
        producer = Producer(connection, exchange=exchange, auto_declare=True)
        return Publisher(producer, Marshaler(self._uuid))
bus.py 文件源码 项目:xivo-ctid-ng 作者: wazo-pbx 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run(self):
        """Open the bus connection and run the parent class's loop on it."""
        logger.info("Running AMQP consumer")
        with Connection(self._bus_url) as bus_connection:
            # Expose the open connection on the instance before handing
            # control to the parent class's run().
            self.connection = bus_connection
            super(CoreBusConsumer, self).run()
collectd.py 文件源码 项目:xivo-ctid-ng 作者: wazo-pbx 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _make_publisher(self):
        """Build a collectd ``Publisher`` from the settings in ``self.config``."""
        settings = self.config
        bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**settings)
        connection = Connection(bus_url)
        # Use the same exchange arguments as collectd: an auto_delete
        # argument and a non-durable exchange.
        exchange = Exchange(
            settings['exchange_name'],
            type=settings['exchange_type'],
            arguments={'auto_delete': True},
            durable=False,
        )
        producer = Producer(connection, exchange=exchange, auto_declare=True)
        return Publisher(producer, CollectdMarshaler(self._uuid))
bus.py 文件源码 项目:xivo-ctid-ng 作者: wazo-pbx 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def listen_events(self, routing_key, exchange=BUS_EXCHANGE_XIVO):
        """Declare and purge the bus queue, then keep it as ``self.bus_queue``."""
        with Connection(self._url) as conn:
            event_queue = Queue(
                BUS_QUEUE_NAME,
                exchange=exchange,
                routing_key=routing_key,
                channel=conn.channel(),
            )
            event_queue.declare()
            # Start from an empty queue.
            event_queue.purge()
            self.bus_queue = event_queue
bus.py 文件源码 项目:xivo-ctid-ng 作者: wazo-pbx 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _drain_events(self, on_event):
        if not hasattr(self, 'bus_queue'):
            raise Exception('You must listen for events before consuming them')
        with Connection(self._url) as conn:
            with Consumer(conn, self.bus_queue, callbacks=[on_event]):
                try:
                    while True:
                        conn.drain_events(timeout=0.5)
                except TimeoutError:
                    pass


问题


面经


文章

微信
公众号

扫码关注公众号