def callback(self, ch, method, properties, body):
"""
????,????????????rabbitmq???
:param ch: ???self.channel
:param method:
:param properties:???????????
:param body:????
:return:
"""
before = time.monotonic() # ?????????????
exec_cmd = threading.Thread(target=self.exec_call, args=(body,))
exec_cmd.start()
exec_cmd.join(self.timeout)
after = time.monotonic() # ????????????,????????????
if (after - before) > self.timeout: # ????????????????,??????????,???????????
self.response = bytes("command exec timeout", "utf8")
print(" [*] Got a task {}".format(str(body, "utf8)")))
message = {"host": self.id, "data": self.response}
ch.basic_publish(exchange="",
routing_key=properties.reply_to,
properties=pika.BasicProperties(
correlation_id=properties.correlation_id,),
body=bytes(str(message), "utf-8"))
ch.basic_ack(delivery_tag=method.delivery_tag)
评论列表
文章目录