def purge(self, timeout: int = 5) -> None:

    def _purge_errors(exc, interval):
        self._logger.error('Purging error: %s, will retry triggering in %s seconds', exc, interval, exc_info=True)

    def _purge_messages(cnsmr: BrightsideConsumer):
        cnsmr.purge()
        self._message = None

    connection = BrokerConnection(hostname=self._amqp_uri)
    with connections[connection].acquire(block=True) as conn:
        self._logger.debug('Got connection: %s', conn.as_uri())
        with Consumer(conn, queues=[self._queue], callbacks=[_purge_messages]) as consumer:
            ensure_kwargs = self.RETRY_OPTIONS.copy()
            ensure_kwargs['errback'] = _purge_errors
            safe_purge = conn.ensure(consumer, _purge_messages, **ensure_kwargs)
            safe_purge(consumer)
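
# A minimal sketch of the retry options assumed above: self.RETRY_OPTIONS is
# not shown in this snippet, but the keywords passed to conn.ensure() match
# kombu's Connection.ensure() signature, so it plausibly looks like this.
RETRY_OPTIONS = {
    'interval_start': 1,   # seconds to wait before the first retry
    'interval_step': 2,    # seconds added to the wait after each retry
    'interval_max': 30,    # ceiling on the wait between retries
    'max_retries': 3,      # give up (re-raise) after this many attempts
}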
def add_queue_rule(self, handler, name, autoack=True, prefetch_size=0,
                   prefetch_count=0, **kwargs):
    """Add a queue rule to the Microservice.

    :param handler: function for handling messages
    :type handler: callable object
    :param name: name of the queue
    :type name: str
    :param autoack: if True, message.ack() is called after the callback
    :param prefetch_size: prefetch window in bytes (0 means no limit)
    :param prefetch_count: maximum number of unacknowledged messages to prefetch
    """
    rule = Rule(name, handler, self.logger, autoack=autoack, **kwargs)
    consumer = Consumer(self.connection, queues=[Queue(rule.name)],
                        callbacks=[rule.callback], auto_declare=True)
    consumer.qos(prefetch_count=prefetch_count, prefetch_size=prefetch_size)
    self.consumers.append(consumer)
    self.logger.debug('Rule "%s" added!', rule.name)
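
# Hypothetical usage sketch for add_queue_rule(). 'service' stands in for a
# Microservice instance, and the (body, message) handler signature follows the
# usual kombu callback convention (an assumption, since Rule is not shown here).
def handle_order(body, message):
    print('received:', body)

service.add_queue_rule(handle_order, 'orders', autoack=True, prefetch_count=10)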
def _start_consuming(self):
    """
    Start consuming messages.
    This function is blocking.
    """
    consumer = kombu.Consumer(
        self._conn,
        queues=[self._callback_queue],
        on_message=self._on_message,
        # accept=[self._codec.content_type],
        no_ack=True
    )
    consumer.consume()
    while self._is_running:
        try:
            self._conn.drain_events(timeout=0.5)
        except socket.timeout:
            continue
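
# Hedged companion sketch: the loop above checks self._is_running on every
# pass, so a stop() method on the same class (assumed, not shown) only needs
# to clear the flag; the loop then exits within one 0.5 s drain timeout.
def stop(self):
    self._is_running = False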
def receive(self, timeout: int) -> BrightsideMessage:

    self._message = BrightsideMessage(BrightsideMessageHeader(uuid4(), "", BrightsideMessageType.MT_NONE),
                                      BrightsideMessageBody(""))

    def _consume(cnx: BrokerConnection, timesup: int) -> None:
        try:
            cnx.drain_events(timeout=timesup)
        except kombu_exceptions.TimeoutError:
            pass
        except (kombu_exceptions.ChannelLimitExceeded,
                kombu_exceptions.ConnectionLimitExceeded,
                kombu_exceptions.OperationalError,
                kombu_exceptions.NotBoundError,
                kombu_exceptions.MessageStateError,
                kombu_exceptions.LimitExceeded) as err:
            raise ChannelFailureException("Error connecting to RabbitMQ, see inner exception for details", err)

    def _consume_errors(exc, interval: int) -> None:
        self._logger.error('Draining error: %s, will retry triggering in %s seconds', exc, interval, exc_info=True)

    def _read_message(body: str, msg: KombuMessage) -> None:
        self._logger.debug("Monitoring event received at: %s headers: %s payload: %s",
                           datetime.utcnow().isoformat(), msg.headers, body)
        self._msg = msg
        self._message = self._message_factory.create_message(msg)

    connection = BrokerConnection(hostname=self._amqp_uri)
    with connections[connection].acquire(block=True) as conn:
        self._logger.debug('Got connection: %s', conn.as_uri())
        with Consumer(conn, queues=[self._queue], callbacks=[_read_message]) as consumer:
            consumer.qos(prefetch_count=1)
            ensure_kwargs = self.RETRY_OPTIONS.copy()
            ensure_kwargs['errback'] = _consume_errors
            safe_drain = conn.ensure(consumer, _consume, **ensure_kwargs)
            safe_drain(conn, timeout)

    return self._message
def get_consumers(self, Consumer, channel):
    consumers = [
        self._create_service_queues(self.services, Consumer, channel),
        self._create_fanout_exchange(Consumer, channel)
    ]
    return consumers
def _create_service_queues(self, services, Consumer, channel):
    """
    Creates the necessary AMQP queues, one per service.
    """
    log.debug('Declaring exchange %s', self.exchange)
    exchange = kombu.Exchange(
        self.exchange,
        channel=channel,
        durable=False
    )
    exchange.declare()
    # channel.exchange_declare(exchange=self.exchange)
    queues = []
    for service in services.values():
        queue_name = '{}_service_{}'.format(self.exchange, service.name)
        log.debug('Declaring service queue %s', queue_name)
        queue = kombu.Queue(
            channel=channel,
            name=queue_name,
            exchange=exchange,
            routing_key=queue_name,
            exclusive=False,
            durable=False
        )
        queue.declare()
        queues.append(queue)
    # channel.queue_delete(queue=queue)
    # channel.queue_declare(queue=queue, auto_delete=True)
    # channel.queue_bind(queue, self.exchange)
    # channel.basic_consume(self._on_message, queue=queue, no_ack=False)
    consumer = Consumer(
        # self.connection,
        queues=queues,
        on_message=self._on_message,
        no_ack=False
    )
    # consumer.consume(no_ack=False)
    return consumer
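
# Hedged companion sketch: reaching one of the per-service queues declared
# above. The exchange defaults to 'direct' and each queue is bound with a
# routing key equal to its name, so a plain publish with that key targets
# exactly one service. 'myexchange'/'myservice' and the payload are
# illustrative, not taken from the snippet.
import kombu

with kombu.Connection('amqp://localhost//') as conn:
    producer = conn.Producer()
    producer.publish(
        {'method': 'ping', 'args': []},
        exchange='myexchange',                       # assumed value of self.exchange
        routing_key='myexchange_service_myservice',  # '{exchange}_service_{name}'
    )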
def _drain_events(self, on_event):
    if not hasattr(self, 'bus_queue'):
        raise Exception('You must listen for events before consuming them')
    with Connection(self._url) as conn:
        with Consumer(conn, self.bus_queue, callbacks=[on_event]):
            try:
                while True:
                    conn.drain_events(timeout=0.5)
            # drain_events() raises socket.timeout; on Python 3.10+ that is
            # an alias of TimeoutError, which this clause relies on
            except TimeoutError:
                pass
def get_consumers(self, Consumer, channel):
    """
    Returns a list of kombu.Consumer instances to service all registered
    notification callbacks.
    If using the kombu.mixins.ConsumerMixin mixin class, these instances
    should be included in its get_consumers() method.
    :param Consumer: Message consumer class.
    :type Consumer: class
    :param channel: An open channel.
    :type channel: kombu.transport.*.Channel
    :returns: A list of consumer instances
    :rtype: [kombu.Consumer, ....]
    """
    consumer_list = []
    exchange = self.bus_mixin.producer.exchange
    for routing_key, callbacks in self.notify_callbacks.items():
        queue = kombu.Queue(
            exchange=exchange, routing_key=routing_key)
        consumer = Consumer(
            queues=queue, callbacks=callbacks)
        consumer_list.append(consumer)
        self.bus_mixin.logger.info(
            'Listening for "%s" notifications', routing_key)
    return consumer_list
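
# Context sketch: a get_consumers(Consumer, channel) method like the one above
# is the hook that kombu.mixins.ConsumerMixin invokes with a Consumer factory
# bound to an open channel; the mixin's run() then drives the consume loop.
# This Worker class is illustrative only.
import kombu
from kombu.mixins import ConsumerMixin

class Worker(ConsumerMixin):
    def __init__(self, connection):
        self.connection = connection  # attribute required by ConsumerMixin

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[kombu.Queue('demo')],
                         callbacks=[lambda body, message: message.ack()])]

# Worker(kombu.Connection('amqp://localhost//')).run() would block and consume.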
def Consumer(self, *args, **kwargs):
    messages = self.get_messages()
    if messages is not None:
        print("USING DEADLINE CONSUMER")
        return DeadlineConsumer(messages, *args, **kwargs)
    else:
        print("USING KOMBU CONSUMER")
        return kombu.Consumer(*args, **kwargs)
def start_rmq_consume(self):
    """
    Start consuming from RabbitMQ.
    """
    logger = logging.getLogger(self.__class__.__name__)
    logger.info("starting rabbit mq consumer")
    channel = self._conn.channel()
    # prep a consumer for the from_queue only
    self._queue_consumer = Consumer(channel=channel,
                                    queues=[self._from_queue],
                                    callbacks=[self.process_message])
    self._queue_consumer.consume()
def _shovel_to_buffer(self, from_queue):
    """
    Poor man's alternative to the shovel plugin.
    :param from_queue: entity.Queue object to shovel messages from
    """
    logger = logging.getLogger(self.__class__.__name__)
    logger.info("shovelling all messages from error queue to buffer queue")
    channel = self._conn.channel()
    # prep a consumer for the from_queue only
    queue_consumer = Consumer(channel=channel,
                              queues=[from_queue],
                              callbacks=[self._shoveller])
    queue_consumer.consume()
    # finally drain all the work items from the error queue into the shoveller
    while True:
        try:
            self._conn.drain_events(timeout=1)
        except socket.timeout:
            logger.debug("No more work-items in {q}".format(q=from_queue.name))
            break
        except socket.error as e:
            # EAGAIN (errno 35 on BSD/macOS) is expected here because the
            # connection was intentionally opened in non-blocking mode
            if e.errno == 35:
                logger.debug("{q} is empty".format(q=from_queue.name))
                break
            raise  # any other socket error is a real failure
    # disconnect
    queue_consumer.cancel()
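
# Hedged sketch of the _shoveller callback assumed above: republishing each
# message to the buffer queue and acking the original is what the shovelling
# amounts to. The producer attribute and queue name are illustrative.
def _shoveller(self, body, message):
    self._producer.publish(body, routing_key='buffer_queue')  # assumed producer
    message.ack()  # removes the message from the error queue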
def _create_fanout_exchange(self, Consumer, channel):
    """
    Creates a fanout queue to accept notifications.
    """
    # declare_kwargs = dict(
    #     exchange='{}_fanout'.format(self.exchange)
    # )
    # if PIKA_VERSION >= StrictVersion('0.10.0'):
    #     kwarg = 'exchange_type'
    # else:
    #     kwarg = 'type'
    # declare_kwargs[kwarg] = 'fanout'
    exchange_name = '{}_fanout'.format(self.exchange)
    log.debug('Declaring fanout exchange %s', exchange_name)
    exchange = kombu.Exchange(
        name=exchange_name,
        channel=channel,
        durable=False,
        type='fanout'
    )
    exchange.declare()
    queue_name = 'fanout_callback_{}'.format(uuid.uuid4())
    log.debug('Declaring fanout queue %s', queue_name)
    queue = kombu.Queue(
        name=queue_name,
        exchange=exchange,
        exclusive=True,
        durable=False,
        channel=channel
    )
    queue.declare()
    consumer = Consumer(
        # self.connection,
        queues=[queue],
        on_message=self._on_broadcast,
        no_ack=True
    )
    # consumer.consume(no_ack=True)
    # channel.exchange_declare(**declare_kwargs)
    # fanout_queue = channel.queue_declare(exclusive=True)
    # channel.queue_bind(exchange='{}_fanout'.format(self.exchange), queue=fanout_queue.method.queue)
    # channel.basic_consume(self._on_broadcast, queue=fanout_queue.method.queue, no_ack=True)
    return consumer
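
# Hedged companion sketch: broadcasting to the fanout exchange declared above.
# Every queue bound to a fanout exchange receives each message regardless of
# routing key. 'myservice' is an illustrative value of self.exchange, and the
# consumer side is assumed to have declared the exchange already.
import kombu

with kombu.Connection('amqp://localhost//') as conn:
    producer = conn.Producer()
    producer.publish(
        {'event': 'ping'},
        exchange=kombu.Exchange('myservice_fanout', type='fanout', durable=False),
    )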