def __init__(self, host="127.0.0.1", port=5672, username="", password="", **connection_options):
    """
    Set up an event transport backed by a RabbitMQ server.

    Opens a blocking connection authenticated with plain credentials and
    creates a single channel on it.

    :param host: ipv4 or hostname of the server
    :param port: the port where the server listens
    :param username: username used for authentication
    :param password: password used for authentication
    :param connection_options: extra keyword arguments forwarded to
        :py:class:`pika.ConnectionParameters` when building the connection.
    :raises RuntimeError: if ``pika`` is falsy (presumably a failed
        optional import -- confirm against the module header).
    """
    if not pika:
        raise RuntimeError("RabbitMqEventTransport requires 'pika' to run")

    super(RabbitMqEventTransport, self).__init__()
    self._handlers = {}

    credentials = pika.PlainCredentials(username=username, password=password)
    parameters = pika.ConnectionParameters(
        host=host,
        port=port,
        credentials=credentials,
        **connection_options
    )
    self.connection = pika.BlockingConnection(parameters)
    self.channel = self.connection.channel()
# Example source code for the Python class BlockingConnection()
def get_connection_amqp():
    """Open a blocking AMQP connection from the ``ckan.harvest.mq.*`` settings.

    Each setting falls back to the matching module-level default (``PORT``,
    ``USERID``, ``PASSWORD``, ``HOSTNAME``, ``VIRTUAL_HOST``) when it is not
    configured. A port value that cannot be converted to ``int`` also falls
    back to ``PORT``.

    :returns: a new :class:`pika.BlockingConnection`
    """
    try:
        port = int(config.get('ckan.harvest.mq.port', PORT))
    except ValueError:
        # Non-numeric port in the configuration: use the default instead.
        port = PORT

    hostname = config.get('ckan.harvest.mq.hostname', HOSTNAME)
    virtual_host = config.get('ckan.harvest.mq.virtual_host', VIRTUAL_HOST)
    userid = config.get('ckan.harvest.mq.user_id', USERID)
    password = config.get('ckan.harvest.mq.password', PASSWORD)

    parameters = pika.ConnectionParameters(
        host=hostname,
        port=port,
        virtual_host=virtual_host,
        credentials=pika.PlainCredentials(userid, password),
        frame_max=10000,
    )
    log.debug("pika connection using %s" % parameters.__dict__)
    return pika.BlockingConnection(parameters)
def receive(self, callback, username, pwd, ip, port):
    '''
    Consume messages from the durable ``city_task_queue`` queue.

    Blocks in ``start_consuming`` until consuming stops or an error is
    raised; the connection is closed in either case.

    :param callback: consumer callback handed to ``basic_consume``
    :param username: user name used to log in to RabbitMQ
    :param pwd: password for ``username``
    :param ip: host of the RabbitMQ server
    :param port: port of the RabbitMQ server
    :return: None
    '''
    user_pwd = pika.PlainCredentials(username, pwd)
    s_conn = pika.BlockingConnection(pika.ConnectionParameters(ip, port, '/', credentials=user_pwd))
    try:
        channel = s_conn.channel()
        channel.queue_declare(queue='city_task_queue', durable=True)
        channel.basic_qos(prefetch_count=1)
        # Parenthesised single-argument form works as a Python 2 statement
        # and as a Python 3 function call.
        print('???????????')
        channel.basic_consume(callback,
                              queue='city_task_queue',
                              )
        channel.start_consuming()
    finally:
        # Release the socket even when consuming is interrupted by an error.
        if s_conn.is_open:
            s_conn.close()
def _check_analysis_queue(queue_name, thread_id=0):
    """
    Private static helper that lazily creates the ``queue_name`` queue.

    Makes sure ``Queue.connections`` holds a connection for ``thread_id`` and
    that ``Queue.channels`` holds an open channel, with the queue declared,
    for ``queue_name``. If the connection attempt raises
    ``pika.exceptions.ConnectionClosed`` the error is logged and the process
    is terminated with ``os._exit(1)``.
    """
    # Create the per-thread connection on first use.
    if thread_id not in Queue.connections:
        try:
            parameters = pika.ConnectionParameters(Queue.host)
            Queue.connections[thread_id] = pika.BlockingConnection(parameters)
        except pika.exceptions.ConnectionClosed as e:
            logging.error("Error with RMQ server, check it's started.")
            os._exit(1)
        Queue.consumers[thread_id] = True

    # Open (or re-open) the channel when it is missing or has been closed.
    channel_unusable = (
        queue_name not in Queue.channels
        or Queue.channels[queue_name].is_closed
    )
    if channel_unusable:
        fresh_channel = Queue.connections[thread_id].channel()
        Queue.channels[queue_name] = fresh_channel
        fresh_channel.queue_declare(queue=queue_name)
def getMqConf(mqConfig, serverName, vServerErrors):
    """Return the first entry of ``mqConfig`` whose AMQP URL can be connected to.

    Every entry is probed by opening a connection to its ``'amqpUrl'``.
    Connection errors are collected, formatted with ``formatErrors`` and
    merged into ``vServerErrors``.

    :param mqConfig: iterable of mappings that each contain an ``'amqpUrl'`` key
    :param serverName: passed through to ``formatErrors``
    :param vServerErrors: collection updated in place with the formatted errors
    :returns: the first working configuration entry, or ``None`` if none worked
    """
    pollName = "CheckRabbitMq"
    errors = set()
    # First configuration that accepted a connection; reused by later tasks.
    firstWorking = None
    for candidate in mqConfig:
        try:
            link = pika.BlockingConnection(pika.URLParameters(candidate['amqpUrl']))
        except Exception as exc:
            errors.add(str(exc))
            continue
        if firstWorking is None:
            firstWorking = candidate
        # The probe connection is only a health check; close it right away.
        if link.is_open:
            link.close()
    vServerErrors.update(formatErrors(errors, serverName, pollName))
    return firstWorking
def test_create_success(self):
    """POST /create_ins with the queue layer patched out and expect code '0x1'."""
    payload = dict(
        container_name=test_cfg.CONTAINER_NAME,
        image_id=test_cfg.CONTAINER_IMAGE_ID,
        user_name=test_cfg.USER_NAME
    )
    # Canned reply that the patched UiQueue.send hands back to the view.
    worker_reply = dict(
        code='0x1',
        message='pass',
        ins={}
    )
    with mock.patch('pika.BlockingConnection') as mock_block,\
            mock.patch.object(pika.BlockingConnection, 'channel') as mock_block_channel,\
            mock.patch.object(UiQueue, 'send') as mock_queue_send:
        mock_block.return_value = mock.Mock()
        mock_block_channel.return_value = mock.Mock()
        mock_queue_send.return_value = json.dumps(worker_reply)

        response = self.app.post('/create_ins', data=json.dumps(payload), follow_redirects=True)

        mock_queue_send.assert_called_once()
        body = json.loads(response.data)
        self.assertEqual(body.get('code'), '0x1')
# Container name occupied by others
def test_stop_success(self):
    """POST /stop_ins for an existing instance and expect code '0x1'."""
    create_instance(self.ins)
    serial = self.ins.get('container_serial')
    payload = dict(
        container_serial=serial,
        user_name=test_cfg.USER_NAME
    )
    # Canned reply that the patched UiQueue.send hands back to the view.
    worker_reply = dict(
        code='0x1',
        message='pass',
        container_serial=serial
    )
    with mock.patch('pika.BlockingConnection') as mock_block,\
            mock.patch.object(pika.BlockingConnection, 'channel') as mock_block_channel,\
            mock.patch.object(UiQueue, 'send') as mock_queue_send:
        mock_block.return_value = mock.Mock()
        mock_block_channel.return_value = mock.Mock()
        mock_queue_send.return_value = json.dumps(worker_reply)

        response = self.app.post('/stop_ins', data=json.dumps(payload), follow_redirects=True)
        # Remove the fixture instance before asserting.
        remove_instance_by_serial(self.ins.get('container_serial'))

        mock_queue_send.assert_called_once()
        body = json.loads(response.data)
        self.assertEqual(body.get('code'), '0x1')
# Container does not exist
def test_restart_success(self):
    """POST /restart_ins for an existing instance and expect code '0x1'."""
    create_instance(self.ins)
    serial = self.ins.get('container_serial')
    payload = dict(
        container_serial=serial,
        user_name=test_cfg.USER_NAME
    )
    # Canned reply that the patched UiQueue.send hands back to the view.
    worker_reply = dict(
        code='0x1',
        message='pass',
        container_serial=serial
    )
    with mock.patch('pika.BlockingConnection') as mock_block,\
            mock.patch.object(pika.BlockingConnection, 'channel') as mock_block_channel,\
            mock.patch.object(UiQueue, 'send') as mock_queue_send:
        mock_block.return_value = mock.Mock()
        mock_block_channel.return_value = mock.Mock()
        mock_queue_send.return_value = json.dumps(worker_reply)

        response = self.app.post('/restart_ins', data=json.dumps(payload), follow_redirects=True)
        # Remove the fixture instance before asserting.
        remove_instance_by_serial(self.ins.get('container_serial'))

        mock_queue_send.assert_called_once()
        body = json.loads(response.data)
        self.assertEqual(body.get('code'), '0x1')
# Container does not exist
def test_restart_failed(self):
    """POST /restart_ins without creating the instance first and expect code '0x9'."""
    serial = self.ins.get('container_serial')
    payload = dict(
        container_serial=serial,
        user_name=test_cfg.USER_NAME
    )
    # The patched queue still answers with success; the test expects the
    # endpoint itself to report '0x9'.
    worker_reply = dict(
        code='0x1',
        message='pass',
        container_serial=serial
    )
    with mock.patch('pika.BlockingConnection') as mock_block,\
            mock.patch.object(pika.BlockingConnection, 'channel') as mock_block_channel,\
            mock.patch.object(UiQueue, 'send') as mock_queue_send:
        mock_block.return_value = mock.Mock()
        mock_block_channel.return_value = mock.Mock()
        mock_queue_send.return_value = json.dumps(worker_reply)

        response = self.app.post('/restart_ins', data=json.dumps(payload), follow_redirects=True)

        body = json.loads(response.data)
        self.assertEqual(body.get('code'), '0x9')
def test_remove_success(self):
    """POST /remove_ins for an existing instance and expect code '0x1'."""
    create_instance(self.ins)
    serial = self.ins.get('container_serial')
    payload = dict(
        container_serial=serial,
        user_name=test_cfg.USER_NAME
    )
    # Canned reply that the patched UiQueue.send hands back to the view.
    worker_reply = dict(
        code='0x1',
        message='pass',
        container_serial=serial
    )
    with mock.patch('pika.BlockingConnection') as mock_block,\
            mock.patch.object(pika.BlockingConnection, 'channel') as mock_block_channel,\
            mock.patch.object(UiQueue, 'send') as mock_queue_send:
        mock_block.return_value = mock.Mock()
        mock_block_channel.return_value = mock.Mock()
        mock_queue_send.return_value = json.dumps(worker_reply)

        response = self.app.post('/remove_ins', data=json.dumps(payload), follow_redirects=True)
        # Remove the fixture instance before asserting.
        remove_instance_by_serial(self.ins.get('container_serial'))

        mock_queue_send.assert_called_once()
        body = json.loads(response.data)
        self.assertEqual(body.get('code'), '0x1')
def teamspeak(self, msg):
    """Publish one message, or every message of a list, to the 'teamspeak' queue.

    Each message is logged, serialised with ``json.dumps`` and published
    through the default exchange. The connection is closed afterwards even
    when declaring the queue or publishing raises.

    :param msg: a single JSON-serialisable message, or a ``list`` of them
    """
    # Connect to rabbitmq
    # NOTE(review): the credentials are concatenated into the URL without
    # percent-encoding; special characters in them would break the URL.
    parameters = pika.URLParameters('amqp://'+self.RMQUSER+':'+self.RMQPASS+'@'+self.RMQHOST+':'+self.RMQPORT+'/'+self.RMQVHOST+'?socket_timeout='+self.RMQSOCKETTIMEOUT)
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()
        channel.queue_declare(queue='teamspeak')
        # Treat a single message as a one-element batch so both cases share
        # the same publish path.
        messages = msg if isinstance(msg, list) else [msg]
        for m in messages:
            logger.info("Sending msg to teamspeak: "+str(m))
            channel.basic_publish(exchange='', routing_key='teamspeak', body=json.dumps(m))
    finally:
        if connection.is_open:
            connection.close()
def __init__(self, queue_name, serializer, rabbitmq_configs, *args, **kwargs):
    """Store the queue settings and prepare a RabbitMQ connection pool.

    :param queue_name: stored as ``self.queue_name``
    :param serializer: stored as ``self.serialize``
    :param rabbitmq_configs: mapping read for the optional keys
        ``'connection_parameters'`` (keyword arguments for
        ``pika.ConnectionParameters``) and ``'connection_pool_configs'``
        (keyword arguments for ``pika_pool.QueuedPool``)
    :param args: forwarded to the parent initialiser
    :param kwargs: forwarded to the parent initialiser
    """
    self.queue_name = queue_name
    self.serialize = serializer
    super(RabbitMQRunner, self).__init__(*args, **kwargs)
    self.log(logging.DEBUG, "RabbitMQ Runner is ready...")

    def _open_connection():
        # Factory handed to the pool; the settings are read on every call.
        self.log(logging.DEBUG, "Creating new rabbitmq connection")
        parameter_kwargs = rabbitmq_configs.get('connection_parameters', {})
        return pika.BlockingConnection(pika.ConnectionParameters(**parameter_kwargs))

    def _build_pool():
        pool_kwargs = rabbitmq_configs.get('connection_pool_configs', {})
        return pika_pool.QueuedPool(create=_open_connection, **pool_kwargs)

    # The pool factory is wrapped rather than called here; presumably
    # SimpleLazyObject defers it until first use -- confirm.
    self._pool = SimpleLazyObject(_build_pool)
def __init__(self):
    """Connect to RabbitMQ on localhost and register a reply consumer.

    Declares an exclusive queue, remembers its name in
    ``self.callback_queue`` and registers ``self.on_response`` as the
    consumer for it. ``self.response`` starts as an empty dict.
    """
    parameters = pika.ConnectionParameters(host='localhost')
    self.connection = pika.BlockingConnection(parameters)
    self.channel = self.connection.channel()

    # Declare an exclusive queue and keep the name reported by the broker.
    declared = self.channel.queue_declare(exclusive=True)
    self.callback_queue = declared.method.queue

    self.channel.basic_consume(
        self.on_response,
        no_ack=True,
        queue=self.callback_queue,
    )

    # Starts empty; presumably filled by on_response -- confirm.
    self.response = {}
#??????????????
def publish():
    """Declare the exchange and queue, bind them, then publish test messages.

    All settings come from the module-level ``args`` and ``parameters``
    objects. ``args.msg_per_thread`` copies of the fixed message are
    published. The connection is closed afterwards even when a declaration
    or publish raises.
    """
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()
        channel.exchange_declare(exchange=args.rabbit_exchange,
                                 durable=str2bool(args.exchange_durable),
                                 auto_delete=str2bool(args.exchange_auto_delete),
                                 type="topic")
        channel.queue_declare(queue=args.rabbit_queue,
                              durable=str2bool(args.queue_durable),
                              auto_delete=str2bool(args.queue_auto_delete))
        channel.queue_bind(args.rabbit_queue, args.rabbit_exchange,
                           args.routing_key)
        message = 'Gremlin Coming!'
        count = 0
        while count < args.msg_per_thread:
            channel.basic_publish(exchange=args.rabbit_exchange,
                                  routing_key=args.routing_key,
                                  body=message)
            count += 1
    finally:
        # Do not leak the socket when the broker rejects an operation.
        if connection.is_open:
            connection.close()
def mq_worker():
    """Consume ``mq_queue`` on localhost, reconnecting whenever an error occurs.

    Declares the durable queue, limits prefetch to one message and hands
    deliveries to the module-level ``callback``. When anything raises an
    ``Exception`` the error is logged, the connection (if one was opened) is
    closed, and the worker reconnects after a short pause. The function
    returns only when ``start_consuming`` returns normally.
    """
    import logging
    import time

    retry_delay_seconds = 1
    worker_log = logging.getLogger(__name__)

    # Loop instead of recursing so repeated failures cannot exhaust the stack.
    while True:
        # Reset on every attempt so the handler can tell whether a
        # connection was actually opened.
        connection = None
        try:
            connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
            channel = connection.channel()
            channel.queue_declare(queue=mq_queue, durable=True)
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(callback,
                                  queue=mq_queue)
            channel.start_consuming()
            return
        except Exception:
            worker_log.exception("mq_worker failed; reconnecting")
            if connection is not None and connection.is_open:
                try:
                    connection.close()
                except Exception:
                    worker_log.exception("mq_worker could not close its connection")
            # Avoid a tight reconnect loop while the broker is unreachable.
            time.sleep(retry_delay_seconds)
def consume():
    """Creates mongo, redis, and rabbitmq connections; consumes queue.

    Deliveries from the durable ``queue_name`` queue are passed to the
    module-level ``callback`` together with the redis client and the
    ``rt_flights_test`` mongo database. The RabbitMQ connection and the
    mongo client are released even when consuming raises.

    :returns: 0 once consuming has stopped normally
    """
    logger.debug("Consume started")
    redis_host = 'localhost'
    redis_port = 6379
    # connect to mongodb
    client = MongoClient()
    try:
        dbmongo = client.rt_flights_test
        # connect to redis
        r = redis.StrictRedis(host=redis_host, port=redis_port, db=0, decode_responses=True)
        # connect to rabbitmq and create queue
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        try:
            channel = connection.channel()
            channel.queue_declare(queue=queue_name, durable=True)
            channel.basic_qos(prefetch_count=1)

            def on_message(ch, method, properties, body):
                # Adapt pika's four-argument callback to the six-argument handler.
                return callback(ch, method, properties, body, r, dbmongo)

            # start pulling data off the queue
            channel.basic_consume(on_message, queue=queue_name)
            channel.start_consuming()
        finally:
            if connection.is_open:
                connection.close()
    finally:
        client.close()
    return 0
def dispatch_thread(self):
    """Feed domains from ``self.get_domain()`` into RabbitMQ while monitoring the queue depth.

    The message count of the durable ``domain_queue`` queue is polled by
    re-declaring it. When the count drops below ``QUEUE_LENGTH_MIN``, domains
    are pushed with ``self.domain_push`` until the count reaches
    ``QUEUE_LENGTH_MAX``.

    NOTE(review): the original indentation of this block was lost; the
    nesting below is a best-effort reconstruction and must be confirmed
    against the upstream source before relying on it.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        Static.RABBITMQ_HOST))
    channel = connection.channel()
    # Fetch the initial batch of domains.
    domain_queue = self.get_domain()
    while domain_queue.qsize() > 0:
        while domain_queue.qsize() > 0:
            now_count = channel.queue_declare(queue='domain_queue', durable=True).method.message_count
            # Refill only when the broker queue has run low.
            if now_count < QUEUE_LENGTH_MIN:
                # Push domains until the broker queue reaches the upper bound.
                while now_count < QUEUE_LENGTH_MAX:
                    if domain_queue.qsize() >= 100:
                        self.domain_push(domain_queue.get(), True)
                    else:
                        domain_queue = self.get_domain()
                    now_count = channel.queue_declare(queue='domain_queue', durable=True).method.message_count
                    # Pause between pushes.
                    time.sleep(WAIT_INTERVAL)
            # Pause before polling the queue depth again.
            time.sleep(MONITOR_INTERVAL)
        if domain_queue.qsize() < 100:
            domain_queue = self.get_domain()
def __init__(self, host, port=5672, username='guest', password='guest', prefetch_count=1):
    """
    Connect to a RabbitMQ broker and open a channel with a prefetch limit.

    :param host: broker host, stored as ``self.queue_host``
    :param port: broker port, stored as ``self.queue_port``
    :param username: user name for plain authentication
    :param password: password for plain authentication
    :param prefetch_count: value passed to ``basic_qos`` on the new channel
    """
    self.queue_host = host
    self.queue_port = port
    self.credentials = pika.credentials.PlainCredentials(username=username, password=password)

    connection_parameters = pika.ConnectionParameters(
        host=self.queue_host,
        port=self.queue_port,
        credentials=self.credentials,
    )
    self.connection = pika.BlockingConnection(connection_parameters)

    self.channel = self.connection.channel()
    self.channel.basic_qos(prefetch_count=prefetch_count)
def setUp(self):
    """Open a broker connection and wire a StatusRequester/StatusProvider pair.

    Both ends share the connection and a per-test exchange name built from
    the test class and a random UUID. The test fails if the connection
    attempt raises ``pika.exceptions.ConnectionClosed``.
    """
    super(TestStatusRequesterAndProvider, self).setUp()

    self.response = None

    # Set up communications
    try:
        self._connection = pika.BlockingConnection()
    except pika.exceptions.ConnectionClosed:
        self.fail("Couldn't open connection. Make sure rmq server is running")

    exchange_name = "{}.{}.status_request".format(self.__class__, uuid.uuid4())
    self.requester = StatusRequester(self._connection, exchange=exchange_name)
    self.manager = ProcessManager()
    self.provider = StatusProvider(
        self._connection,
        process_manager=self.manager,
        exchange=exchange_name,
    )
def setUp(self):
    """Open a broker connection, declare a fanout exchange and build a ProcessController.

    The exchange name is built from the test class and a random UUID. The
    test fails if the connection attempt raises
    ``pika.exceptions.ConnectionClosed``.
    """
    super(TestProcessController, self).setUp()

    try:
        self._connection = pika.BlockingConnection()
    except pika.exceptions.ConnectionClosed:
        self.fail("Couldn't open connection. Make sure rmq server is running")

    self.exchange = '{}.{}.task_control'.format(self.__class__, uuid.uuid4())

    self.channel = self._connection.channel()
    self.channel.exchange_declare(exchange=self.exchange, type='fanout')

    self.manager = ProcessManager()
    self.controller = ProcessController(
        self._connection,
        exchange=self.exchange,
        process_manager=self.manager,
    )
def publish_submission_message(challenge_id, phase_id, submission_id):
    """Publish a persistent submission message to the 'evalai_submissions' exchange.

    The message is a JSON object holding the three identifiers. The
    connection is closed afterwards even when a declaration or the publish
    raises.

    :param challenge_id: stored under the ``'challenge_id'`` key
    :param phase_id: stored under the ``'phase_id'`` key
    :param submission_id: stored under the ``'submission_id'`` key
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    try:
        channel = connection.channel()
        channel.exchange_declare(exchange='evalai_submissions', type='topic')

        # though worker is creating the queue(queue creation is idempotent too)
        # but lets create the queue here again, so that messages dont get missed
        # later on we can apply a check on queue message length to raise some alert
        # this way we will be notified of worker being up or not
        channel.queue_declare(queue='submission_task_queue', durable=True)

        message = {
            'challenge_id': challenge_id,
            'phase_id': phase_id,
            'submission_id': submission_id
        }
        channel.basic_publish(exchange='evalai_submissions',
                              routing_key='submission.*.*',
                              body=json.dumps(message),
                              properties=pika.BasicProperties(delivery_mode=2))    # make message persistent
        print(" [x] Sent %r" % message)
    finally:
        # Do not leak the socket when the broker rejects an operation.
        if connection.is_open:
            connection.close()
def __init__(self,
             exchange='flask',
             routing_key='',
             *args,
             **kw):
    """Initialise the observer and try to open a RabbitMQ fanout exchange.

    Keyword arguments whose names appear in ``self.args_mq`` are passed to
    ``pika.ConnectionParameters``; the remaining ones go to
    ``ObserverMetrics.__init__``. Connection errors are logged as critical
    and do not propagate; in that case ``self.channel``, ``self.exchange``
    and ``self.routing_key`` are not set by this method.

    :param exchange: name of the fanout exchange to declare
    :param routing_key: stored as ``self.routing_key``
    :param args: forwarded to ``ObserverMetrics.__init__``
    :param kw: split between pika and ``ObserverMetrics.__init__`` as described
    """
    kw_mq = {key: kw[key] for key in kw if key in self.args_mq}
    kw = {key: kw[key] for key in kw if key not in self.args_mq}
    ObserverMetrics.__init__(self, *args, **kw)
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters(**kw_mq))
        self.channel = connection.channel()
        self.exchange = exchange
        self.routing_key = routing_key
        try:
            self.channel.exchange_declare(exchange=exchange,
                                          type='fanout')
            self.logger.debug("Create channel RabbitMq '%s'" % exchange)
        except Exception:
            # Catch Exception rather than everything so KeyboardInterrupt
            # and SystemExit are not swallowed.
            self.logger.debug("Not create channel RabbitMq '%s'" % exchange)
    except Exception as e:
        self.logger.critical("Cannot connect to RabbitMq '%s'" % str(e))
def put_interaction(topic_map_identifier, json_body):
    """Publish ``json_body`` as JSON to the durable 'storytechnologies' queue.

    The connection is closed afterwards even when the declaration or the
    publish raises.

    :param topic_map_identifier: not used by this function
    :param json_body: JSON-serialisable message to publish
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    try:
        channel = connection.channel()
        channel.queue_declare(queue='storytechnologies', durable=True)

        # Example messages:
        # {"type": "scene", "command": "toggle-rotation"}
        # {"type": "scene", "command": "navigate-to", "sceneIdentifier": "outpost"}
        # {"type": "animation", "command": "toggle-animation", "animate": "true", "objectIdentifier": "windmill"}
        channel.basic_publish(exchange='',
                              routing_key='storytechnologies',
                              body=json.dumps(json_body))
    finally:
        # Do not leak the socket when the broker rejects an operation.
        if connection.is_open:
            connection.close()
# POST /scenes
# POST /scenes/{identifier}/assets
# POST /scenes/{identifier}/attributes
# POST /paths
# POST /characters
# POST /characters/{identifier}/assets
# POST /props
# POST /props/{identifier}/assets
def publish_message(queue_name, message):
    """Publish ``message`` as persistent JSON to the durable ``queue_name`` queue and log it.

    The connection is closed afterwards even when the declaration, the
    publish or the logging raises.

    :param queue_name: queue to declare and use as routing key
    :param message: mapping with at least the keys ``'message_id'``,
        ``'comment_id'``, ``'username'`` and ``'comment_body'``
    :raises KeyError: if one of those keys is missing
    """
    message_body = json.dumps(message)
    message_id = message['message_id']
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue_name, durable=True)
        channel.basic_publish(exchange='',
                              routing_key=queue_name,
                              body=message_body,
                              properties=pika.BasicProperties(
                                  delivery_mode=2,  # make message persistent
                              )
                              )
        log_msg = "Published : [queue_name={}] [comment_id={}] [username={}] [comment_body={}]".format(
            queue_name, message['comment_id'], message['username'], message['comment_body'])
        logger.log_info_message(message_id, LogUtilityConstants.message_published_event,
                                'sub_monitor', log_msg)
    finally:
        # Do not leak the socket when publishing or logging fails.
        if connection.is_open:
            connection.close()
def __init__(self, call_queue_name="wisp"):
    """Connect to RabbitMQ on localhost and start a thread that consumes replies.

    One channel declares the call queue for publishing. A second channel
    declares an exclusive queue whose name is kept in
    ``self._receive_queue_name``. A thread running ``self.start_consuming``
    with that channel, queue name and ``self.on_response`` is started
    before this method returns.

    :param call_queue_name: name of the queue declared on the publish channel
    """
    self._call_queue_name = call_queue_name

    parameters = pika.ConnectionParameters(host='localhost')
    self._connection = pika.BlockingConnection(parameters)

    # Outgoing side: the named call queue.
    self._publish_channel = self._connection.channel()
    self._publish_channel.queue_declare(queue=self._call_queue_name)

    # Incoming side: an exclusive queue on its own channel.
    self._receive_channel = self._connection.channel()
    declared = self._receive_channel.queue_declare(exclusive=True)
    self._receive_queue_name = declared.method.queue

    consumer_args = (
        self._receive_channel,
        self._receive_queue_name,
        self.on_response,
    )
    self._consuming_thread = threading.Thread(target=self.start_consuming,
                                              args=consumer_args)
    self._consuming_thread.start()
def __init__(self, host='localhost', port=5672):
    """Open a blocking connection to ``host``:``port`` and create a channel on it.

    :param host: broker host
    :param port: broker port
    """
    parameters = pika.ConnectionParameters(host=host, port=port)
    self._connection = pika.BlockingConnection(parameters)
    self._channel = self._connection.channel()
def __init__(self, exchangeName, host='localhost', port=5672):
    """Connect to the broker and declare a direct exchange named ``exchangeName``.

    :param exchangeName: exchange to declare, stored as ``self._exchangeName``
    :param host: broker host
    :param port: broker port
    """
    self._exchangeName = exchangeName
    parameters = pika.ConnectionParameters(host=host, port=port)
    self._connection = pika.BlockingConnection(parameters)
    self._channel = self._connection.channel()
    self._channel.exchange_declare(exchange=exchangeName, type='direct')
def __init__(self, queueName, host='localhost', port=5672):
    """Connect to the broker and declare the durable queue ``queueName``.

    ``self._exchange`` is set to the empty string.

    :param queueName: queue to declare, stored as ``self._qname``
    :param host: broker host
    :param port: broker port
    """
    self._qname = queueName
    parameters = pika.ConnectionParameters(host=host, port=port)
    self._connection = pika.BlockingConnection(parameters)
    self._channel = self._connection.channel()
    self._channel.queue_declare(queue=queueName, durable=True)
    self._exchange = ''
def __init__(self, queueName, host='localhost', port=5672):
    """Connect to the broker and declare the durable queue ``queueName``.

    ``host`` and ``port`` are optional so existing callers that pass only
    the queue name keep connecting to localhost on the standard AMQP port.

    :param queueName: queue to declare, stored as ``self._qname``
    :param host: broker host
    :param port: broker port
    """
    self._qname = queueName
    self._connection = pika.BlockingConnection(pika.ConnectionParameters(host=host, port=port))
    self._channel = self._connection.channel()
    self._channel.queue_declare(queue=queueName, durable=True)
def application(env, start_response):
    """Set up the websocket server and relay queue messages to the client.

    The queue name is ``env['PATH_INFO']`` with every ``'/'`` removed. After
    the websocket handshake, each message consumed from that queue is sent
    over the websocket. The process exits when a websocket operation raises
    ``OSError``.

    :param env: WSGI environment
    :param start_response: WSGI start_response callable (not used here)
    """
    amqp_parameters = pika.connection.URLParameters(RABBIT_MQ_URL)
    connection = pika.BlockingConnection(amqp_parameters)
    channel = connection.channel()

    queue = env['PATH_INFO'].replace('/', '')
    channel.queue_declare(queue=queue)

    websocket_key = env['HTTP_SEC_WEBSOCKET_KEY']
    origin = env.get('HTTP_ORIGIN', '')
    uwsgi.websocket_handshake(websocket_key, origin)

    def keepalive():
        """Keep the websocket connection alive (reschedules itself every 60 seconds)."""
        print('PING/PONG...')
        try:
            uwsgi.websocket_recv_nb()
            connection.add_timeout(60, keepalive)
        except OSError as error:
            print('Killing the connection...')
            sys.exit(0)

    def callback(ch, method, properties, body):
        """Forward a received queue message to the websocket client."""
        try:
            uwsgi.websocket_send(body)
        except OSError as error:
            print('Could not send message over the websocket', error)
            sys.exit(0)

    keepalive()
    channel.basic_consume(callback, queue=queue, no_ack=True)
    channel.start_consuming()