def setUp(self):
    """Declare the test queues on a topic exchange and empty each of them."""
    self.exchange = Exchange(EXCHANGE, type="topic")
    self.result_queue = Queue(name="result_queue", routing_key="harvest.status.weibo.*",
                              exchange=self.exchange, durable=True)
    self.web_harvest_queue = Queue(name="web_harvest_queue", routing_key="harvest.start.web",
                                   exchange=self.exchange)
    self.warc_created_queue = Queue(name="warc_created_queue", routing_key="warc_created",
                                    exchange=self.exchange)
    # Declared up front so the broker does not raise NOT_FOUND (404) for it.
    weibo_harvester_queue = Queue(name="weibo_harvester", exchange=self.exchange)
    with self._create_connection() as connection:
        for queue in (self.result_queue, self.web_harvest_queue,
                      self.warc_created_queue, weibo_harvester_queue):
            bound = queue(connection)
            bound.declare()
            bound.purge()
    self.path = None
Python `Queue()` class — example source code
Source file: test_weibo_harvester.py
Project: sfm-weibo-harvester
Author: gwu-libraries
Project source · File source · Views: 19 · Favorites: 0 · Likes: 0 · Comments: 0
def check_queue_message(self, message):
    """Fetch one message from the named queue and check its body.

    ``message`` is a dict with a ``queue`` key (queue name) and an ``id``
    key (the expected body content).
    """
    target_queue = kombu.Queue(message['queue'], channel=self.ch)
    received = target_queue.get(True)
    # NOTE(review): this is a substring check (body in id), but the failure
    # text reads like an equality check — confirm which is intended.
    assert received.body in message['id'], \
        "Message body is {}, expected {}".format(received.body, message['id'])
def declare_exchange(self, name, type='direct', queues=None, **options):
    """Create or update exchange

    :param name: name of exchange
    :type name: str
    :param type: type of exchange - direct, fanout, topic, match
    :type type: str
    :param queues: list of queues with routing keys:
        [[queue_name, routing_key], [queue_name, routing_key], ...]
    :type queues: list, None or tuple
    :param options: additional options for Exchange creation
    """
    if queues is None:
        queues = []  # pragma: no cover
    with self.connections[self.connection].acquire() as conn:
        exchange = Exchange(name, type=type, channel=conn, **options)
        exchange.declare()
        self.exchanges[name] = exchange
        for queue_name, key in queues:
            bound_queue = Queue(name=queue_name, channel=conn)
            bound_queue.declare()
            bound_queue.bind_to(exchange=name, routing_key=key)
            self.logger.debug('Queue "%s" with routing_key "%s" was bond to exchange "%s"',
                              queue_name, key if key else queue_name, name)
def add_queue_rule(self, handler, name, autoack=True, prefetch_size=0,
                   prefetch_count=0, **kwargs):
    """Add queue rule to Microservice

    :param prefetch_count: count of messages for getting from mq
    :param prefetch_size: size in bytes for getting data from mq
    :param handler: function for handling messages
    :param autoack: if True message.ack() after callback
    :type handler: callable object
    :param name: name of queue
    :type name: str
    """
    new_rule = Rule(name, handler, self.logger, autoack=autoack, **kwargs)
    queue_consumer = Consumer(self.connection,
                              queues=[Queue(new_rule.name)],
                              callbacks=[new_rule.callback],
                              auto_declare=True)
    queue_consumer.qos(prefetch_count=prefetch_count, prefetch_size=prefetch_size)
    self.consumers.append(queue_consumer)
    self.logger.debug('Rule "%s" added!', new_rule.name)
def consume(self):
    """Drain page events from the inbound queue, spawning one bot process each."""
    def worker(event, message):
        # Look up which bot handles this page, then hand off to a child process.
        page_id = event['recipient']['id']
        bot_class, bot_args = self.get_bot(page_id)
        child = Process(target=spawn_bot_amqp,
                        args=(bot_class, bot_args, self.transport,
                              self.send_exchange, self.send_queue,
                              event, message))
        child.start()

        def stop_worker(signum, frame):
            child.terminate()
            child.join()

        # Re-registered per message so SIGTERM stops the latest child.
        signal.signal(signal.SIGTERM, stop_worker)

    exchange = Exchange(self.exchange, 'direct', durable=True)
    queue = Queue(self.queue, exchange=exchange, routing_key=self.queue)
    with Connection(self.transport) as conn:
        with conn.Consumer(queue, callbacks=[worker]) as consumer:
            while True:
                conn.drain_events()
def sender(self):
    """Drain outbound send events, dispatching each in a child process."""
    def worker(event, message):
        child = Process(target=spawn_send_message_worker, args=(event, message))
        child.start()

        def stop_worker(signum, frame):
            child.terminate()
            child.join()

        # Re-registered per message so SIGTERM stops the latest child.
        signal.signal(signal.SIGTERM, stop_worker)

    exchange = Exchange(self.send_exchange, 'direct', durable=True)
    queue = Queue(self.send_queue, exchange=exchange,
                  routing_key=self.send_queue)
    with Connection(self.send_transport) as conn:
        with conn.Consumer(queue, callbacks=[worker]) as consumer:
            while True:
                conn.drain_events()
def connect(self, exchange, channel):  # pragma: no cover
    """
    Readies the StorageNotify for publishing notification messages by
    setting up a kombu.Producer.

    :param exchange: The exchange for publishing notifications.
    :type exchange: kombu.Exchange
    :param channel: The channel to bind to.
    :type channel: kombu.transport.base.StdChannel
    """
    self.logger.debug('Connecting {}'.format(self.__class__.__name__))
    self._queue = kombu.Queue(exchange=exchange, channel=channel)
    self._queue.declare()
    self._producer = kombu.Producer(channel, exchange)
def _configure_dead_exchange(self, connection):
    """Declare the dead-letter exchange and queue, retrying on broker errors."""
    def declare_dead_queue():
        # Bind the exchange and its queue to a fresh channel, then declare both.
        channel = connection.channel()
        dead_exchange = Exchange(name=config.rabbitmq_dead_exchange(),
                                 type='direct',
                                 channel=channel)
        Queue(name=config.rabbitmq_dead_queue(),
              routing_key=config.rabbitmq_routing_key(),
              exchange=dead_exchange,
              channel=channel).declare()
        return dead_exchange

    def error_callback(exception, interval):
        logging.error('Failed to declare dead queue and exchange, retrying in %d seconds. %r' %
                      (interval, exception))

    # connection.ensure wraps the declare so transient failures retry with backoff.
    declare_dead_queue = connection.ensure(connection, declare_dead_queue, errback=error_callback,
                                           interval_start=0, interval_step=5, interval_max=30)
    return declare_dead_queue()
Source file: openstack-dns-updater.py
Project: fuel-plugin-dns-updater
Author: openstack
Project source · File source · Views: 20 · Favorites: 0 · Likes: 0 · Comments: 0
def get_consumers(self, consumer, channel):
    """Build one consumer per configured exchange, all feeding on_message."""
    consumer_list = []
    for exchange_name in DNS_CONF["exchanges"].split(","):
        topic_exchange = Exchange(exchange_name, type="topic", durable=False)
        queue = Queue(DNS_CONF["queue_name"], topic_exchange,
                      routing_key=DNS_CONF["routing_key"],
                      durable=False, auto_delete=True, no_ack=True)
        consumer_list.append(consumer(queue, callbacks=[self.on_message]))
    return consumer_list
def __init__(self, connection: Connection, configuration: BrightsideConsumerConfiguration, logger: logging.Logger = None) -> None:
    """Bind a consumer queue to the connection's exchange.

    :param connection: broker details (exchange name/type, AMQP URI, durability)
    :param configuration: consumer settings (queue name, routing key,
        prefetch count, durability)
    :param logger: optional logger; defaults to this module's logger
    """
    self._exchange = Exchange(connection.exchange, type=connection.exchange_type, durable=connection.is_durable)
    self._amqp_uri = connection.amqp_uri
    self._queue_name = configuration.queue_name
    # Fix: the original assigned self._routing_key twice; keep one assignment.
    self._routing_key = configuration.routing_key
    self._prefetch_count = configuration.prefetch_count
    self._is_durable = configuration.is_durable
    self._message_factory = ArameMessageFactory()
    self._logger = logger or logging.getLogger(__name__)
    self._queue = Queue(self._queue_name, exchange=self._exchange, routing_key=self._routing_key)
    self._msg = None  # last raw Kombu message received
    self._message = None  # last translated Brightside message
    # TODO: Need to fix the argument types with default types issue
def _create_queues(self, queue_list):
    """Append one exclusive, auto-acking queue per topic to self.queues."""
    self.queues.extend(
        Queue(name=self.queue_name,
              exchange=self.exchange,
              routing_key=topic,
              durable=False,
              exclusive=True,
              no_ack=True)
        for topic in queue_list
    )
def create_queue(self):
    """Declare a uniquely named, expiring test queue and return its name."""
    queue_name = 'test-rabbit-{}'.format(utils.rand_name())
    # x-expires: broker drops the queue after 15 minutes of disuse.
    kombu.Queue(queue_name, channel=self.ch, durable=False,
                queue_arguments={"x-expires": 15 * 60 * 1000}).declare()
    return queue_name
def delete_queue(self, queue):
    """Delete the named queue via this client's channel."""
    kombu.Queue(queue, channel=self.ch).delete()
def __init__(self, client, name, logger=None):
"""Initialization
:param client: instance of client
:type client: Client
:param name: name of queue
:type name: str
"""
self.client = client
self.name = name
if logger is None:
logger = _logger # pragma: no cover
self.logger = logger
self.logger.debug('Queue "%s" built', self.name)
def purge_queue(self, name):
    """Remove all messages from queue

    :param name: name of queue
    :type name: str
    """
    pool = pools.Connections(self.limit)
    with pool[self.connection].acquire() as conn:
        Queue(name=name, channel=conn).purge()
    self.logger.debug('Queue "%s" was purged', name)
def delete_queue(self, name):
    """Delete queue by name

    :param name: name of queue
    :type name: str
    """
    with self.connections[self.connection].acquire() as channel:
        target = Queue(name=name, channel=channel)
        target.delete()
    self.logger.debug('Queue "%s" was deleted', name)
def _create_callback_queue(self, channel, exchange):
    """Declare an exclusive reply queue keyed by a unique response name."""
    reply_name = 'response-{}'.format(uuid.uuid4())
    # NOTE(review): the queue binds to self._channel, not the `channel`
    # argument — confirm the parameter is intentionally unused.
    reply_queue = kombu.Queue(name=reply_name,
                              exchange=exchange,
                              routing_key=reply_name,
                              exclusive=True,
                              channel=self._channel)
    reply_queue.declare()
    return reply_queue
def __init__(self, consumer):
    # Keep only a weak proxy to the consumer so this thread does not keep
    # it alive (avoids a reference cycle with the owning consumer).
    self.consumer = proxy(consumer)
    super(PublisherThread, self).__init__()
    # Daemon thread: must not block interpreter shutdown.
    self.daemon = True
    # Buffer for outbound messages — presumably a thread-safe queue.Queue;
    # TODO confirm which Queue is imported here.
    self._out_queue = Queue()
    self._is_running = False
def _create_service_queues(self, services, Consumer, channel):
    """
    Creates necessary AMQP queues, one per service.
    """
    log.debug('Declaring exchange %s', self.exchange)
    exchange = kombu.Exchange(self.exchange, channel=channel, durable=False)
    exchange.declare()

    service_queues = []
    for service in services.values():
        # Queue name encodes both the exchange and the service it serves.
        queue_name = '{}_service_{}'.format(self.exchange, service.name)
        log.debug('Declaring service queue %s', queue_name)
        service_queue = kombu.Queue(channel=channel,
                                    name=queue_name,
                                    exchange=exchange,
                                    routing_key=queue_name,
                                    exclusive=False,
                                    durable=False)
        service_queue.declare()
        service_queues.append(service_queue)

    # Messages are explicitly acknowledged by the handler (no_ack=False).
    return Consumer(queues=service_queues,
                    on_message=self._on_message,
                    no_ack=False)
def _queue(self):
    """Return a uniquely named queue that the broker expires after 5 minutes."""
    unique_name = 'flask-socketio.' + str(uuid.uuid4())
    return kombu.Queue(unique_name, self._exchange(),
                       queue_arguments={'x-expires': 300000})
def get_consumers(self, consumer, channel):
    """Attach one auto-deleting, auto-acking queue on the nova topic exchange."""
    topic_exchange = Exchange(self.nova_exchange, type="topic", durable=False)
    notification_queue = Queue(self.queue_name, topic_exchange,
                               routing_key=self.routing_key,
                               durable=False, auto_delete=True, no_ack=True)
    return [consumer(notification_queue, callbacks=[self.handle_notification])]
def __init__(self, global_config):
    """Wire up the bus URL, exchange and an exclusive queue from config."""
    bus_config = global_config['bus']
    self._events_pubsub = Pubsub()
    self._bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**bus_config)
    self._exchange = Exchange(bus_config['exchange_name'],
                              type=bus_config['exchange_type'])
    self._queue = kombu.Queue(exclusive=True)
    self._is_running = False
def listen_events(self, routing_key, exchange=BUS_EXCHANGE_XIVO):
    """Declare and empty the bus queue bound to the given routing key."""
    with Connection(self._url) as conn:
        bound_queue = Queue(BUS_QUEUE_NAME, exchange=exchange,
                            routing_key=routing_key, channel=conn.channel())
        bound_queue.declare()
        bound_queue.purge()
        self.bus_queue = bound_queue
def __init__(self, hosts_conf, exchange_name='', exchange_type='', exchange_arguments=None,
             queue_name='', routing_key='', queue_arguments=None, callback=None, no_ack=True):
    """Set up the broker connection, the task exchange and its single queue."""
    self.hosts_conf = hosts_conf
    self.hosts = self.create_hosts()
    self.connection = Connection(self.hosts)
    self.task_exchange = Exchange(name=exchange_name, type=exchange_type,
                                  arguments=exchange_arguments)
    self.task_queues = [
        Queue(name=queue_name, exchange=self.task_exchange,
              routing_key=routing_key, queue_arguments=queue_arguments),
    ]
    self.callback = callback
    self.no_ack = no_ack
def queue_size(self, queue_list, queue_arguments=None):
    """Return a mapping of queue name -> current message count."""
    sizes = {}
    for queue_name in queue_list:
        # NOTE(review): a Queue object is passed as SimpleQueue's name —
        # kombu accepts this, but confirm it is intentional.
        simple_queue = self.connection.SimpleQueue(
            name=Queue(name=queue_name, queue_arguments=queue_arguments))
        sizes[queue_name] = simple_queue.qsize()
    return sizes
def queue_send(self, recipient, message=None, sender_action=None):
    """Publish a send-API event onto the outbound queue as JSON."""
    exchange = Exchange(self.send_exchange, 'direct', durable=True)
    queue = Queue(self.send_queue, exchange=exchange,
                  routing_key=self.send_queue)
    event = {
        'recipient': recipient,
        'message': message,
        'sender_action': sender_action,
        'page_access_token': self.page_access_token
    }
    with Connection(self.send_transport) as conn:
        # declare=[queue] ensures the queue exists before publishing.
        conn.Producer(serializer='json').publish(event, exchange=exchange,
                                                 routing_key=queue.routing_key,
                                                 declare=[queue])
def queue_events(self, events):
    """Publish each event onto the inbound queue as JSON."""
    exchange = Exchange(self.exchange, 'direct', durable=True)
    queue = Queue(self.queue, exchange=exchange, routing_key=self.queue)
    with Connection(self.transport) as conn:
        producer = conn.Producer(serializer='json')
        for evt in events:
            # declare=[queue] ensures the queue exists before publishing.
            producer.publish(evt, exchange=exchange,
                             routing_key=queue.routing_key,
                             declare=[queue])
def _queue(self, conn=None):
    """Return a uniquely named queue on this channel's fanout exchange.

    ``conn`` is accepted for interface compatibility but not used here.
    """
    fanout = kombu.Exchange(self.channel, type='fanout', durable=False)
    return kombu.Queue(str(uuid.uuid4()), fanout)
def get_consumers(self, Consumer, channel):
    """
    Returns a list of kombu.Consumer instances to service all registered
    notification callbacks.

    If using the kombu.mixin.ConsumerMixin mixin class, these instances
    should be included in its get_consumers() method.

    :param Consumer: Message consumer class.
    :type Consumer: class
    :param channel: An open channel.
    :type channel: kombu.transport.*.Channel
    :returns: A list of consumer instances
    :rtype: [kombu.Consumer, ....]
    """
    exchange = self.bus_mixin.producer.exchange
    consumer_list = []
    for routing_key, callbacks in self.notify_callbacks.items():
        notify_queue = kombu.Queue(
            exchange=exchange, routing_key=routing_key)
        consumer_list.append(Consumer(
            queues=notify_queue, callbacks=callbacks))
        self.bus_mixin.logger.info(
            'Listening for "%s" notifications', routing_key)
    return consumer_list
def connect(self):
    """
    'Connects' to the bus.

    Opens the connection and channel, declares the topic exchange, binds
    and declares the queues described by ``self.qkwargs``, and creates a
    producer for publishing. A no-op when already connected.

    :returns: The same instance.
    :rtype: commissaire_http.bus.Bus
    """
    if self.connection is not None:
        # Fix: logger.warn() is a deprecated alias of warning().
        self.logger.warning('Bus already connected.')
        return self
    self.connection = Connection(self.connection_url)
    self._channel = self.connection.channel()
    self._exchange = Exchange(
        self.exchange_name, type='topic').bind(self._channel)
    self._exchange.declare()
    # Create queues
    self._queues = []
    for kwargs in self.qkwargs:
        queue = Queue(**kwargs)
        queue.exchange = self._exchange
        queue = queue.bind(self._channel)
        self._queues.append(queue)
        self.logger.debug('Created queue %s', queue.as_dict())
    # Create producer for publishing on topics
    self.producer = Producer(self._channel, self._exchange)
    self.logger.debug('Bus connection finished')
    return self