python类BasicProperties()的实例源码

queue.py 文件源码 项目:frontoxy 作者: fabienvauchelles 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def publish(self, item, priority=0, retry=2):
        body = json.dumps(item)

        try:
            self._channel.basic_publish(exchange=u'',
                                        routing_key=self._queue_name,
                                        body=body,
                                        properties=pika.BasicProperties(
                                            delivery_mode=2,
                                            priority=priority
                                        ))

        except exceptions.ConnectionClosed as err:
            if retry <= 0:
                raise err

            self.open()
            self.publish(item, retry - 1)
message.py 文件源码 项目:aio-pika 作者: mosquito 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def properties(self) -> BasicProperties:
        """ Build :class:`pika.BasicProperties` object """
        return BasicProperties(
            content_type=self.content_type,
            content_encoding=self.content_encoding,
            headers=self.headers,
            delivery_mode=self.delivery_mode,
            priority=self.priority,
            correlation_id=self.correlation_id,
            reply_to=self.reply_to,
            expiration=str(convert_timestamp(self.expiration * 1000)) if self.expiration else None,
            message_id=self.message_id,
            timestamp=self.timestamp,
            type=self.type,
            user_id=self.user_id,
            app_id=self.app_id
        )
mq.py 文件源码 项目:eagle 作者: saga92 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def send(self, message):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                routing_key = 'eagle',
                properties = pika.BasicProperties(\
                    reply_to = self.callback_queue,
                    correlation_id =self.corr_id,),
                body=message
            )
        for i in xrange(self.timeout):
            if self.response is None:
                self.connection.process_data_events()
            else:
                break
            time.sleep(1)
        return self.response
rss.py 文件源码 项目:atlas 作者: johnb30 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def process_rss(rss_result, message_body, redis_conn, message_queue):
    for result in rss_result:
        page_url = _convert_url(result.url, message_body['website'])

        in_database = _check_redis(page_url, redis_conn)

        message_body['title'] = result.title
        message_body['date'] = result.date
        message_body['url'] = page_url

        to_send = json.dumps(message_body)

        if not in_database:
            message_queue.basic_publish(exchange='',
                                        routing_key='scraper_queue',
                                        body=to_send,
                                        properties=pika.BasicProperties(
                                            delivery_mode=2,))
            #Set the value within redis to expire in 3 days
            redis_conn.setex(page_url, 259200, 1)
        else:
            pass
rpc_server.py 文件源码 项目:BrundleFuzz 作者: carlosgprado 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def on_mutation_request(self, ch, method, props, body):
        """Callback for messages in the 'rpc_mutations_queue'

        They say: "Hey, do you have a mutation for me?"
        """

        # This is the "remote procedure"
        # being called and returning a value
        mutation_obj = self.get_mutation()

        ch.basic_publish(exchange = '',
                         routing_key = props.reply_to,
                         properties = pika.BasicProperties(
                                    correlation_id = props.correlation_id),
                         body = mutation_obj.serialize_me())

        ch.basic_ack(delivery_tag = method.delivery_tag)
rpc_server.py 文件源码 项目:BrundleFuzz 作者: carlosgprado 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def on_evaluation_request(self, ch, method, props, body):
        """Callback for messages in the 'rpc_evaluations_queue'

        They say: "Hey, here are the execution results"
        """

        # This is the "remote procedure"
        # being called and returning a value
        ev_mutation_object = pickle.loads(body)
        self.process_execution_results(ev_mutation_object)

        ch.basic_publish(exchange = '',
                         routing_key = props.reply_to,
                         properties = pika.BasicProperties(
                                    correlation_id = props.correlation_id),
                         body = 'EVALUATION RECEIVED')

        ch.basic_ack(delivery_tag = method.delivery_tag)
rpc_client.py 文件源码 项目:BrundleFuzz 作者: carlosgprado 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def poll_mutation_queue(self):
        """
        In this paradigm calling means pushing our message
        to the queue (the callback will take care of it)
        and wait for the response and process it.
        @returns: string, serialized MutationObject (only attributes)
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange = '',   # default exchange
                                   routing_key = 'rpc_mutations_queue',
                                   properties = pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id),
                                   body = 'POLL MUTATION QUEUE')

        self.ae.m_info("[x] Sent mutation queue poll")

        while self.response is None:
            # Waiting for a response
            self.connection.process_data_events()

        return self.response
rpc_client.py 文件源码 项目:BrundleFuzz 作者: carlosgprado 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def send_evaluation(self, mutation_object):
        """
        In this paradigm calling means pushing our message
        to the queue (the callback will take care of it)
        and wait for the response and process it.
        @returns: string, serialized MutationObject (only attributes)
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange = '',   # default exchange
                                   routing_key = 'rpc_evaluations_queue',
                                   properties = pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id),
                                   # This should be a serialized
                                   # evaluation object
                                   body = mutation_object.serialize_me())

        self.ae.m_info("[x] Sent evaluation")

        while self.response is None:
            # Waiting for a response
            self.connection.process_data_events()

        return self.response
rpc_client.py 文件源码 项目:BrundleFuzz 作者: carlosgprado 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def poll_mutation_queue(self):
        """
        In this paradigm calling means pushing our message
        to the queue (the callback will take care of it)
        and wait for the response and process it.
        @returns: string, serialized MutationObject (only attributes)
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange = '',   # default exchange
                                   routing_key = 'rpc_mutations_queue',
                                   properties = pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id),
                                   body = 'POLL MUTATION QUEUE')

        self.ae.m_info("[x] Sent mutation queue poll")

        while self.response is None:
            # Waiting for a response
            self.connection.process_data_events()

        return self.response
client.py 文件源码 项目:salt-terraform-demo 作者: dguitarbite 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        basicProperties = pika.BasicProperties(
            reply_to=self.callback_queue,
            correlation_id=self.corr_id,
        )

        self.channel.basic_publish(exchange='',
                                   routing_key=QUEUE,
                                   properties=basicProperties,
                                   body=str(n))

        while self.response is None:
            self.connection.process_data_events()
        return self.response
server.py 文件源码 项目:salt-terraform-demo 作者: dguitarbite 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def on_request(ch, method, props, body):

    global REQ_COUNT

    REQ_COUNT += 1
    print(" [x] Listening ... Request Number: %i" % REQ_COUNT)
    body = json.load(StringIO(body))
    operator = body['operator']
    values = body['data']

    print(" [.] mathOps(%s)" % operator)
    response = json.dumps(mathOps(values, operator=operator), 
                                  separators=(',', ':'))
    print(" Output: %s\n" % response)
    basicProperties = pika.BasicProperties(correlation_id=props.correlation_id)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=basicProperties,
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)
rabbitmq.py 文件源码 项目:easy-job 作者: inb-co 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(self, function, args=None, kwargs=None, retry_policy=None, callback=None):
        with self._pool.acquire() as cxn:
            cxn.channel.basic_publish(
                body=self.serialize(
                    {
                        "function": function,
                        "parameters": {
                            "args": args or tuple(),
                            "kwargs": kwargs or {},
                            "retry_policy": retry_policy,
                            "callback": callback
                        }
                    }
                ),
                exchange='',
                routing_key=self.queue_name,
                properties=pika.BasicProperties(
                    delivery_mode=2,
                )
            )
            self.log(logging.DEBUG, "Task received : {}".format(function))
correlationid_center.py 文件源码 项目:Python_Study 作者: thsheep 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def request(self, n):
        corr_id = str(uuid.uuid4())
        self.response[corr_id] = None

        #???????????????correlation_id
        self.channel.basic_publish(exchange='',
                             routing_key='compute_queue',
                             properties=pika.BasicProperties(
                               reply_to = self.callback_queue,
                               correlation_id = corr_id,
                                         ),
                 body=str(n))
        #???????
        while self.response[corr_id] is None:
            self.connection.process_data_events()
        return int(self.response[corr_id])
amqp.py 文件源码 项目:flowder 作者: amir-khakshour 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def publish(self, message):
        self._message_number_out += 1

        amqp_message_update_meta(message, self.get_meta())
        amqp_msg = amqp_message_encode(message)
        log.debug("Publish message #%s, AMQP message: %s" % (self._message_number_out, amqp_msg))
        properties = BasicProperties(
            app_id=self.app_id,
            content_type='application/json',
            content_encoding='utf-8',
            delivery_mode=2,  # persistent
        )
        try:
            yield self._channel.basic_publish(
                self.exchange_name,
                self.queue_out_routing_key,
                amqp_msg,
                properties=properties,
            )
        except ChannelClosed:
            self.retry_channel()
            self._cached_messages.append(message)
        except AMQPError:
            self.retry_connect()
            self._cached_messages.append(message)
rabbitmq_client.py 文件源码 项目:_ 作者: zengchunyun 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def callback(self, ch, method, properties, body):
        """
        ????,????????????rabbitmq???
        :param ch:  ???self.channel
        :param method:
        :param properties:???????????
        :param body:????
        :return:
        """
        before = time.monotonic()  # ?????????????
        exec_cmd = threading.Thread(target=self.exec_call, args=(body,))
        exec_cmd.start()
        exec_cmd.join(self.timeout)
        after = time.monotonic()  # ????????????,????????????
        if (after - before) > self.timeout:  # ????????????????,??????????,???????????
            self.response = bytes("command exec timeout", "utf8")
        print(" [*] Got a task {}".format(str(body, "utf8)")))
        message = {"host": self.id, "data": self.response}
        ch.basic_publish(exchange="",
                         routing_key=properties.reply_to,
                         properties=pika.BasicProperties(
                             correlation_id=properties.correlation_id,),
                         body=bytes(str(message), "utf-8"))
        ch.basic_ack(delivery_tag=method.delivery_tag)
handlers_oneway.py 文件源码 项目:python-logging-rabbitmq 作者: albertomr86 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def message_worker(self):
        while 1:
            try:
                record, routing_key = self.queue.get()

                if not self.connection or self.connection.is_closed or not self.channel or self.channel.is_closed:
                    self.open_connection()

                self.channel.basic_publish(
                    exchange=self.exchange,
                    routing_key=routing_key,
                    body=self.format(record),
                    properties=pika.BasicProperties(
                        delivery_mode=2
                    )
                )
            except Exception:
                self.channel, self.connection = None, None
                self.handleError(record)
            finally:
                self.queue.task_done()
                if self.close_after_emit:
                    self.close_connection()
main-NSConflict-`13.py 文件源码 项目:Malicious_Domain_Whois 作者: h-j-13 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def whois_push(**whois_recv_info):
    global channel_whois
    # print 'whois push:', whois_recv_info
    result = ''
    try:
        result = json.dumps(whois_recv_info)
    except UnicodeDecodeError:
        for key in whois_recv_info.keys():
            if type(whois_recv_info[key]) == str:
                whois_recv_info[key] = whois_recv_info[key].decode('latin-1').encode("utf-8")
        result = json.dumps(whois_recv_info)
    if result != '':
        channel_whois.basic_publish(
            exchange='',
            routing_key='whois_queue',
            body=json.dumps(result),
            properties=pika.BasicProperties(
                delivery_mode=2)
        )


# ????com_manage???????whois??????xxx?????????????
main.py 文件源码 项目:Malicious_Domain_Whois 作者: h-j-13 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def whois_push(**whois_recv_info):
    global channel_whois
    # print 'whois push:', whois_recv_info
    result = ''
    try:
        result = json.dumps(whois_recv_info)
    except UnicodeDecodeError:
        for key in whois_recv_info.keys():
            if type(whois_recv_info[key]) == str:
                whois_recv_info[key] = whois_recv_info[key].decode('latin-1').encode("utf-8")
        result = json.dumps(whois_recv_info)
    if result != '':
        channel_whois.basic_publish(
            exchange='',
            routing_key='whois_queue',
            body=json.dumps(result),
            properties=pika.BasicProperties(
                delivery_mode=2)
        )


# ????com_manage???????whois??????xxx?????????????
amqp_client.py 文件源码 项目:pymqant 作者: liangdas 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def send_task(self):
        while   True:
            if self.send_queue.empty()&self.handle_stoping:
                self.send_stop_evt.set()
                return
            if not self.send_queue.empty():
                callinfo=self.send_queue.get_nowait()
                # ??RPC?????RPC????`rpc_queue`????????`reply_to`?`correlation_id`
                self._channel.basic_publish(exchange=self.Exchange,
                                            routing_key=self.Queue,
                                            properties=pika.BasicProperties(
                                                    reply_to = self.callback_queue,
                                            ),
                                            body=callinfo.body)

            gevent.sleep(0)
sender.py 文件源码 项目:EvalAI 作者: Cloud-CV 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def publish_submission_message(challenge_id, phase_id, submission_id):

    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange='evalai_submissions', type='topic')

    # though worker is creating the queue(queue creation is idempotent too)
    # but lets create the queue here again, so that messages dont get missed
    # later on we can apply a check on queue message length to raise some alert
    # this way we will be notified of worker being up or not
    channel.queue_declare(queue='submission_task_queue', durable=True)

    message = {
        'challenge_id': challenge_id,
        'phase_id': phase_id,
        'submission_id': submission_id
    }
    channel.basic_publish(exchange='evalai_submissions',
                          routing_key='submission.*.*',
                          body=json.dumps(message),
                          properties=pika.BasicProperties(delivery_mode=2))    # make message persistent

    print(" [x] Sent %r" % message)
    connection.close()
tornado_client.py 文件源码 项目:mist.api 作者: mistio 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def start_consuming(self):
        """Exchange, channel, consumer ready to start listening"""

        # send rpc request
        self.worker_id = None
        self.correlation_id = uuid.uuid4().hex
        self._channel.basic_publish(
            exchange=self.exchange,
            routing_key='%s.worker.%s' % (self.key, self.worker_type),
            properties=pika.BasicProperties(
                reply_to=self.queue,
                correlation_id=self.correlation_id,
                content_type='application/json',
            ),
            body=json.dumps(self.worker_kwargs),
        )
        log.info("%s: sent RPC request, will wait for response.", self.lbl)

        super(_HubTornadoConsumer, self).start_consuming()
tornado_client.py 文件源码 项目:mist.api 作者: mistio 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def send_to_worker(self, action, msg=''):
        if not self.consumer.worker_id:
            raise Exception("Routing key not yet received in RPC response.")
        routing_key = '%s.%s' % (self.consumer.worker_id, action)
        if isinstance(msg, basestring):
            self.consumer._channel.basic_publish(exchange=self.exchange,
                                                 routing_key=routing_key,
                                                 body=msg)
        else:
            self.consumer._channel.basic_publish(
                exchange=self.exchange,
                routing_key=routing_key,
                properties=pika.BasicProperties(
                    content_type='application/json',
                ),
                body=json.dumps(msg),
            )
queue.py 文件源码 项目:frontoxy 作者: fabienvauchelles 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _publish(self, exchange_name, queue_name, body, priority, retry):
        try:
            self._channel.basic_publish(exchange=exchange_name,
                                        routing_key=queue_name,
                                        body=body,
                                        properties=pika.BasicProperties(
                                            delivery_mode=2,
                                            priority=priority
                                        ))

        except exceptions.ConnectionClosed as err:
            if retry <= 0:
                raise err

            self.open()
            self._publish(exchange_name, queue_name, body, priority, retry - 1)
master.py 文件源码 项目:rebus 作者: airbus-seclab 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _send_signal(self, signal_name, args):
        # Send a signal on the exchange
        body = {'signal_name': signal_name, 'args': args}
        body = serializer.dumps(body)
        b = False
        while not b:
            try:
                self.channel.basic_publish(
                    exchange='rebus_signals', routing_key='', body=body,
                    properties=pika.BasicProperties(delivery_mode=2,))
                b = True
            except pika.exceptions.ConnectionClosed:
                log.info("Disconnected (in _send_signal). "
                         "Trying to reconnect...")
                self._reconnect()
                time.sleep(0.5)

    # TODO Check is the key is valid
sub_monitor.py 文件源码 项目:SolutionGambling 作者: eganwall 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def publish_message(queue_name, message):
    message_body = json.dumps(message)
    message_id = message['message_id']

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    channel.basic_publish(exchange='',
                          routing_key=queue_name,
                          body=message_body,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          )
    )
    log_msg = "Published : [queue_name={}] [comment_id={}] [username={}] [comment_body={}]".format(
        queue_name, message['comment_id'], message['username'], message['comment_body'])
    logger.log_info_message(message_id, LogUtilityConstants.message_published_event,
                            'sub_monitor', log_msg)

    connection.close()
utils.py 文件源码 项目:Simplechaindb 作者: BUAANLSDE 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def publish(queue_name,body='Hello World!',exchange=''):
    """Publish the content or message to the queue

        pika.BasicProperties(delivery_mode=2) will make message persistent

    Args:
        queue_name: the mq`s name
        body(str) —— the content will be publish
        exchange:

    """

    channel = get_channel(queue_name)
    if channel:
        channel.basic_publish(exchange=exchange,
                                     routing_key=queue_name,
                                     body=body,properties=pika.BasicProperties(delivery_mode=2))
    # logger.info('ramq publish queue_name: ' + str(queue_name) + ' ,body: \n' + str(body) + '\n')
    # print(" [x] Sent " + body)
my_rabbitmq.py 文件源码 项目:python_rabbitmq_multiprocessing_crawl 作者: ghotiv 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def put_queue_list(self, queue_name=None, message_list=None):
        """put queue to list"""
        if not queue_name:
            return None
        try:
            if not message_list:
                return None
            if isinstance(message_list, dict):
                message_list = [message_list]
            self.__connect()
            self.channel.queue_declare(queue=queue_name, durable=True)
            for message in message_list:
                message = json.dumps(message)
                self.channel.basic_publish(
                    exchange='',
                    routing_key=queue_name,
                    body=message,
                    properties=pika.BasicProperties(delivery_mode=2, ))
            self.connection.close()
        except Exception as e:
            print e
            return None
sandbox_client.py 文件源码 项目:xiaoxiang-oj 作者: hanfei19910905 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def call(self, submit_id, result_path, data_path, judge_path):
        rpc_body = encode(submit_id, result_path, data_path, judge_path)
        for i in range(5):
            try:
                app.logger.info("try!! %s %s %s %s" % (submit_id, result_path, data_path, judge_path))
                self.channel.basic_publish(exchange='',
                                           routing_key=self.ch,
                                           properties=pika.BasicProperties(
                                                 delivery_mode=2,
                                                 ),
                                           body=rpc_body)
                return
            except pika.exceptions.ConnectionClosed:
                self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                    host='localhost'))

                self.channel = self.connection.channel()

                self.channel.queue_declare(queue=self.ch, durable=True)

        app.logger.info("local!! %s %s %s %s" % (submit_id, result_path, data_path, judge_path))
        #convert to local judge. that's a sync way!
        from .sandbox_server import SandBoxService
        SandBoxService.local_exec(submit_id, result_path, data_path, judge_path)
queue.py 文件源码 项目:ssp-campaigns 作者: bloogrox 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def publish(self, payload):
        with rmq_pool.acquire() as cxn:
            try:
                cxn.channel.queue_declare(queue=QUEUE_NAME, auto_delete=True)
                cxn.channel.basic_publish(
                    body=json.dumps(payload),
                    exchange='',
                    routing_key=QUEUE_NAME,
                    properties=pika.BasicProperties(
                        content_type='plain/text'
                    )
                )
                subscriber_id = payload['subscriber']['_id']
                logger.info(f"Queue.publish published: {subscriber_id}")
            except Exception as e:
                logger.error(f"Queue.publish exception: {e}")
threadPollSubs.py 文件源码 项目:djangoStatusPanel 作者: okar1 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def sendRegisterMessage(server,routingKeys):

    exchangeName="qos.service"
    queueName="heartbeatService"
    msgHeaders={"__TypeId__":"com.tecomgroup.qos.communication.message.ServerStarted"}
    msgBody={"originName":None,"serverName":""}

    serverConfig = server.getConfigObject()
    errors=[]
    mqConf = getMqConf(serverConfig['mq'], server.name, errors)

    # raise exception only if all mq's are down, so message sending is impossible
    if mqConf is None:
        raise Exception("sendRegisterMessage error: " + str(errors))

    connection=pika.BlockingConnection(pika.URLParameters(mqConf['amqpUrl']))
    channel = connection.channel()

    channel.exchange_declare(exchange=exchangeName, exchange_type='topic', durable=True)
    channel.queue_declare(queue=queueName, durable=True,arguments={'x-message-ttl':1800000})
    channel.queue_bind(queue=queueName, exchange=exchangeName, routing_key="server.agent.register")

    for key in routingKeys:
        channel.basic_publish(
            exchange=exchangeName,
            routing_key=key,
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
                content_type='application/json',
                content_encoding='UTF-8',
                priority=0,
                expiration="86400000",
                headers=msgHeaders),
            body=json.dumps(msgBody).encode('UTF-8')
        )
    connection.close()


问题


面经


文章

微信
公众号

扫码关注公众号