def consume():
    """Connect to MongoDB, Redis and RabbitMQ, then consume the work queue.

    Opens a MongoDB client (database ``rt_flights_test``), a Redis client on
    ``localhost:6379`` (db 0, responses decoded to ``str``) and a blocking
    RabbitMQ connection to ``localhost``. Declares the durable queue named by
    the module-level ``queue_name``, limits delivery to one unacknowledged
    message at a time, and hands every delivery to ``callback`` together with
    the Redis client and the MongoDB database handle.

    The call blocks inside ``channel.start_consuming()`` until consumption
    stops. The RabbitMQ connection and the MongoDB client are closed on the
    way out, whether the loop ends normally or an exception (including
    ``KeyboardInterrupt``) propagates.

    Returns:
        int: ``0`` once consumption has stopped without an exception.
    """
    logger.debug("Consume started")
    redis_host = 'localhost'
    redis_port = 6379

    # connect to mongodb
    client = MongoClient()
    try:
        dbmongo = client.rt_flights_test

        # connect to redis
        r = redis.StrictRedis(
            host=redis_host, port=redis_port, db=0, decode_responses=True
        )

        # connect to rabbitmq and create queue
        connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        try:
            channel = connection.channel()
            # The declare result was never used, so it is not kept.
            channel.queue_declare(queue=queue_name, durable=True)
            channel.basic_qos(prefetch_count=1)

            # start pulling data off the queue
            # NOTE(review): callback-first call form is kept exactly as the
            # original wrote it; confirm it matches the installed pika version.
            channel.basic_consume(
                lambda ch, method, properties, body: callback(
                    ch, method, properties, body, r, dbmongo
                ),
                queue=queue_name,
            )
            channel.start_consuming()
        finally:
            # Release the broker connection even when the consume loop raises;
            # skip the close if the connection is already down.
            if connection.is_open:
                connection.close()
    finally:
        # Always release the MongoDB client, not only on a clean return.
        client.close()
    return 0
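# 评论列表 (comment list) -- web-page navigation residue, not part of the program
# 文章目录 (article table of contents) -- web-page navigation residue, not part of the program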