def __init__(self, queue_name, serializer, rabbitmq_configs, *args, **kwargs):
self.queue_name = queue_name
self.serialize = serializer
super(RabbitMQRunner, self).__init__(*args, **kwargs)
self.log(logging.DEBUG, "RabbitMQ Runner is ready...")
def _create_pool():
connection_pool_configs = rabbitmq_configs.get('connection_pool_configs', {})
def create_connection():
self.log(logging.DEBUG, "Creating new rabbitmq connection")
con_params = pika.ConnectionParameters(**rabbitmq_configs.get('connection_parameters', {}))
return pika.BlockingConnection(con_params)
return pika_pool.QueuedPool(
create=create_connection,
**connection_pool_configs
)
self._pool = SimpleLazyObject(_create_pool)
评论列表
文章目录