def publish(self, item, priority=0, retry=2):
    """Serialize ``item`` to JSON and publish it to the queue.

    The message goes to the default exchange (empty name) with
    ``self._queue_name`` as routing key, ``delivery_mode=2`` and the given
    ``priority``.

    Args:
        item: JSON-serializable payload.
        priority: Priority value placed in the message properties.
        retry: Number of times to call ``self.open()`` and try again after
            ``exceptions.ConnectionClosed`` before giving up.

    Raises:
        exceptions.ConnectionClosed: If publishing still fails once the
            retry budget is exhausted.
    """
    body = json.dumps(item)
    try:
        self._channel.basic_publish(exchange=u'',
                                    routing_key=self._queue_name,
                                    body=body,
                                    properties=pika.BasicProperties(
                                        delivery_mode=2,
                                        priority=priority
                                    ))
    except exceptions.ConnectionClosed:
        if retry <= 0:
            raise
        self.open()
        # Pass both values by keyword. A positional ``retry - 1`` would land
        # in ``priority`` and leave ``retry`` at its default, so the message
        # priority would be overwritten and the retries would never run out.
        self.publish(item, priority=priority, retry=retry - 1)
python类exceptions()的实例源码
def setUp(self):
    """Connect to the broker and build a requester/provider pair.

    The requester and the provider are both given the same connection and
    the same freshly generated exchange name. The test is failed
    immediately when the connection cannot be opened.
    """
    super(TestStatusRequesterAndProvider, self).setUp()
    self.response = None

    # Communications: one blocking connection shared by both ends.
    try:
        self._connection = pika.BlockingConnection()
    except pika.exceptions.ConnectionClosed:
        self.fail("Couldn't open connection. Make sure rmq server is running")

    # A uuid in the name keeps each test on its own exchange.
    unique_id = uuid.uuid4()
    exchange_name = "{}.{}.status_request".format(self.__class__, unique_id)

    self.requester = StatusRequester(self._connection, exchange=exchange_name)
    self.manager = ProcessManager()
    self.provider = StatusProvider(
        self._connection,
        process_manager=self.manager,
        exchange=exchange_name)
def setUp(self):
    """Connect to the broker, declare a fanout exchange and build the controller.

    The test is failed immediately when the connection cannot be opened.
    """
    super(TestProcessController, self).setUp()

    try:
        self._connection = pika.BlockingConnection()
    except pika.exceptions.ConnectionClosed:
        self.fail("Couldn't open connection. Make sure rmq server is running")

    # A uuid in the name keeps each test on its own exchange.
    name_template = '{}.{}.task_control'
    self.exchange = name_template.format(self.__class__, uuid.uuid4())

    self.channel = self._connection.channel()
    self.channel.exchange_declare(exchange=self.exchange, type='fanout')

    self.manager = ProcessManager()
    self.controller = ProcessController(
        self._connection,
        exchange=self.exchange,
        process_manager=self.manager)
def _publish(self, exchange_name, queue_name, body, priority, retry):
    """Publish ``body`` with ``delivery_mode=2``, reopening on closed connections.

    Each time ``exceptions.ConnectionClosed`` is raised the connection is
    reopened with ``self.open()`` and the publish is attempted again, up
    to ``retry`` times. Once the budget is used up the exception is
    re-raised to the caller.
    """
    attempts_left = retry
    while True:
        try:
            message_properties = pika.BasicProperties(
                delivery_mode=2,
                priority=priority,
            )
            self._channel.basic_publish(
                exchange=exchange_name,
                routing_key=queue_name,
                body=body,
                properties=message_properties,
            )
            return
        except exceptions.ConnectionClosed:
            if attempts_left <= 0:
                raise
            self.open()
            attempts_left -= 1
def __del__(self):
    """Close the AMQP connection when the object is finalized.

    Best-effort cleanup: AMQP errors raised by ``close()`` are ignored, and
    nothing is attempted when ``_connection`` was never assigned (for
    example when ``__init__`` failed before connecting), so the finalizer
    does not raise ``AttributeError`` on a half-built instance.
    """
    connection = getattr(self, "_connection", None)
    if connection is None:
        return
    try:
        connection.close()
    except pika.exceptions.AMQPError:
        # Deliberately ignored: nothing useful can be done during finalization.
        pass
def publishToExchange(self, exchangeName, routingKey, msg):
    """Publish ``msg`` to ``exchangeName`` using ``routingKey``.

    Raises:
        PipelineQueueError: When pika raises any ``AMQPError`` during the
            publish; the original error is embedded in the message.
    """
    try:
        self._channel.basic_publish(
            exchange=exchangeName,
            routing_key=routingKey,
            body=msg,
        )
    except pika.exceptions.AMQPError as error:
        failure = "Couldn't push message to exchange: {reason}".format(reason=error)
        raise PipelineQueueError(failure)
def __del__(self):
    """Close the AMQP connection when the object is finalized.

    Best-effort cleanup: AMQP errors raised by ``close()`` are ignored, and
    nothing is attempted when ``_connection`` was never assigned (for
    example when ``__init__`` failed before connecting), so the finalizer
    does not raise ``AttributeError`` on a half-built instance.
    """
    connection = getattr(self, "_connection", None)
    if connection is None:
        return
    try:
        connection.close()
    except pika.exceptions.AMQPError:
        # Deliberately ignored: nothing useful can be done during finalization.
        pass
def __del__(self):
    """Close the AMQP connection when the object is finalized.

    Best-effort cleanup: AMQP errors raised by ``close()`` are ignored, and
    nothing is attempted when ``_connection`` was never assigned (for
    example when ``__init__`` failed before connecting), so the finalizer
    does not raise ``AttributeError`` on a half-built instance.
    """
    connection = getattr(self, "_connection", None)
    if connection is None:
        return
    try:
        connection.close()
    except pika.exceptions.AMQPError:
        # Deliberately ignored: nothing useful can be done during finalization.
        pass
def publish(self, msg):
    """Publish ``msg`` to ``self._exchange`` with ``self._qname`` as routing key.

    Raises:
        PipelineQueueError: When pika raises any ``AMQPError`` during the
            publish; the original error is embedded in the message.
    """
    try:
        self._channel.basic_publish(
            exchange=self._exchange,
            routing_key=self._qname,
            body=msg,
        )
    except pika.exceptions.AMQPError as error:
        failure = "Couldn't push message to queue: {reason}".format(reason=error)
        raise PipelineQueueError(failure)
def acknowledge(self, method):
    """Acknowledge the delivery described by ``method``.

    Does nothing when ``method`` is falsy.

    Raises:
        PipelineQueueError: When pika raises any ``AMQPError`` during the
            acknowledgement; the original error is embedded in the message.
    """
    if not method:
        return
    try:
        self._channel.basic_ack(method.delivery_tag)
    except pika.exceptions.AMQPError as error:
        failure = "Couldn't acknowledge message: {reason}".format(reason=error)
        raise PipelineQueueError(failure)
def catch_error(func):
    """Decorate a queue method so a connection error triggers one reconnect.

    When the wrapped method raises one of the known connection errors, the
    error is logged, ``self.reconnect()`` is called and the method is
    invoked once more; a failure of that second call propagates.

    The set of caught exceptions is ``select.error`` and ``socket.error``
    plus, for whichever client libraries are importable, pika's
    ``ConnectionClosed``/``AMQPConnectionError`` and ``amqp.ConnectionError``.
    Both libraries are optional, so the decorator works when only one of
    them is installed.

    Args:
        func: Method taking ``self`` as first argument; ``self`` must
            provide ``reconnect()``.

    Returns:
        The wrapping function, carrying ``func``'s name and docstring.
    """
    import functools

    try:
        import pika.exceptions
        connect_exceptions = (
            pika.exceptions.ConnectionClosed,
            pika.exceptions.AMQPConnectionError,
        )
    except ImportError:
        connect_exceptions = ()

    try:
        import amqp
        connect_exceptions += (amqp.ConnectionError,)
    except ImportError:
        # amqp is optional in the same way pika is.
        pass

    connect_exceptions += (
        select.error,
        socket.error,
    )

    @functools.wraps(func)
    def wrap(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except connect_exceptions as e:
            logging.error('RabbitMQ error: %r, reconnect.', e)
            self.reconnect()
            # Single retry: an error here is left to the caller.
            return func(self, *args, **kwargs)
    return wrap
def reconnect(self):
    """Reconnect to rabbitmq server.

    Opens a blocking connection to ``self.amqp_url`` and a channel on it,
    then declares the queue ``self.name``. If the declaration fails with
    ``ChannelClosed``, a new connection and channel are opened and the
    queue is not declared again.
    """
    import pika
    import pika.exceptions

    def open_connection_and_channel():
        # Replaces both attributes with a fresh connection and channel.
        self.connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url))
        self.channel = self.connection.channel()

    open_connection_and_channel()
    try:
        self.channel.queue_declare(self.name)
    except pika.exceptions.ChannelClosed:
        open_connection_and_channel()
    #self.channel.queue_purge(self.name)
def reconnect(self):
    """Reconnect to rabbitmq server.

    Connection settings are taken from ``self.amqp_url``: the port falls
    back to 5672, user and password to ``'guest'``, and an empty path to
    the virtual host encoded as ``%2F``. After opening a channel the queue
    ``self.name`` is declared; ``PreconditionFailed`` from that
    declaration is ignored.
    """
    url = urlparse.urlparse(self.amqp_url)
    address = "%s:%s" % (url.hostname, url.port or 5672)
    user = url.username or 'guest'
    secret = url.password or 'guest'
    vhost = unquote(url.path.lstrip('/') or '%2F')

    self.connection = amqp.Connection(host=address,
                                      userid=user,
                                      password=secret,
                                      virtual_host=vhost)
    self.channel = self.connection.channel()
    try:
        self.channel.queue_declare(self.name)
    except amqp.exceptions.PreconditionFailed:
        pass
    #self.channel.queue_purge(self.name)
def setUp(self):
    """Connect to the broker and prepare request/response plumbing.

    Declares a fanout exchange for requests and an exclusive queue for
    responses consumed by ``self._on_response``, then builds the provider
    under test. The test is failed immediately when the connection cannot
    be opened.
    """
    super(TestStatusProvider, self).setUp()
    self._response = None
    self._corr_id = None

    try:
        self._connection = pika.BlockingConnection()
    except pika.exceptions.ConnectionClosed:
        self.fail("Couldn't open connection. Make sure rmq server is running")
    self.channel = self._connection.channel()

    # Request exchange; the uuid keeps each test on its own exchange.
    name_template = '{}.{}.task_control'
    self.request_exchange = name_template.format(self.__class__, uuid.uuid4())
    self.channel.exchange_declare(exchange=self.request_exchange, type='fanout')

    # Response queue; its name is read back from the declare result.
    declared = self.channel.queue_declare(exclusive=True)
    self.response_queue = declared.method.queue
    self.channel.basic_consume(
        self._on_response,
        no_ack=True,
        queue=self.response_queue)

    self.manager = ProcessManager()
    self.provider = StatusProvider(
        self._connection,
        exchange=self.request_exchange,
        process_manager=self.manager)
def setUp(self):
    """Connect to the broker and build a task sender and runner on one queue.

    The test is failed immediately when the connection cannot be opened.
    """
    super(TestTaskControllerAndRunner, self).setUp()

    try:
        conn = pika.BlockingConnection()
    except pika.exceptions.ConnectionClosed:
        self.fail("Couldn't open connection. Make sure rmq server is running")

    # A uuid in the name keeps each test on its own queue.
    unique_id = uuid.uuid4()
    queue_name = "{}.{}.tasks".format(self.__class__, unique_id)

    self.sender = TaskController(conn, queue=queue_name)
    self.runner = TaskRunner(conn, queue=queue_name)
def get(self, retry=2):
    """Fetch one message from the queue and return it decoded from JSON.

    Returns ``None`` when the fetched body is empty. On
    ``exceptions.ConnectionClosed`` the connection is reopened with
    ``self.open()`` and the fetch is retried up to ``retry`` times; after
    that the exception is re-raised.
    """
    try:
        _frame, _props, payload = self._channel.basic_get(self._queue_name, no_ack=True)
        return json.loads(payload) if payload else None
    except exceptions.ConnectionClosed:
        if retry <= 0:
            raise
        self.open()
        return self.get(retry - 1)
def get(self, retry = 2):
    """Fetch one message from the queue and return it decoded from JSON.

    Returns ``None`` when the fetched body is empty. On
    ``exceptions.ConnectionClosed`` the connection is reopened with
    ``self.open()`` and the fetch is retried up to ``retry`` times; after
    that the exception is re-raised.
    """
    attempts_left = retry
    while True:
        try:
            fetched = self._channel.basic_get(self._queue_name, no_ack=True)
            _frame, _props, payload = fetched
            if payload:
                return json.loads(payload)
            return None
        except exceptions.ConnectionClosed:
            if attempts_left <= 0:
                raise
            self.open()
            attempts_left -= 1
def _retry(self, f):
    """Call ``f()`` until it completes without ``ConnectionClosed``.

    After each ``pika.exceptions.ConnectionClosed`` the error is logged,
    the method sleeps for one second and calls ``self._connect()`` before
    trying again. There is no upper bound on the number of attempts.

    Returns:
        Whatever ``f()`` returns on the first successful call.
    """
    while True:
        try:
            outcome = f()
        except pika.exceptions.ConnectionClosed:
            logging.exception("AMQP connection is down...")
            time.sleep(1)
            self._connect()
        else:
            return outcome