def send_message_to_queue(message):
global corr_id
global response
global connection
global channel
global callback_queue
response=None
connection = pika.BlockingConnection(pika.ConnectionParameters(host="37.187.22.103",port=2765,heartbeat_interval=30))
channel = connection.channel()
result=channel.queue_declare(exclusive=True)
callback_queue = result.method.queue
channel.basic_consume(on_response, no_ack=True,
queue=callback_queue)
corr_id=str(uuid.uuid4())
response = None
corr_id = str(uuid.uuid4())
channel.basic_publish( exchange='',
routing_key="rpc_queue",
properties=pika.BasicProperties(
reply_to = callback_queue,
correlation_id = corr_id),
body=message)
print(" [x] Sent data to RabbitMQ")
while response is None:
connection.process_data_events()
print(" [x] Get response from RabbitMQ")
return str(response)
python类BasicProperties()的实例源码
def send_message_to_queue(message):
global corr_id
global response
global connection
global channel
global callback_queue
response=None
connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq,port=int(rabbitmq_port),heartbeat_interval=30))
channel = connection.channel()
result=channel.queue_declare(exclusive=True)
callback_queue = result.method.queue
channel.basic_consume(on_response, no_ack=True,
queue=callback_queue)
corr_id=str(uuid.uuid4())
response = None
corr_id = str(uuid.uuid4())
channel.basic_publish( exchange='',
routing_key="rpc_queue",
properties=pika.BasicProperties(
reply_to = callback_queue,
correlation_id = corr_id),
body=message)
print(" [x] Sent data to RabbitMQ")
while response is None:
connection.process_data_events()
print(" [x] Get response from RabbitMQ")
print "response: "+str(response)
return response
def on_request(self, ch, method, props, body):
response = self.receive(body)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
def request(ch, method, props, body):
print(" [.] increase(%s)" % (body,))
response = increase(int(body))
#???????????????correlation_id???
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
def request(self, n):
self.response = None
#??????????????
self.channel.basic_publish(exchange='',
routing_key='compute_queue',
properties=pika.BasicProperties(
reply_to =self.callback_queue,
),
body=str(n))
#???????
while self.response is None:
self.connection.process_data_events()
return int(self.response)
def publish_message(self):
"""If the class is not stopping, publish a message to RabbitMQ,
appending a list of deliveries with the message number that was sent.
This list will be used to check for delivery confirmations in the
on_delivery_confirmations method.
Once the message has been sent, schedule another message to be sent.
The main reason I put scheduling in was just so you can get a good idea
of how the process is flowing by slowing down and speeding up the
delivery intervals by changing the PUBLISH_INTERVAL constant in the
class.
"""
if self._stopping:
return
# controllo che il servizio di acquisizione sia attivo ...
# if not self._winservice.isRunning():
# LOGGER.info('Win Service is not running...')
# print 'Win Service is not running...'
# return
message = self.get_message_from_selected_data();
#print "***************************************"
#print json.dumps(message, ensure_ascii=False)
#print "***************************************"
properties = pika.BasicProperties(app_id='myrabbit_py-publisher',
content_type='application/json')
self._channel.basic_publish(self.EXCHANGE,
self.ROUTING_KEY,
json.dumps(message, ensure_ascii=False),
properties)
self._message_number += 1
self._deliveries.append(self._message_number)
logger.info('Published message # %i', self._message_number)
self.schedule_next_message()
def send(self, body, **kw):
return self.channel.basic_publish(self.exchange,
self.routing_key,
json.dumps(body),
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
),
**kw)
def push_message(self, msg):
self._check_connection()
if self.channel.basic_publish(exchange=self.exchange,
routing_key=self.exchange + '-' + self.queue_name,
body=msg,
properties=pika.BasicProperties(
delivery_mode=2)):
print('message sent')
else:
print('ERROR: message failed to send')
def enqueue_flight_snippet(flight_snippet):
"""Add items from the flight_dictionary to rabbitmq queue as json strings"""
channel.basic_publish(exchange='',
routing_key=queue_name,
body=flight_snippet,
properties=pika.BasicProperties(delivery_mode=2) # make message persistent
)
# this function reads a dictionary of a flight snapshot and returns a different and easier to work with dictionary
def produce(self, message):
"""Publish a message to add inside the queue.
Args;
message: object to add inside the queue.
"""
self.channel.basic_publish(exchange='',
routing_key='dazzar_jobs',
body=pickle.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
def send(self, n, routing):
self.channel.basic_publish(exchange='',
routing_key=routing,
properties=pika.BasicProperties(
delivery_mode=2,),
body=json.dumps(n))
def send(self, n, routing):
self.channel.basic_publish(exchange='',
routing_key=routing,
properties=pika.BasicProperties(
delivery_mode=2,),
body=json.dumps(n))
def send(self, n, routing):
self.channel.basic_publish(exchange='',
routing_key=routing,
properties=pika.BasicProperties(
delivery_mode=2,),
body=json.dumps(n))
def send(self, n, routing):
self.channel.basic_publish(exchange='',
routing_key=routing,
properties=pika.BasicProperties(
delivery_mode=2,),
body=json.dumps(n))
def send(self, n, routing):
self.channel.basic_publish(exchange='',
routing_key=routing,
properties=pika.BasicProperties(
delivery_mode=2,),
body=json.dumps(n))
def send(self, n, routing):
self.channel.basic_publish(exchange='',
routing_key=routing,
properties=pika.BasicProperties(
delivery_mode=2,),
body=json.dumps(n))
def start(self, cmd, routing_key="remote.call"):
self.response = [] # ??????????
self.correlation_id = str(uuid.uuid4())
self.log.info("exec command {}".format(cmd))
self.log.debug("routing key {}".format(routing_key))
self.channel.basic_publish(exchange=self.exchange,
routing_key=routing_key, # ?routing key???????????????routing key???
properties=pika.BasicProperties(
reply_to=self.queue_name,
correlation_id=self.correlation_id
),
body=cmd)
before = time.monotonic() # ??????????
after_len = 0 # ?????????????
while True:
if len(self.response) != after_len: # ?????????,????????????
before_len = len(self.response)
else:
before_len = after_len # ????????????????????????????,??????,????????,?????????
time.sleep(0.4)
self.connection.process_data_events() # ??????,?????????????
if len(self.response) == before_len and before_len:
break
after = time.monotonic() # ?????????????
if (after - before) > self.timeout: # ???????16s,?????????
break
return self.response # ??????????
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange="",
routing_key="rpc_queue",
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)