python类BasicProperties()的实例源码

spark_bot.py 文件源码 项目:roomfinder 作者: GuillaumeMorini 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send_message_to_queue(message):
    global corr_id
    global response
    global connection
    global channel
    global callback_queue

    response=None
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="37.187.22.103",port=2765,heartbeat_interval=30))  
    channel = connection.channel()
    result=channel.queue_declare(exclusive=True)
    callback_queue = result.method.queue
    channel.basic_consume(on_response, no_ack=True,
                                   queue=callback_queue)
    corr_id=str(uuid.uuid4())

    response = None
    corr_id =  str(uuid.uuid4())
    channel.basic_publish(  exchange='',
                            routing_key="rpc_queue",
                            properties=pika.BasicProperties(
                                         reply_to = callback_queue,
                                         correlation_id = corr_id),
                            body=message)

    print(" [x] Sent data to RabbitMQ")   

    while response is None:
        connection.process_data_events()
    print(" [x] Get response from RabbitMQ")   
    return str(response)
dispo.py 文件源码 项目:roomfinder 作者: GuillaumeMorini 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def send_message_to_queue(message):
    global corr_id
    global response
    global connection
    global channel
    global callback_queue

    response=None
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq,port=int(rabbitmq_port),heartbeat_interval=30))  
    channel = connection.channel()
    result=channel.queue_declare(exclusive=True)
    callback_queue = result.method.queue
    channel.basic_consume(on_response, no_ack=True,
                                   queue=callback_queue)
    corr_id=str(uuid.uuid4())

    response = None
    corr_id =  str(uuid.uuid4())
    channel.basic_publish(  exchange='',
                            routing_key="rpc_queue",
                            properties=pika.BasicProperties(
                                         reply_to = callback_queue,
                                         correlation_id = corr_id),
                            body=message)

    print(" [x] Sent data to RabbitMQ")   

    while response is None:
        connection.process_data_events()
    print(" [x] Get response from RabbitMQ")   
    print "response: "+str(response)
    return response
mq.py 文件源码 项目:eagle 作者: saga92 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def on_request(self, ch, method, props, body):
        response = self.receive(body)
        ch.basic_publish(exchange='',
                routing_key=props.reply_to,
                properties=pika.BasicProperties(correlation_id = \
                        props.correlation_id),
                body=str(response))
        ch.basic_ack(delivery_tag = method.delivery_tag)
rpc_client.py 文件源码 项目:core-python 作者: yidao620c 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
rpc_server.py 文件源码 项目:core-python 作者: yidao620c 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)
correlationid_compute.py 文件源码 项目:Python_Study 作者: thsheep 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def request(ch, method, props, body):
    print(" [.] increase(%s)"  % (body,))

    response = increase(int(body))

    #???????????????correlation_id???
    ch.basic_publish(exchange='',
              routing_key=props.reply_to,
              properties=pika.BasicProperties(correlation_id = \
                                          props.correlation_id),
                         body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)
rpc_center.py 文件源码 项目:Python_Study 作者: thsheep 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def request(self, n):
        self.response = None
        #??????????????
        self.channel.basic_publish(exchange='',
                             routing_key='compute_queue',
                             properties=pika.BasicProperties(
                               reply_to =self.callback_queue,
                                         ),
                             body=str(n))
        #???????
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
myrabbit.py 文件源码 项目:myRabbit 作者: bsab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def publish_message(self):
        """If the class is not stopping, publish a message to RabbitMQ,
        appending a list of deliveries with the message number that was sent.
        This list will be used to check for delivery confirmations in the
        on_delivery_confirmations method.

        Once the message has been sent, schedule another message to be sent.
        The main reason I put scheduling in was just so you can get a good idea
        of how the process is flowing by slowing down and speeding up the
        delivery intervals by changing the PUBLISH_INTERVAL constant in the
        class.

        """
        if self._stopping:
            return

        # controllo che il servizio di acquisizione sia attivo ...
        # if not self._winservice.isRunning():
        #    LOGGER.info('Win Service is not running...')
        #    print 'Win Service is not running...'
        #    return

        message = self.get_message_from_selected_data();
        #print "***************************************"
        #print json.dumps(message, ensure_ascii=False)
        #print "***************************************"

        properties = pika.BasicProperties(app_id='myrabbit_py-publisher',
                                          content_type='application/json')

        self._channel.basic_publish(self.EXCHANGE,
                                    self.ROUTING_KEY,
                                    json.dumps(message, ensure_ascii=False),
                                    properties)

        self._message_number += 1
        self._deliveries.append(self._message_number)
        logger.info('Published message # %i', self._message_number)
        self.schedule_next_message()
queue.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def send(self, body, **kw):
        return self.channel.basic_publish(self.exchange,
                                          self.routing_key,
                                          json.dumps(body),
                                          properties=pika.BasicProperties(
                                             delivery_mode = 2, # make message persistent
                                          ),
                                          **kw)
FeedQueue.py 文件源码 项目:fmn.sse 作者: fedora-infra 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def push_message(self, msg):
        self._check_connection()

        if self.channel.basic_publish(exchange=self.exchange,
                                      routing_key=self.exchange + '-' + self.queue_name,
                                      body=msg,
                                      properties=pika.BasicProperties(
                                          delivery_mode=2)):
            print('message sent')
        else:
            print('ERROR: message failed to send')
converter.py 文件源码 项目:SkySpyWatch 作者: nstarpost 项目源码 文件源码 阅读 54 收藏 0 点赞 0 评论 0
def enqueue_flight_snippet(flight_snippet):
    """Add items from the flight_dictionary to rabbitmq queue as json strings"""
    channel.basic_publish(exchange='',
                          routing_key=queue_name,
                          body=flight_snippet,
                          properties=pika.BasicProperties(delivery_mode=2)  # make message persistent
                          )


# this function reads a dictionary of a flight snapshot and returns a different and easier to work with dictionary
job_queue.py 文件源码 项目:dazzar 作者: Philaeux 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def produce(self, message):
        """Publish a message to add inside the queue.

        Args;
            message: object to add inside the queue.
        """
        self.channel.basic_publish(exchange='',
                                   routing_key='dazzar_jobs',
                                   body=pickle.dumps(message),
                                   properties=pika.BasicProperties(
                                       delivery_mode=2,  # make message persistent
                                   ))
utils.py 文件源码 项目:EventMiner 作者: hltcoe 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def send(self, n, routing):
        self.channel.basic_publish(exchange='',
                                   routing_key=routing,
                                   properties=pika.BasicProperties(
                                            delivery_mode=2,),
                                   body=json.dumps(n))
utils.py 文件源码 项目:EventMiner 作者: hltcoe 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def send(self, n, routing):
        self.channel.basic_publish(exchange='',
                                   routing_key=routing,
                                   properties=pika.BasicProperties(
                                            delivery_mode=2,),
                                   body=json.dumps(n))
utils.py 文件源码 项目:EventMiner 作者: hltcoe 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def send(self, n, routing):
        self.channel.basic_publish(exchange='',
                                   routing_key=routing,
                                   properties=pika.BasicProperties(
                                            delivery_mode=2,),
                                   body=json.dumps(n))
utils.py 文件源码 项目:EventMiner 作者: hltcoe 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def send(self, n, routing):
        self.channel.basic_publish(exchange='',
                                   routing_key=routing,
                                   properties=pika.BasicProperties(
                                            delivery_mode=2,),
                                   body=json.dumps(n))
utils.py 文件源码 项目:EventMiner 作者: hltcoe 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def send(self, n, routing):
        self.channel.basic_publish(exchange='',
                                   routing_key=routing,
                                   properties=pika.BasicProperties(
                                            delivery_mode=2,),
                                   body=json.dumps(n))
utils.py 文件源码 项目:EventMiner 作者: hltcoe 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def send(self, n, routing):
        self.channel.basic_publish(exchange='',
                                   routing_key=routing,
                                   properties=pika.BasicProperties(
                                            delivery_mode=2,),
                                   body=json.dumps(n))
rabbitmq_core.py 文件源码 项目:_ 作者: zengchunyun 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def start(self, cmd, routing_key="remote.call"):
        self.response = []  # ??????????
        self.correlation_id = str(uuid.uuid4())
        self.log.info("exec command {}".format(cmd))
        self.log.debug("routing key {}".format(routing_key))
        self.channel.basic_publish(exchange=self.exchange,
                                   routing_key=routing_key,  # ?routing key???????????????routing key???
                                   properties=pika.BasicProperties(
                                       reply_to=self.queue_name,
                                       correlation_id=self.correlation_id
                                   ),
                                   body=cmd)
        before = time.monotonic()  # ??????????
        after_len = 0  # ?????????????
        while True:
            if len(self.response) != after_len:  # ?????????,????????????
                before_len = len(self.response)
            else:
                before_len = after_len  # ????????????????????????????,??????,????????,?????????
                time.sleep(0.4)
            self.connection.process_data_events()  # ??????,?????????????
            if len(self.response) == before_len and before_len:
                break
            after = time.monotonic()  # ?????????????
            if (after - before) > self.timeout:  # ???????16s,?????????
                break
        return self.response  # ??????????
rpc_client.py 文件源码 项目:_ 作者: zengchunyun 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange="",
                                   routing_key="rpc_queue",
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


问题


面经


文章

微信
公众号

扫码关注公众号