def publish(self, item, priority=0, retry=2):
body = json.dumps(item)
try:
self._channel.basic_publish(exchange=u'',
routing_key=self._queue_name,
body=body,
properties=pika.BasicProperties(
delivery_mode=2,
priority=priority
))
except exceptions.ConnectionClosed as err:
if retry <= 0:
raise err
self.open()
self.publish(item, retry - 1)
python类BasicProperties()的实例源码
def properties(self) -> BasicProperties:
""" Build :class:`pika.BasicProperties` object """
return BasicProperties(
content_type=self.content_type,
content_encoding=self.content_encoding,
headers=self.headers,
delivery_mode=self.delivery_mode,
priority=self.priority,
correlation_id=self.correlation_id,
reply_to=self.reply_to,
expiration=str(convert_timestamp(self.expiration * 1000)) if self.expiration else None,
message_id=self.message_id,
timestamp=self.timestamp,
type=self.type,
user_id=self.user_id,
app_id=self.app_id
)
def send(self, message):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key = 'eagle',
properties = pika.BasicProperties(\
reply_to = self.callback_queue,
correlation_id =self.corr_id,),
body=message
)
for i in xrange(self.timeout):
if self.response is None:
self.connection.process_data_events()
else:
break
time.sleep(1)
return self.response
def process_rss(rss_result, message_body, redis_conn, message_queue):
for result in rss_result:
page_url = _convert_url(result.url, message_body['website'])
in_database = _check_redis(page_url, redis_conn)
message_body['title'] = result.title
message_body['date'] = result.date
message_body['url'] = page_url
to_send = json.dumps(message_body)
if not in_database:
message_queue.basic_publish(exchange='',
routing_key='scraper_queue',
body=to_send,
properties=pika.BasicProperties(
delivery_mode=2,))
#Set the value within redis to expire in 3 days
redis_conn.setex(page_url, 259200, 1)
else:
pass
def on_mutation_request(self, ch, method, props, body):
"""Callback for messages in the 'rpc_mutations_queue'
They say: "Hey, do you have a mutation for me?"
"""
# This is the "remote procedure"
# being called and returning a value
mutation_obj = self.get_mutation()
ch.basic_publish(exchange = '',
routing_key = props.reply_to,
properties = pika.BasicProperties(
correlation_id = props.correlation_id),
body = mutation_obj.serialize_me())
ch.basic_ack(delivery_tag = method.delivery_tag)
def on_evaluation_request(self, ch, method, props, body):
"""Callback for messages in the 'rpc_evaluations_queue'
They say: "Hey, here are the execution results"
"""
# This is the "remote procedure"
# being called and returning a value
ev_mutation_object = pickle.loads(body)
self.process_execution_results(ev_mutation_object)
ch.basic_publish(exchange = '',
routing_key = props.reply_to,
properties = pika.BasicProperties(
correlation_id = props.correlation_id),
body = 'EVALUATION RECEIVED')
ch.basic_ack(delivery_tag = method.delivery_tag)
def poll_mutation_queue(self):
"""
In this paradigm calling means pushing our message
to the queue (the callback will take care of it)
and wait for the response and process it.
@returns: string, serialized MutationObject (only attributes)
"""
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange = '', # default exchange
routing_key = 'rpc_mutations_queue',
properties = pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id),
body = 'POLL MUTATION QUEUE')
self.ae.m_info("[x] Sent mutation queue poll")
while self.response is None:
# Waiting for a response
self.connection.process_data_events()
return self.response
def send_evaluation(self, mutation_object):
"""
In this paradigm calling means pushing our message
to the queue (the callback will take care of it)
and wait for the response and process it.
@returns: string, serialized MutationObject (only attributes)
"""
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange = '', # default exchange
routing_key = 'rpc_evaluations_queue',
properties = pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id),
# This should be a serialized
# evaluation object
body = mutation_object.serialize_me())
self.ae.m_info("[x] Sent evaluation")
while self.response is None:
# Waiting for a response
self.connection.process_data_events()
return self.response
def poll_mutation_queue(self):
"""
In this paradigm calling means pushing our message
to the queue (the callback will take care of it)
and wait for the response and process it.
@returns: string, serialized MutationObject (only attributes)
"""
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange = '', # default exchange
routing_key = 'rpc_mutations_queue',
properties = pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id),
body = 'POLL MUTATION QUEUE')
self.ae.m_info("[x] Sent mutation queue poll")
while self.response is None:
# Waiting for a response
self.connection.process_data_events()
return self.response
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
basicProperties = pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
)
self.channel.basic_publish(exchange='',
routing_key=QUEUE,
properties=basicProperties,
body=str(n))
while self.response is None:
self.connection.process_data_events()
return self.response
def on_request(ch, method, props, body):
global REQ_COUNT
REQ_COUNT += 1
print(" [x] Listening ... Request Number: %i" % REQ_COUNT)
body = json.load(StringIO(body))
operator = body['operator']
values = body['data']
print(" [.] mathOps(%s)" % operator)
response = json.dumps(mathOps(values, operator=operator),
separators=(',', ':'))
print(" Output: %s\n" % response)
basicProperties = pika.BasicProperties(correlation_id=props.correlation_id)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=basicProperties,
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
def run(self, function, args=None, kwargs=None, retry_policy=None, callback=None):
with self._pool.acquire() as cxn:
cxn.channel.basic_publish(
body=self.serialize(
{
"function": function,
"parameters": {
"args": args or tuple(),
"kwargs": kwargs or {},
"retry_policy": retry_policy,
"callback": callback
}
}
),
exchange='',
routing_key=self.queue_name,
properties=pika.BasicProperties(
delivery_mode=2,
)
)
self.log(logging.DEBUG, "Task received : {}".format(function))
def request(self, n):
corr_id = str(uuid.uuid4())
self.response[corr_id] = None
#???????????????correlation_id
self.channel.basic_publish(exchange='',
routing_key='compute_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = corr_id,
),
body=str(n))
#???????
while self.response[corr_id] is None:
self.connection.process_data_events()
return int(self.response[corr_id])
def publish(self, message):
self._message_number_out += 1
amqp_message_update_meta(message, self.get_meta())
amqp_msg = amqp_message_encode(message)
log.debug("Publish message #%s, AMQP message: %s" % (self._message_number_out, amqp_msg))
properties = BasicProperties(
app_id=self.app_id,
content_type='application/json',
content_encoding='utf-8',
delivery_mode=2, # persistent
)
try:
yield self._channel.basic_publish(
self.exchange_name,
self.queue_out_routing_key,
amqp_msg,
properties=properties,
)
except ChannelClosed:
self.retry_channel()
self._cached_messages.append(message)
except AMQPError:
self.retry_connect()
self._cached_messages.append(message)
def callback(self, ch, method, properties, body):
"""
????,????????????rabbitmq???
:param ch: ???self.channel
:param method:
:param properties:???????????
:param body:????
:return:
"""
before = time.monotonic() # ?????????????
exec_cmd = threading.Thread(target=self.exec_call, args=(body,))
exec_cmd.start()
exec_cmd.join(self.timeout)
after = time.monotonic() # ????????????,????????????
if (after - before) > self.timeout: # ????????????????,??????????,???????????
self.response = bytes("command exec timeout", "utf8")
print(" [*] Got a task {}".format(str(body, "utf8)")))
message = {"host": self.id, "data": self.response}
ch.basic_publish(exchange="",
routing_key=properties.reply_to,
properties=pika.BasicProperties(
correlation_id=properties.correlation_id,),
body=bytes(str(message), "utf-8"))
ch.basic_ack(delivery_tag=method.delivery_tag)
def message_worker(self):
while 1:
try:
record, routing_key = self.queue.get()
if not self.connection or self.connection.is_closed or not self.channel or self.channel.is_closed:
self.open_connection()
self.channel.basic_publish(
exchange=self.exchange,
routing_key=routing_key,
body=self.format(record),
properties=pika.BasicProperties(
delivery_mode=2
)
)
except Exception:
self.channel, self.connection = None, None
self.handleError(record)
finally:
self.queue.task_done()
if self.close_after_emit:
self.close_connection()
def whois_push(**whois_recv_info):
global channel_whois
# print 'whois push:', whois_recv_info
result = ''
try:
result = json.dumps(whois_recv_info)
except UnicodeDecodeError:
for key in whois_recv_info.keys():
if type(whois_recv_info[key]) == str:
whois_recv_info[key] = whois_recv_info[key].decode('latin-1').encode("utf-8")
result = json.dumps(whois_recv_info)
if result != '':
channel_whois.basic_publish(
exchange='',
routing_key='whois_queue',
body=json.dumps(result),
properties=pika.BasicProperties(
delivery_mode=2)
)
# ????com_manage???????whois??????xxx?????????????
def whois_push(**whois_recv_info):
global channel_whois
# print 'whois push:', whois_recv_info
result = ''
try:
result = json.dumps(whois_recv_info)
except UnicodeDecodeError:
for key in whois_recv_info.keys():
if type(whois_recv_info[key]) == str:
whois_recv_info[key] = whois_recv_info[key].decode('latin-1').encode("utf-8")
result = json.dumps(whois_recv_info)
if result != '':
channel_whois.basic_publish(
exchange='',
routing_key='whois_queue',
body=json.dumps(result),
properties=pika.BasicProperties(
delivery_mode=2)
)
# ????com_manage???????whois??????xxx?????????????
def send_task(self):
while True:
if self.send_queue.empty()&self.handle_stoping:
self.send_stop_evt.set()
return
if not self.send_queue.empty():
callinfo=self.send_queue.get_nowait()
# ??RPC?????RPC????`rpc_queue`????????`reply_to`?`correlation_id`
self._channel.basic_publish(exchange=self.Exchange,
routing_key=self.Queue,
properties=pika.BasicProperties(
reply_to = self.callback_queue,
),
body=callinfo.body)
gevent.sleep(0)
def publish_submission_message(challenge_id, phase_id, submission_id):
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='evalai_submissions', type='topic')
# though worker is creating the queue(queue creation is idempotent too)
# but lets create the queue here again, so that messages dont get missed
# later on we can apply a check on queue message length to raise some alert
# this way we will be notified of worker being up or not
channel.queue_declare(queue='submission_task_queue', durable=True)
message = {
'challenge_id': challenge_id,
'phase_id': phase_id,
'submission_id': submission_id
}
channel.basic_publish(exchange='evalai_submissions',
routing_key='submission.*.*',
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2)) # make message persistent
print(" [x] Sent %r" % message)
connection.close()
def start_consuming(self):
"""Exchange, channel, consumer ready to start listening"""
# send rpc request
self.worker_id = None
self.correlation_id = uuid.uuid4().hex
self._channel.basic_publish(
exchange=self.exchange,
routing_key='%s.worker.%s' % (self.key, self.worker_type),
properties=pika.BasicProperties(
reply_to=self.queue,
correlation_id=self.correlation_id,
content_type='application/json',
),
body=json.dumps(self.worker_kwargs),
)
log.info("%s: sent RPC request, will wait for response.", self.lbl)
super(_HubTornadoConsumer, self).start_consuming()
def send_to_worker(self, action, msg=''):
if not self.consumer.worker_id:
raise Exception("Routing key not yet received in RPC response.")
routing_key = '%s.%s' % (self.consumer.worker_id, action)
if isinstance(msg, basestring):
self.consumer._channel.basic_publish(exchange=self.exchange,
routing_key=routing_key,
body=msg)
else:
self.consumer._channel.basic_publish(
exchange=self.exchange,
routing_key=routing_key,
properties=pika.BasicProperties(
content_type='application/json',
),
body=json.dumps(msg),
)
def _publish(self, exchange_name, queue_name, body, priority, retry):
try:
self._channel.basic_publish(exchange=exchange_name,
routing_key=queue_name,
body=body,
properties=pika.BasicProperties(
delivery_mode=2,
priority=priority
))
except exceptions.ConnectionClosed as err:
if retry <= 0:
raise err
self.open()
self._publish(exchange_name, queue_name, body, priority, retry - 1)
def _send_signal(self, signal_name, args):
# Send a signal on the exchange
body = {'signal_name': signal_name, 'args': args}
body = serializer.dumps(body)
b = False
while not b:
try:
self.channel.basic_publish(
exchange='rebus_signals', routing_key='', body=body,
properties=pika.BasicProperties(delivery_mode=2,))
b = True
except pika.exceptions.ConnectionClosed:
log.info("Disconnected (in _send_signal). "
"Trying to reconnect...")
self._reconnect()
time.sleep(0.5)
# TODO Check is the key is valid
def publish_message(queue_name, message):
message_body = json.dumps(message)
message_id = message['message_id']
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True)
channel.basic_publish(exchange='',
routing_key=queue_name,
body=message_body,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
log_msg = "Published : [queue_name={}] [comment_id={}] [username={}] [comment_body={}]".format(
queue_name, message['comment_id'], message['username'], message['comment_body'])
logger.log_info_message(message_id, LogUtilityConstants.message_published_event,
'sub_monitor', log_msg)
connection.close()
def publish(queue_name,body='Hello World!',exchange=''):
"""Publish the content or message to the queue
pika.BasicProperties(delivery_mode=2) will make message persistent
Args:
queue_name: the mq`s name
body(str) —— the content will be publish
exchange:
"""
channel = get_channel(queue_name)
if channel:
channel.basic_publish(exchange=exchange,
routing_key=queue_name,
body=body,properties=pika.BasicProperties(delivery_mode=2))
# logger.info('ramq publish queue_name: ' + str(queue_name) + ' ,body: \n' + str(body) + '\n')
# print(" [x] Sent " + body)
my_rabbitmq.py 文件源码
项目:python_rabbitmq_multiprocessing_crawl
作者: ghotiv
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def put_queue_list(self, queue_name=None, message_list=None):
"""put queue to list"""
if not queue_name:
return None
try:
if not message_list:
return None
if isinstance(message_list, dict):
message_list = [message_list]
self.__connect()
self.channel.queue_declare(queue=queue_name, durable=True)
for message in message_list:
message = json.dumps(message)
self.channel.basic_publish(
exchange='',
routing_key=queue_name,
body=message,
properties=pika.BasicProperties(delivery_mode=2, ))
self.connection.close()
except Exception as e:
print e
return None
def call(self, submit_id, result_path, data_path, judge_path):
rpc_body = encode(submit_id, result_path, data_path, judge_path)
for i in range(5):
try:
app.logger.info("try!! %s %s %s %s" % (submit_id, result_path, data_path, judge_path))
self.channel.basic_publish(exchange='',
routing_key=self.ch,
properties=pika.BasicProperties(
delivery_mode=2,
),
body=rpc_body)
return
except pika.exceptions.ConnectionClosed:
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
self.channel.queue_declare(queue=self.ch, durable=True)
app.logger.info("local!! %s %s %s %s" % (submit_id, result_path, data_path, judge_path))
#convert to local judge. that's a sync way!
from .sandbox_server import SandBoxService
SandBoxService.local_exec(submit_id, result_path, data_path, judge_path)
def publish(self, payload):
with rmq_pool.acquire() as cxn:
try:
cxn.channel.queue_declare(queue=QUEUE_NAME, auto_delete=True)
cxn.channel.basic_publish(
body=json.dumps(payload),
exchange='',
routing_key=QUEUE_NAME,
properties=pika.BasicProperties(
content_type='plain/text'
)
)
subscriber_id = payload['subscriber']['_id']
logger.info(f"Queue.publish published: {subscriber_id}")
except Exception as e:
logger.error(f"Queue.publish exception: {e}")
def sendRegisterMessage(server,routingKeys):
exchangeName="qos.service"
queueName="heartbeatService"
msgHeaders={"__TypeId__":"com.tecomgroup.qos.communication.message.ServerStarted"}
msgBody={"originName":None,"serverName":""}
serverConfig = server.getConfigObject()
errors=[]
mqConf = getMqConf(serverConfig['mq'], server.name, errors)
# raise exception only if all mq's are down, so message sending is impossible
if mqConf is None:
raise Exception("sendRegisterMessage error: " + str(errors))
connection=pika.BlockingConnection(pika.URLParameters(mqConf['amqpUrl']))
channel = connection.channel()
channel.exchange_declare(exchange=exchangeName, exchange_type='topic', durable=True)
channel.queue_declare(queue=queueName, durable=True,arguments={'x-message-ttl':1800000})
channel.queue_bind(queue=queueName, exchange=exchangeName, routing_key="server.agent.register")
for key in routingKeys:
channel.basic_publish(
exchange=exchangeName,
routing_key=key,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
content_type='application/json',
content_encoding='UTF-8',
priority=0,
expiration="86400000",
headers=msgHeaders),
body=json.dumps(msgBody).encode('UTF-8')
)
connection.close()