python类BlockingConnection()的实例源码

rabbitmq.py 文件源码 项目:gemstone 作者: vladcalin 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, host="127.0.0.1", port=5672, username="", password="", **connection_options):
        """
        Set up an event transport backed by a RabbitMQ server.

        A blocking connection is opened right away and a single channel is
        created on it.

        :param host: ipv4 or hostname
        :param port: the port where the server listens
        :param username: username used for authentication
        :param password: password used for authentication
        :param connection_options: extra keyword arguments forwarded to
                                   :py:class:`pika.ConnectionParameters`.
        :raises RuntimeError: when the ``pika`` name is falsy (not importable)
        """
        if not pika:
            raise RuntimeError("RabbitMqEventTransport requires 'pika' to run")

        super(RabbitMqEventTransport, self).__init__()
        self._handlers = {}

        auth = pika.PlainCredentials(username=username, password=password)
        conn_params = pika.ConnectionParameters(
            host=host,
            port=port,
            credentials=auth,
            **connection_options
        )
        self.connection = pika.BlockingConnection(conn_params)
        self.channel = self.connection.channel()
queue.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_connection_amqp():
    """Open a blocking AMQP connection described by the ``ckan.harvest.mq.*`` settings.

    Every setting falls back to its module-level default (PORT, USERID,
    PASSWORD, HOSTNAME, VIRTUAL_HOST).  A port value that ``int()`` rejects
    with ``ValueError`` is replaced by PORT.

    :returns: a new :class:`pika.BlockingConnection`
    """
    def setting(name, default):
        # all broker settings share the same configuration prefix
        return config.get('ckan.harvest.mq.' + name, default)

    try:
        port = int(setting('port', PORT))
    except ValueError:
        port = PORT

    userid = setting('user_id', USERID)
    password = setting('password', PASSWORD)
    hostname = setting('hostname', HOSTNAME)
    virtual_host = setting('virtual_host', VIRTUAL_HOST)

    parameters = pika.ConnectionParameters(
        host=hostname,
        port=port,
        virtual_host=virtual_host,
        credentials=pika.PlainCredentials(userid, password),
        frame_max=10000,
    )
    log.debug("pika connection using %s" % parameters.__dict__)

    connection = pika.BlockingConnection(parameters)
    return connection
receive.py 文件源码 项目:DIS_MeituanReptile 作者: myvary 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def receive(self, callback, username, pwd, ip, port):
        '''
        Consume messages from the durable ``city_task_queue`` queue.

        Blocks inside ``start_consuming`` until consumption stops or an
        error is raised; the connection is closed in both cases.

        :param callback: consumer callback handed to ``basic_consume``
        :param username: RabbitMQ user name
        :param pwd:      password for ``username``
        :param ip:       host name / IP address of the RabbitMQ server
        :param port:     port of the RabbitMQ server
        :return:         None
        '''
        user_pwd = pika.PlainCredentials(username, pwd)
        s_conn = pika.BlockingConnection(pika.ConnectionParameters(ip, port, '/', credentials=user_pwd))
        try:
            channel = s_conn.channel()
            channel.queue_declare(queue='city_task_queue', durable=True)
            # Deliver at most one unacknowledged message to this consumer.
            channel.basic_qos(prefetch_count=1)
            # Parenthesised form is valid on both Python 2 and Python 3.
            print('???????????')
            channel.basic_consume(callback,
                                  queue='city_task_queue',
                                  )
            channel.start_consuming()
        finally:
            # Do not leak the socket when consuming stops or fails.
            if s_conn.is_open:
                s_conn.close()
queue.py 文件源码 项目:lama 作者: CSE-POST 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _check_analysis_queue(queue_name, thread_id=0):
        """
        Make sure the class-level ``Queue`` caches hold a connection for
        ``thread_id`` and an open channel for ``queue_name``.

        The process is terminated with ``os._exit(1)`` when opening the
        connection raises ``pika.exceptions.ConnectionClosed``.
        """
        # one connection per thread id, created on first use
        if thread_id not in Queue.connections:
            try:
                conn_params = pika.ConnectionParameters(Queue.host)
                Queue.connections[thread_id] = pika.BlockingConnection(conn_params)
            except pika.exceptions.ConnectionClosed:
                logging.error("Error with RMQ server, check it's started.")
                os._exit(1)
            Queue.consumers[thread_id] = True

        # (re)create the channel when it is missing or has been closed
        channel_missing = queue_name not in Queue.channels
        if channel_missing or Queue.channels[queue_name].is_closed:
            fresh_channel = Queue.connections[thread_id].channel()
            Queue.channels[queue_name] = fresh_channel
            Queue.channels[queue_name].queue_declare(queue=queue_name)
threadPollSubs.py 文件源码 项目:djangoStatusPanel 作者: okar1 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def getMqConf(mqConfig,serverName,vServerErrors):
    """Return the first entry of ``mqConfig`` whose ``amqpUrl`` accepts a connection.

    Every entry is probed, even after a working one has been found, so that
    the connection errors of all entries are collected.  The collected errors
    are passed through ``formatErrors`` and merged into ``vServerErrors``.

    Returns ``None`` when no entry could be connected to.
    """
    pollName = "CheckRabbitMq"
    errors = set()
    res = None

    for mqConf in mqConfig:
        try:
            con = pika.BlockingConnection(pika.URLParameters(mqConf['amqpUrl']))
        except Exception as e:
            errors.add(str(e))
            continue

        # only the first working configuration is kept for later tasks
        if res is None:
            res = mqConf
        # the probe connection itself is not needed any more
        if con.is_open:
            con.close()

    vServerErrors.update(formatErrors(errors, serverName, pollName))
    return res
test_instance.py 文件源码 项目:eagle 作者: saga92 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_create_success(self):
        """POST /create_ins relays the worker's '0x1' code and sends exactly once."""
        req = {
            'container_name': test_cfg.CONTAINER_NAME,
            'image_id': test_cfg.CONTAINER_IMAGE_ID,
            'user_name': test_cfg.USER_NAME,
        }
        with mock.patch('pika.BlockingConnection') as mock_block,\
            mock.patch.object(pika.BlockingConnection,'channel') as mock_block_channel,\
            mock.patch.object(UiQueue, 'send') as mock_queue_send:
            # canned reply of the worker behind the queue
            worker_res = {'code': '0x1', 'message': 'pass', 'ins': {}}
            mock_block.return_value = mock.Mock()
            mock_block_channel.return_value = mock.Mock()
            mock_queue_send.return_value = json.dumps(worker_res)

            body = json.dumps(req)
            response = self.app.post('/create_ins', data=body, follow_redirects=True)

            mock_queue_send.assert_called_once()
            res_dict = json.loads(response.data)
            self.assertEqual(res_dict.get('code'), '0x1')

    # Container name occupied by others
test_instance.py 文件源码 项目:eagle 作者: saga92 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_stop_success(self):
        """POST /stop_ins for an existing instance relays the worker's '0x1' code."""
        create_instance(self.ins)
        req = {
            'container_serial': self.ins.get('container_serial'),
            'user_name': test_cfg.USER_NAME,
        }
        with mock.patch('pika.BlockingConnection') as mock_block,\
            mock.patch.object(pika.BlockingConnection,'channel') as mock_block_channel,\
            mock.patch.object(UiQueue, 'send') as mock_queue_send:
            # canned reply of the worker behind the queue
            worker_res = {
                'code': '0x1',
                'message': 'pass',
                'container_serial': self.ins.get('container_serial'),
            }
            mock_block.return_value = mock.Mock()
            mock_block_channel.return_value = mock.Mock()
            mock_queue_send.return_value = json.dumps(worker_res)

            body = json.dumps(req)
            response = self.app.post('/stop_ins', data=body, follow_redirects=True)

            # drop the fixture instance before asserting
            remove_instance_by_serial(self.ins.get('container_serial'))
            mock_queue_send.assert_called_once()
            res_dict = json.loads(response.data)
            self.assertEqual(res_dict.get('code'), '0x1')

    # Container is not exist
test_instance.py 文件源码 项目:eagle 作者: saga92 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_restart_success(self):
        """POST /restart_ins for an existing instance relays the worker's '0x1' code."""
        create_instance(self.ins)
        req = {
            'container_serial': self.ins.get('container_serial'),
            'user_name': test_cfg.USER_NAME,
        }
        with mock.patch('pika.BlockingConnection') as mock_block,\
            mock.patch.object(pika.BlockingConnection,'channel') as mock_block_channel,\
            mock.patch.object(UiQueue, 'send') as mock_queue_send:
            # canned reply of the worker behind the queue
            worker_res = {
                'code': '0x1',
                'message': 'pass',
                'container_serial': self.ins.get('container_serial'),
            }
            mock_block.return_value = mock.Mock()
            mock_block_channel.return_value = mock.Mock()
            mock_queue_send.return_value = json.dumps(worker_res)

            body = json.dumps(req)
            response = self.app.post('/restart_ins', data=body, follow_redirects=True)

            # drop the fixture instance before asserting
            remove_instance_by_serial(self.ins.get('container_serial'))
            mock_queue_send.assert_called_once()
            res_dict = json.loads(response.data)
            self.assertEqual(res_dict.get('code'), '0x1')

    # Container is not exist
test_instance.py 文件源码 项目:eagle 作者: saga92 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_restart_failed(self):
        """POST /restart_ins without creating the instance first yields code '0x9'."""
        req = {
            'container_serial': self.ins.get('container_serial'),
            'user_name': test_cfg.USER_NAME,
        }
        with mock.patch('pika.BlockingConnection') as mock_block,\
            mock.patch.object(pika.BlockingConnection,'channel') as mock_block_channel,\
            mock.patch.object(UiQueue, 'send') as mock_queue_send:
            # the worker reply is stubbed, but the test expects a different code
            worker_res = {
                'code': '0x1',
                'message': 'pass',
                'container_serial': self.ins.get('container_serial'),
            }
            mock_block.return_value = mock.Mock()
            mock_block_channel.return_value = mock.Mock()
            mock_queue_send.return_value = json.dumps(worker_res)

            body = json.dumps(req)
            response = self.app.post('/restart_ins', data=body, follow_redirects=True)

            res_dict = json.loads(response.data)
            self.assertEqual(res_dict.get('code'), '0x9')
test_instance.py 文件源码 项目:eagle 作者: saga92 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_remove_success(self):
        """POST /remove_ins for an existing instance relays the worker's '0x1' code."""
        create_instance(self.ins)
        req = {
            'container_serial': self.ins.get('container_serial'),
            'user_name': test_cfg.USER_NAME,
        }
        with mock.patch('pika.BlockingConnection') as mock_block,\
            mock.patch.object(pika.BlockingConnection,'channel') as mock_block_channel,\
            mock.patch.object(UiQueue, 'send') as mock_queue_send:
            # canned reply of the worker behind the queue
            worker_res = {
                'code': '0x1',
                'message': 'pass',
                'container_serial': self.ins.get('container_serial'),
            }
            mock_block.return_value = mock.Mock()
            mock_block_channel.return_value = mock.Mock()
            mock_queue_send.return_value = json.dumps(worker_res)

            body = json.dumps(req)
            response = self.app.post('/remove_ins', data=body, follow_redirects=True)

            # drop the fixture instance before asserting
            remove_instance_by_serial(self.ins.get('container_serial'))
            mock_queue_send.assert_called_once()
            res_dict = json.loads(response.data)
            self.assertEqual(res_dict.get('code'), '0x1')
__init__.py 文件源码 项目:Bitcoin-Crypto-python-charts 作者: Whalepool 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def teamspeak(self, msg):
        """
        Publish one message, or every message of a list, to the 'teamspeak' queue.

        Each message is JSON-encoded and published through the default
        exchange.  The connection is closed even when publishing fails.

        :param msg: a JSON-serialisable object, or a ``list`` of such objects
                    (each list element is sent as its own message)
        """
        # Connect to rabbitmq
        # NOTE(review): credentials are concatenated into the URL unescaped;
        # confirm they never contain characters that need URL quoting.
        parameters = pika.URLParameters('amqp://'+self.RMQUSER+':'+self.RMQPASS+'@'+self.RMQHOST+':'+self.RMQPORT+'/'+self.RMQVHOST+'?socket_timeout='+self.RMQSOCKETTIMEOUT)
        connection = pika.BlockingConnection(parameters)
        try:
            channel = connection.channel()
            channel.queue_declare(queue='teamspeak')

            # A single message is handled as a one-element batch.
            messages = msg if isinstance(msg, list) else [msg]
            for m in messages:
                logger.info("Sending msg to teamspeak: "+str(m))
                channel.basic_publish(exchange='',routing_key='teamspeak',body=json.dumps(m))
        finally:
            # Release the connection on success and on failure alike.
            if connection.is_open:
                connection.close()
rabbitmq.py 文件源码 项目:easy-job 作者: inb-co 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, queue_name, serializer, rabbitmq_configs, *args, **kwargs):
        """
        Store the queue settings and prepare a lazily created connection pool.

        :param queue_name: stored as ``self.queue_name``
        :param serializer: stored as ``self.serialize``
        :param rabbitmq_configs: mapping read for ``connection_parameters``
            (keyword arguments of ``pika.ConnectionParameters``) and
            ``connection_pool_configs`` (keyword arguments of
            ``pika_pool.QueuedPool``); both default to an empty dict
        :param args: forwarded to the parent initializer
        :param kwargs: forwarded to the parent initializer
        """
        self.queue_name = queue_name
        self.serialize = serializer
        super(RabbitMQRunner, self).__init__(*args, **kwargs)
        self.log(logging.DEBUG, "RabbitMQ Runner is ready...")

        def open_connection():
            # factory handed to the pool; the configuration is read on each call
            self.log(logging.DEBUG, "Creating new rabbitmq connection")
            param_kwargs = rabbitmq_configs.get('connection_parameters', {})
            return pika.BlockingConnection(pika.ConnectionParameters(**param_kwargs))

        def build_pool():
            pool_kwargs = rabbitmq_configs.get('connection_pool_configs', {})
            return pika_pool.QueuedPool(create=open_connection, **pool_kwargs)

        # the pool is built by SimpleLazyObject, not here
        self._pool = SimpleLazyObject(build_pool)
correlationid_center.py 文件源码 项目:Python_Study 作者: thsheep 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self):
        """
        Connect to RabbitMQ on localhost and start consuming a private
        callback queue, with ``self.on_response`` as the consumer.
        """
        conn_params = pika.ConnectionParameters(host='localhost')
        self.connection = pika.BlockingConnection(conn_params)
        self.channel = self.connection.channel()

        # declare an exclusive queue and remember the name reported in the reply
        declared = self.channel.queue_declare(exclusive=True)
        self.callback_queue = declared.method.queue

        self.channel.basic_consume(
            self.on_response,
            no_ack=True,
            queue=self.callback_queue,
        )

        # starts empty; presumably filled by on_response — verify against that method
        self.response = {}

    #??????????????
stress_mq.py 文件源码 项目:gremlin 作者: unitedstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def publish():
    """Declare the configured topic exchange and queue, then publish test messages.

    Reads the module-level ``parameters`` (connection parameters) and ``args``
    (parsed options).  ``args.msg_per_thread`` copies of a fixed message are
    published to ``args.rabbit_exchange`` with ``args.routing_key``.  The
    connection is closed even when declaring or publishing fails.
    """
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()

        channel.exchange_declare(exchange=args.rabbit_exchange,
                                 durable=str2bool(args.exchange_durable),
                                 auto_delete=str2bool(args.exchange_auto_delete),
                                 type="topic")
        channel.queue_declare(queue=args.rabbit_queue,
                              durable=str2bool(args.queue_durable),
                              auto_delete=str2bool(args.queue_auto_delete))
        channel.queue_bind(args.rabbit_queue, args.rabbit_exchange,
                           args.routing_key)

        message = 'Gremlin Coming!'
        count = 0
        while count < args.msg_per_thread:
            channel.basic_publish(exchange=args.rabbit_exchange,
                                  routing_key=args.routing_key,
                                  body=message)
            count += 1
    finally:
        # Do not leak the connection when the broker rejects an operation.
        if connection.is_open:
            connection.close()
ftpscout.py 文件源码 项目:ftpscout 作者: RubenRocha 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def mq_worker(retry_delay=1.0):
    """Consume ``mq_queue`` on the local broker, reconnecting after failures.

    Connects to RabbitMQ on localhost, declares the durable queue, and
    blocks in ``start_consuming`` with ``callback`` as the consumer.  When
    anything raises ``Exception`` the connection (if one was opened) is
    closed and the whole sequence is retried.

    Retrying is a loop, not recursion, so a long outage cannot exhaust
    the call stack, and a failed connect no longer trips over an unbound
    ``connection`` name in the error handler.

    :param retry_delay: seconds to wait before reconnecting after a failure
    :returns: None, once ``start_consuming`` returns normally
    """
    import time  # local import: the file's import block is not visible here

    while True:
        connection = None
        try:
            connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
            channel = connection.channel()

            channel.queue_declare(queue=mq_queue, durable=True)

            # hand this worker at most one unacknowledged message at a time
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(callback,
                                  queue=mq_queue)

            channel.start_consuming()
            return
        except Exception:
            # connection stays None when the connect call itself failed
            if connection is not None and connection.is_open:
                connection.close()
            # avoid a busy loop while the broker is unreachable
            time.sleep(retry_delay)
queue-consumer.py 文件源码 项目:SkySpyWatch 作者: nstarpost 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def consume():
    """Creates mongo, redis, and rabbitmq connections; consumes queue.

    Messages from the durable ``queue_name`` queue are passed to the
    module-level ``callback`` together with the redis client and the
    ``rt_flights_test`` mongo database.  Blocks in ``start_consuming``.
    The rabbitmq connection and the mongo client are closed when consuming
    ends, whether normally or through an exception.

    :returns: 0 after consuming stopped normally
    """
    logger.debug("Consume started")
    redis_host = 'localhost'
    redis_port = 6379
    # connect to mongodb
    client = MongoClient()
    try:
        dbmongo = client.rt_flights_test
        # connect to redis
        r = redis.StrictRedis(host=redis_host, port=redis_port, db=0, decode_responses=True)
        # connect to rabbitmq and create queue
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        try:
            channel = connection.channel()
            channel.queue_declare(queue=queue_name, durable=True)
            channel.basic_qos(prefetch_count=1)

            def on_message(ch, method, properties, body):
                # add the redis and mongo handles to pika's four callback arguments
                return callback(ch, method, properties, body, r, dbmongo)

            # start pulling data off the queue
            channel.basic_consume(on_message, queue=queue_name)
            channel.start_consuming()
        finally:
            if connection.is_open:
                connection.close()
    finally:
        client.close()
    return 0
main.py 文件源码 项目:Malicious_Domain_Whois 作者: h-j-13 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def dispatch_thread(self):
        """
        Feed domains from the local queue returned by ``self.get_domain()``
        into the RabbitMQ queue ``domain_queue``.

        The message count of the RabbitMQ queue is polled; when it falls
        below ``QUEUE_LENGTH_MIN`` domains are pushed with
        ``self.domain_push`` until the count reaches ``QUEUE_LENGTH_MAX``.
        The method returns once the local queue is empty.

        NOTE(review): the connection opened here is never closed in this
        method — confirm whether that is intentional.
        """
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            Static.RABBITMQ_HOST))
        channel = connection.channel()
        # initial batch of domains to dispatch
        domain_queue = self.get_domain()
        while domain_queue.qsize() > 0:
            while domain_queue.qsize() > 0:
                # re-declaring the queue returns its current message count
                now_count = channel.queue_declare(queue='domain_queue', durable=True).method.message_count
                #  refill only when the broker queue has dropped below the low-water mark
                if now_count < QUEUE_LENGTH_MIN:
                    #  push domains until the broker queue reaches the high-water mark
                    # NOTE(review): if get_domain() keeps returning fewer than
                    # 100 items this inner loop never pushes — verify it cannot spin
                    while now_count < QUEUE_LENGTH_MAX:
                        if domain_queue.qsize() >= 100:
                            self.domain_push(domain_queue.get(), True)
                        else:
                            # local queue is running low: fetch a new one
                            domain_queue = self.get_domain()
                        now_count = channel.queue_declare(queue='domain_queue', durable=True).method.message_count
                    #  pause after a refill
                    time.sleep(WAIT_INTERVAL)

                # wait before polling the broker queue length again
                time.sleep(MONITOR_INTERVAL)
                if domain_queue.qsize() < 100:
                    domain_queue = self.get_domain()
PikaClient.py 文件源码 项目:PythonSkillTree 作者: w4n9H 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, host, port=5672, username='guest', password='guest', prefetch_count=1):
        """
        Connect to RabbitMQ and open a channel with a prefetch limit.

        :param host: broker host, stored as ``self.queue_host``
        :param port: broker port, stored as ``self.queue_port``
        :param username: user name for plain authentication
        :param password: password for plain authentication
        :param prefetch_count: value passed to ``basic_qos`` on the channel
        :return:
        """
        self.queue_host = host
        self.queue_port = port
        self.credentials = pika.credentials.PlainCredentials(username=username, password=password)

        conn_params = pika.ConnectionParameters(
            host=self.queue_host,
            port=self.queue_port,
            credentials=self.credentials,
        )
        self.connection = pika.BlockingConnection(conn_params)

        self.channel = self.connection.channel()
        self.channel.basic_qos(prefetch_count=prefetch_count)
test_status.py 文件源码 项目:plumpy 作者: aiidateam 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setUp(self):
        """
        Open a default pika connection and build a status requester and a
        provider that share one uniquely named exchange.

        The test fails right away when the connection cannot be opened.
        """
        super(TestStatusRequesterAndProvider, self).setUp()

        self.response = None

        # Set up communications
        try:
            self._connection = pika.BlockingConnection()
        except pika.exceptions.ConnectionClosed:
            self.fail("Couldn't open connection.  Make sure rmq server is running")

        # class plus a random UUID keeps the exchange name unique per test
        unique_id = uuid.uuid4()
        exchange = "{}.{}.status_request".format(self.__class__, unique_id)

        self.requester = StatusRequester(self._connection, exchange=exchange)
        self.manager = ProcessManager()
        self.provider = StatusProvider(
            self._connection,
            process_manager=self.manager,
            exchange=exchange,
        )
test_rmq.py 文件源码 项目:plumpy 作者: aiidateam 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUp(self):
        """
        Open a default pika connection, declare a uniquely named fanout
        exchange, and build the process controller under test.

        The test fails right away when the connection cannot be opened.
        """
        super(TestProcessController, self).setUp()
        try:
            self._connection = pika.BlockingConnection()
        except pika.exceptions.ConnectionClosed:
            self.fail("Couldn't open connection.  Make sure rmq server is running")

        # class plus a random UUID keeps the exchange name unique per test
        unique_id = uuid.uuid4()
        self.exchange = '{}.{}.task_control'.format(self.__class__, unique_id)

        self.channel = self._connection.channel()
        self.channel.exchange_declare(exchange=self.exchange, type='fanout')

        self.manager = ProcessManager()
        self.controller = ProcessController(
            self._connection,
            exchange=self.exchange,
            process_manager=self.manager,
        )
sender.py 文件源码 项目:EvalAI 作者: Cloud-CV 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def publish_submission_message(challenge_id, phase_id, submission_id):
    """Publish a persistent submission message to the ``evalai_submissions`` exchange.

    The message is a JSON object holding the three identifiers.  The
    connection to the local broker is closed even when declaring or
    publishing fails.

    :param challenge_id: value stored under ``'challenge_id'``
    :param phase_id: value stored under ``'phase_id'``
    :param submission_id: value stored under ``'submission_id'``
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    try:
        channel = connection.channel()
        channel.exchange_declare(exchange='evalai_submissions', type='topic')

        # though worker is creating the queue(queue creation is idempotent too)
        # but lets create the queue here again, so that messages dont get missed
        # later on we can apply a check on queue message length to raise some alert
        # this way we will be notified of worker being up or not
        channel.queue_declare(queue='submission_task_queue', durable=True)

        message = {
            'challenge_id': challenge_id,
            'phase_id': phase_id,
            'submission_id': submission_id
        }
        channel.basic_publish(exchange='evalai_submissions',
                              routing_key='submission.*.*',
                              body=json.dumps(message),
                              properties=pika.BasicProperties(delivery_mode=2))    # make message persistent

        print(" [x] Sent %r" % message)
    finally:
        # Release the connection on success and on failure alike.
        if connection.is_open:
            connection.close()
rabbitmq.py 文件源码 项目:flask-monitor 作者: fraoustin 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self,
                       exchange='flask',
                       routing_key='',
                       *args,
                       **kw):
        """
        Split the keyword arguments between RabbitMQ and the base observer,
        then try to connect and declare a fanout exchange.

        Keywords listed in ``self.args_mq`` go to ``pika.ConnectionParameters``;
        all others go to ``ObserverMetrics.__init__``.  Connection and
        declaration failures are logged, not raised.

        :param exchange: name of the fanout exchange to declare
        :param routing_key: stored as ``self.routing_key``
        """
        kw_mq = {key: value for key, value in kw.items() if key in self.args_mq}
        kw = {key: value for key, value in kw.items() if key not in self.args_mq}
        ObserverMetrics.__init__(self, *args, **kw)
        try:

            connection = pika.BlockingConnection(pika.ConnectionParameters(**kw_mq))
            self.channel = connection.channel()
            self.exchange = exchange
            self.routing_key = routing_key
            try:
                self.channel.exchange_declare(exchange=exchange,
                                               type='fanout')
                self.logger.debug("Create channel RabbitMq '%s'" % exchange)
            except Exception:
                # Exception, not a bare except, so that KeyboardInterrupt and
                # SystemExit still propagate.
                self.logger.debug("Not create channel RabbitMq '%s'" % exchange)
        except Exception as e:
            # NOTE(review): self.channel/exchange/routing_key stay unset on
            # this path — confirm later code copes with that.
            self.logger.critical("Cannot connect to RabbitMq '%s'" % str(e))
service.py 文件源码 项目:story_engine 作者: brettkromkamp 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def put_interaction(topic_map_identifier, json_body):
    """Publish ``json_body`` as JSON to the durable ``storytechnologies`` queue.

    The connection to the local broker is closed even when declaring or
    publishing fails.

    :param topic_map_identifier: not used by this function
    :param json_body: JSON-serialisable message
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    try:
        channel = connection.channel()

        channel.queue_declare(queue='storytechnologies', durable=True)

        # Example messages:
        #   {"type": "scene", "command": "toggle-rotation"}
        #   {"type": "scene", "command": "navigate-to", "sceneIdentifier": "outpost"}
        #   {"type": "animation", "command": "toggle-animation", "animate": "true", "objectIdentifier": "windmill"}

        channel.basic_publish(exchange='',
                              routing_key='storytechnologies',
                              body=json.dumps(json_body))
    finally:
        # Release the connection on success and on failure alike.
        if connection.is_open:
            connection.close()

# POST /scenes
# POST /scenes/{identifier}/assets
# POST /scenes/{identifier}/attributes
# POST /paths
# POST /characters
# POST /characters/{identifier}/assets
# POST /props
# POST /props/{identifier}/assets
sub_monitor.py 文件源码 项目:SolutionGambling 作者: eganwall 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def publish_message(queue_name, message):
    """Publish ``message`` as a persistent JSON message and log the event.

    The connection to the local broker is closed even when publishing or
    logging fails.

    :param queue_name: durable queue to declare and publish to (through the
        default exchange)
    :param message: JSON-serialisable mapping; the keys ``message_id``,
        ``comment_id``, ``username`` and ``comment_body`` are read
    :raises KeyError: when one of those keys is missing
    """
    message_body = json.dumps(message)
    message_id = message['message_id']

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue_name, durable=True)
        channel.basic_publish(exchange='',
                              routing_key=queue_name,
                              body=message_body,
                              properties=pika.BasicProperties(
                                  delivery_mode=2,  # make message persistent
                              )
        )
        log_msg = "Published : [queue_name={}] [comment_id={}] [username={}] [comment_body={}]".format(
            queue_name, message['comment_id'], message['username'], message['comment_body'])
        logger.log_info_message(message_id, LogUtilityConstants.message_published_event,
                                'sub_monitor', log_msg)
    finally:
        # Release the connection on success and on failure alike.
        if connection.is_open:
            connection.close()
wisp_monitor.py 文件源码 项目:rune 作者: hoonkim 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, call_queue_name="wisp"):
        """
        Connect to the local broker, prepare a publish channel and a receive
        channel, and start a thread running ``self.start_consuming``.

        :param call_queue_name: queue declared on the publish channel
        """
        self._call_queue_name = call_queue_name

        conn_params = pika.ConnectionParameters(host='localhost')
        self._connection = pika.BlockingConnection(conn_params)

        # channel for outgoing calls
        self._publish_channel = self._connection.channel()
        self._publish_channel.queue_declare(queue=self._call_queue_name)

        # channel for replies, on an exclusive queue named from the declare reply
        self._receive_channel = self._connection.channel()
        declared = self._receive_channel.queue_declare(exclusive=True)
        self._receive_queue_name = declared.method.queue

        # NOTE(review): the consumer thread shares this object's connection — confirm this is safe
        consumer_args = (
            self._receive_channel,
            self._receive_queue_name,
            self.on_response,
        )
        self._consuming_thread = threading.Thread(target=self.start_consuming,
                                                  args=consumer_args)
        self._consuming_thread.start()
queue.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, host='localhost', port=5672):
        """
        Open a blocking connection to ``host``:``port`` and a channel on it.
        """
        conn_params = pika.ConnectionParameters(host=host, port=port)
        self._connection = pika.BlockingConnection(conn_params)
        self._channel = self._connection.channel()
queue.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, exchangeName, host='localhost', port=5672):
        """
        Open a connection and channel, then declare ``exchangeName`` as a
        direct exchange.
        """
        self._exchangeName = exchangeName
        conn_params = pika.ConnectionParameters(host=host, port=port)
        self._connection = pika.BlockingConnection(conn_params)
        self._channel = self._connection.channel()
        self._channel.exchange_declare(
            exchange=exchangeName,
            type='direct',
        )
queue.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, queueName, host='localhost', port=5672):
        """
        Open a connection and channel, then declare ``queueName`` as a
        durable queue; ``self._exchange`` is set to the empty string.
        """
        self._qname = queueName
        conn_params = pika.ConnectionParameters(host=host, port=port)
        self._connection = pika.BlockingConnection(conn_params)
        self._channel = self._connection.channel()
        self._channel.queue_declare(
            queue=queueName,
            durable=True,
        )
        self._exchange = ''
utils.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, queueName):
        """
        Connect to the broker on localhost and declare ``queueName`` as a
        durable queue.
        """
        self._qname = queueName
        conn_params = pika.ConnectionParameters(host='localhost')
        self._connection = pika.BlockingConnection(conn_params)
        self._channel = self._connection.channel()
        self._channel.queue_declare(
            queue=queueName,
            durable=True,
        )
websocket.py 文件源码 项目:django-notifs 作者: danidee10 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def application(env, start_response):
    """Perform the websocket handshake and relay queue messages to the client.

    The queue name is the request path with every ``/`` removed.  Each
    message consumed from that queue is sent over the websocket; an
    ``OSError`` while sending or while polling the socket ends the process
    via ``sys.exit(0)``.
    """
    url_params = pika.connection.URLParameters(RABBIT_MQ_URL)
    connection = pika.BlockingConnection(url_params)
    channel = connection.channel()

    queue = env['PATH_INFO'].replace('/', '')
    channel.queue_declare(queue=queue)

    websocket_key = env['HTTP_SEC_WEBSOCKET_KEY']
    origin = env.get('HTTP_ORIGIN', '')
    uwsgi.websocket_handshake(websocket_key, origin)

    def keepalive():
        """Poll the websocket, then reschedule this check in 60 seconds."""
        print('PING/PONG...')
        try:
            uwsgi.websocket_recv_nb()
            connection.add_timeout(60, keepalive)
        except OSError:
            print('Killing the connection...')
            sys.exit(0)

    def relay(ch, method, properties, body):
        """Forward one received message body to the websocket client."""
        try:
            uwsgi.websocket_send(body)
        except OSError as error:
            print('Could not send message over the websocket', error)
            sys.exit(0)

    keepalive()
    channel.basic_consume(relay, queue=queue, no_ack=True)
    channel.start_consuming()


问题


面经


文章

微信
公众号

扫码关注公众号