def connections(self, wait):
"""
wait for connections to both rabbitmq and elasticsearch to be made
before binding a routing key to a channel and sending messages to
elasticsearch
"""
while wait:
try:
params = pika.ConnectionParameters(host=self.rmq_host,
port=self.rmq_port)
connection = pika.BlockingConnection(params)
self.channel = connection.channel()
self.channel.exchange_declare(exchange='topic_recs',
exchange_type='topic')
result = self.channel.queue_declare(exclusive=True)
self.queue_name = result.method.queue
self.es_conn = Elasticsearch([{'host': self.es_host,
'port': self.es_port}])
wait = False
print("connected to rabbitmq...")
except Exception as e: # pragma: no cover
print(str(e))
print("waiting for connection to rabbitmq..." + str(e))
time.sleep(2)
wait = True
评论列表
文章目录