def setUp(self):
self.exchange = Exchange(EXCHANGE, type="topic")
self.result_queue = Queue(name="result_queue", routing_key="harvest.status.weibo.*", exchange=self.exchange,
durable=True)
self.web_harvest_queue = Queue(name="web_harvest_queue", routing_key="harvest.start.web",
exchange=self.exchange)
self.warc_created_queue = Queue(name="warc_created_queue", routing_key="warc_created", exchange=self.exchange)
weibo_harvester_queue = Queue(name="weibo_harvester", exchange=self.exchange)
with self._create_connection() as connection:
self.result_queue(connection).declare()
self.result_queue(connection).purge()
self.web_harvest_queue(connection).declare()
self.web_harvest_queue(connection).purge()
self.warc_created_queue(connection).declare()
self.warc_created_queue(connection).purge()
# avoid raise NOT_FOUND error 404
weibo_harvester_queue(connection).declare()
weibo_harvester_queue(connection).purge()
self.path = None
python类Exchange()的实例源码
test_weibo_harvester.py 文件源码
项目:sfm-weibo-harvester
作者: gwu-libraries
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def __init__(self, client, name, routing_key=None, logger=None):
"""Initialization
:param client: instance of client
:type client: Client
:param name: name of exchange
:type name: str
:param routing_key: routing key to queue
:type routing_key: str or None
"""
self.client = client
self.name = name
self.routing_key = routing_key
if logger is None:
logger = _logger # pragma: no cover
self.logger = logger
self.logger.debug('Exchange "%s" built, routing_key: %s', self.name,
self.routing_key if not self.routing_key is None else '')
def declare_exchange(self, name, type='direct', queues=None, **options):
"""Create or update exchange
:param name: name of exchange
:type name: str
:param type: type of exchange - direct, fanout, topic, match
:type type: str
:param queues: list of queues with routing keys: [[queue_name, routing_key], [queue_name, routing_key], ...]
:type queues: list, None or tuple
:param options: additional options for Exchange creation
"""
if queues is None:
queues = [] # pragma: no cover
with self.connections[self.connection].acquire() as conn:
exchange = Exchange(name, type=type, channel=conn, **options)
exchange.declare()
self.exchanges[name] = exchange
for q_name, routing_key in queues:
queue = Queue(name=q_name, channel=conn)
queue.declare()
queue.bind_to(exchange=name, routing_key=routing_key)
self.logger.debug('Queue "%s" with routing_key "%s" was bond to exchange "%s"', q_name,
routing_key if routing_key else q_name, name)
def _connect(self):
self._conn = kombu.Connection(
self._hostname,
connect_timeout=self._connect_timeout
)
self._channel = self._conn.channel()
self._exchange = kombu.Exchange(
name=self._exchange_name,
channel=self._channel,
durable=False
)
self._callback_queue = self._create_callback_queue(
self._channel,
self._exchange
)
def consume(self):
def worker(event, message):
page_id = event['recipient']['id']
bot_class, bot_args = self.get_bot(page_id)
p = Process(target=spawn_bot_amqp, args=(bot_class, bot_args,
self.transport, self.send_exchange,
self.send_queue, event, message))
p.start()
def stop_worker(signum, frame):
p.terminate()
p.join()
signal.signal(signal.SIGTERM, stop_worker)
exchange = Exchange(self.exchange, 'direct', durable=True)
queue = Queue(self.queue, exchange=exchange, routing_key=self.queue)
with Connection(self.transport) as conn:
with conn.Consumer(queue, callbacks=[worker]) as consumer:
while True:
conn.drain_events()
def sender(self):
def worker(event, message):
p = Process(target=spawn_send_message_worker, args=(event, message))
p.start()
def stop_worker(signum, frame):
p.terminate()
p.join()
signal.signal(signal.SIGTERM, stop_worker)
exchange = Exchange(self.send_exchange, 'direct', durable=True)
queue = Queue(self.send_queue, exchange=exchange,
routing_key=self.send_queue)
with Connection(self.send_transport) as conn:
with conn.Consumer(queue, callbacks=[worker]) as consumer:
while True:
conn.drain_events()
def connect(self, exchange, channel): # pragma: no cover
"""
Readies the StorageNotify for publishing notification messages by
setting up a kombu.Producer.
:param exchange: The exchange for publishing notifications.
:type exchange: kombu.Exchange
:param channel: The channel to bind to.
:type channel: kombu.transport.base.StdChannel
"""
name = self.__class__.__name__
self.logger.debug('Connecting {}'.format(name))
self._queue = kombu.Queue(exchange=exchange, channel=channel)
self._queue.declare()
self._producer = kombu.Producer(channel, exchange)
def _publish(self, event, model_instance):
"""
Internal function to publish "created", "deleted", and "updated"
notification messages.
:param event: The event name ("created", "deleted", or "updated")
:type event: str
:param model_instance: The model instance upon which the event occurred
:type model_instance: commissaire.model.Model
"""
class_name = model_instance.__class__.__name__
body = {
'event': event,
'class': class_name,
'model': model_instance.to_dict_safe()
}
routing_key = 'notify.storage.{}.{}'.format(class_name, event)
if self._producer:
self.logger.debug('Publish "{}": {}'.format(routing_key, body))
self._producer.publish(
body, routing_key,
kombu.Exchange.TRANSIENT_DELIVERY_MODE)
else:
# This means the connect() method was not called.
self.logger.warn('Not publishing "%s"', routing_key)
def _configure_dead_exchange(self, connection):
def declare_dead_queue():
channel = connection.channel()
dead_exchange = Exchange(name=config.rabbitmq_dead_exchange(),
type='direct',
channel=channel)
dead_queue = Queue(name=config.rabbitmq_dead_queue(),
routing_key=config.rabbitmq_routing_key(),
exchange=dead_exchange,
channel=channel)
dead_queue.declare()
return dead_exchange
def error_callback(exception, interval):
logging.error('Failed to declare dead queue and exchange, retrying in %d seconds. %r' %
(interval, exception))
declare_dead_queue = connection.ensure(connection, declare_dead_queue, errback=error_callback,
interval_start=0, interval_step=5, interval_max=30)
return declare_dead_queue()
def send(self, topic, message):
"""Publishes a pulse message to the proper exchange."""
if not message:
Log.error("Expecting a message")
message._prepare()
if not self.connection:
self.connect()
producer = Producer(
channel=self.connection,
exchange=Exchange(self.settings.exchange, type='topic'),
routing_key=topic
)
# The message is actually a simple envelope format with a payload and
# some metadata.
final_data = Data(
payload=message.data,
_meta=set_default({
'exchange': self.settings.exchange,
'routing_key': message.routing_key,
'serializer': self.settings.serializer,
'sent': time_to_string(datetime.datetime.now(timezone(self.settings.broker_timezone))),
'count': self.count
}, message.metadata)
)
producer.publish(jsons.scrub(final_data), serializer=self.settings.serializer)
self.count += 1
openstack-dns-updater.py 文件源码
项目:fuel-plugin-dns-updater
作者: openstack
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def get_consumers(self, consumer, channel):
consumers = []
exchanges = DNS_CONF["exchanges"]
exchanges = exchanges.split(",")
for exch in exchanges:
exchange = Exchange(exch, type="topic", durable=False)
queue = Queue(DNS_CONF["queue_name"], exchange,
routing_key=DNS_CONF["routing_key"],
durable=False, auto_delete=True, no_ack=True)
consumers.append(consumer(queue, callbacks=[self.on_message]))
return consumers
def __init__(self, connection: Connection, logger: logging.Logger=None) -> None:
self._amqp_uri = connection.amqp_uri
self._cnx = BrokerConnection(hostname=connection.amqp_uri)
self._exchange = Exchange(connection.exchange, type=connection.exchange_type, durable=connection.is_durable)
self._logger = logger or logging.getLogger(__name__)
def __init__(self, connection: Connection, configuration: BrightsideConsumerConfiguration, logger: logging.Logger=None) -> None:
self._exchange = Exchange(connection.exchange, type=connection.exchange_type, durable=connection.is_durable)
self._routing_key = configuration.routing_key
self._amqp_uri = connection.amqp_uri
self._queue_name = configuration.queue_name
self._routing_key = configuration.routing_key
self._prefetch_count = configuration.prefetch_count
self._is_durable = configuration.is_durable
self._message_factory = ArameMessageFactory()
self._logger = logger or logging.getLogger(__name__)
self._queue = Queue(self._queue_name, exchange=self._exchange, routing_key=self._routing_key)
self._msg = None # Kombu Message
self._message = None # Brightside Message
# TODO: Need to fix the argument types with default types issue
def build_queues(cls):
default_exchange = Exchange(cls.default_name, type='direct')
queues = [
Queue(cls.default_name, default_exchange, routing_key=cls.default_name),
Queue(cls.system_name, default_exchange, routing_key=cls.system_name)
]
return tuple(set(queues))
def delete_exchange(self, name):
"""Delete exchange by name
:param name: name of exchange
:type name: str
"""
with self.connections[self.connection].acquire() as conn:
exchange = self.exchanges.pop(name, Exchange(name, channel=conn))
exchange.delete()
self.logger.debug('Exchange "%s" was deleted', name)
def _create_service_queues(self, services, Consumer, channel):
"""
Creates necessary AMQP queues, one per service.
"""
log.debug('Declaring exchange %s', self.exchange)
exchange = kombu.Exchange(
self.exchange,
channel=channel,
durable=False
)
exchange.declare()
# channel.exchange_declare(exchange=self.exchange)
queues = []
for service in services.values():
queue_name = '{}_service_{}'.format(self.exchange, service.name)
log.debug('Declaring service queue %s', queue_name)
queue = kombu.Queue(
channel=channel,
name=queue_name,
exchange=exchange,
routing_key=queue_name,
exclusive=False,
durable=False,
# channel=channel
)
queue.declare()
queues.append(queue)
# channel.queue_delete(queue=queue)
# channel.queue_declare(queue=queue, auto_delete=True)
# channel.queue_bind(queue, self.exchange)
# channel.basic_consume(self._on_message, queue=queue, no_ack=False)
consumer = Consumer(
# self.connection,
queues=queues,
on_message=self._on_message,
no_ack=False
)
# consumer.consume(no_ack=False)
return consumer
def _exchange(self):
return kombu.Exchange(self.channel, type='fanout', durable=False)
def get_consumers(self, consumer, channel):
exchange = Exchange(self.nova_exchange, type="topic", durable=False)
queue = Queue(self.queue_name, exchange, routing_key=self.routing_key,
durable=False, auto_delete=True, no_ack=True)
return [consumer(queue, callbacks=[self.handle_notification])]
def _make_publisher(self):
bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**self.config)
bus_connection = Connection(bus_url)
bus_exchange = Exchange(self.config['exchange_name'], type=self.config['exchange_type'])
bus_producer = Producer(bus_connection, exchange=bus_exchange, auto_declare=True)
bus_marshaler = Marshaler(self._uuid)
return Publisher(bus_producer, bus_marshaler)
def __init__(self, global_config):
self._events_pubsub = Pubsub()
self._bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**global_config['bus'])
self._exchange = Exchange(global_config['bus']['exchange_name'],
type=global_config['bus']['exchange_type'])
self._queue = kombu.Queue(exclusive=True)
self._is_running = False
def _make_publisher(self):
bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**self.config)
bus_connection = Connection(bus_url)
same_exchange_arguments_as_collectd = {'arguments': {'auto_delete': True}, 'durable': False}
bus_exchange = Exchange(self.config['exchange_name'],
type=self.config['exchange_type'],
**same_exchange_arguments_as_collectd)
bus_producer = Producer(bus_connection, exchange=bus_exchange, auto_declare=True)
bus_marshaler = CollectdMarshaler(self._uuid)
return Publisher(bus_producer, bus_marshaler)
def __init__(self, hosts_conf, exchange_name='', exchange_type='', exchange_arguments=None,
queue_name='', routing_key='', queue_arguments=None, callback=None, no_ack=True):
self.hosts_conf = hosts_conf
self.hosts = self.create_hosts()
self.connection = Connection(self.hosts)
self.task_exchange = Exchange(name=exchange_name, type=exchange_type, arguments=exchange_arguments)
self.task_queues = [Queue(name=queue_name, exchange=self.task_exchange, routing_key=routing_key,
queue_arguments=queue_arguments)]
self.callback = callback
self.no_ack = no_ack
def queue_send(self, recipient, message=None, sender_action=None):
exchange = Exchange(self.send_exchange, 'direct', durable=True)
queue = Queue(self.send_queue, exchange=exchange,
routing_key=self.send_queue)
with Connection(self.send_transport) as conn:
producer = conn.Producer(serializer='json')
event = {
'recipient': recipient,
'message': message,
'sender_action': sender_action,
'page_access_token': self.page_access_token
}
producer.publish(event, exchange=exchange,
routing_key=queue.routing_key,
declare=[queue])
def queue_events(self, events):
exchange = Exchange(self.exchange, 'direct', durable=True)
queue = Queue(self.queue, exchange=exchange, routing_key=self.queue)
with Connection(self.transport) as conn:
producer = conn.Producer(serializer='json')
for event in events:
producer.publish(event, exchange=exchange,
routing_key=queue.routing_key,
declare=[queue])
def _queue(self, conn=None):
exchange = kombu.Exchange(self.channel, type='fanout', durable=False)
queue = kombu.Queue(str(uuid.uuid4()), exchange)
return queue
def connect(self):
"""
'Connects' to the bus.
:returns: The same instance.
:rtype: commissaire_http.bus.Bus
"""
if self.connection is not None:
self.logger.warn('Bus already connected.')
return self
self.connection = Connection(self.connection_url)
self._channel = self.connection.channel()
self._exchange = Exchange(
self.exchange_name, type='topic').bind(self._channel)
self._exchange.declare()
# Create queues
self._queues = []
for kwargs in self.qkwargs:
queue = Queue(**kwargs)
queue.exchange = self._exchange
queue = queue.bind(self._channel)
self._queues.append(queue)
self.logger.debug('Created queue %s', queue.as_dict())
# Create producer for publishing on topics
self.producer = Producer(self._channel, self._exchange)
self.logger.debug('Bus connection finished')
return self
def _configure_retry_exchanges(self, connection):
def declare_queues():
channel = connection.channel()
almanach_exchange = Exchange(name=config.rabbitmq_retry_return_exchange(),
type='direct',
channel=channel)
retry_exchange = Exchange(name=config.rabbitmq_retry_exchange(),
type='direct',
channel=channel)
retry_queue = Queue(name=config.rabbitmq_retry_queue(),
exchange=retry_exchange,
routing_key=config.rabbitmq_routing_key(),
queue_arguments=self._get_queue_arguments(),
channel=channel)
almanach_queue = Queue(name=config.rabbitmq_queue(),
exchange=almanach_exchange,
durable=False,
routing_key=config.rabbitmq_routing_key(),
channel=channel)
retry_queue.declare()
almanach_queue.declare()
return retry_exchange
def error_callback(exception, interval):
logging.error('Failed to declare queues and exchanges, retrying in %d seconds. %r' % (interval, exception))
declare_queues = connection.ensure(connection, declare_queues, errback=error_callback,
interval_start=0, interval_step=5, interval_max=30)
return declare_queues()
def __init__(self, exchange_name, broker_url, mode=ASYNC):
"""??????
Args:
exchange_name (string): ????
broker_url (string): ????
mode (int): ??
"""
self.exchange_name = exchange_name
self.broker_url = broker_url
self.mode = mode
self.exchange = Exchange(exchange_name, type='direct')
self.connection = Connection(broker_url)
def _queue(self, conn=None):
exchange = kombu.Exchange(self.channel, type='fanout', durable=False)
queue = kombu.Queue(str(uuid.uuid4()), exchange)
return queue
def __init__(self, connection):
self.connection = connection
self.uuid = uuid4()
self.broadcast_exchange = Exchange('BROADCAST', type='topic', passive=True)
self.p2p_exchange = Exchange('P2P', type='topic', passive=True)