def publisher(nameko_config, **kwargs):
""" Return a function that sends AMQP messages.
"""
def publish(payload, routing_key, exchange=None):
""" Dispatch a message with `payload`
"""
conn = Connection(nameko_config[AMQP_URI_CONFIG_KEY])
with connections[conn].acquire(block=True) as connection:
if exchange is not None: # pragma: no cover
exchange.maybe_bind(connection)
with producers[conn].acquire(block=True) as producer:
producer.publish(
payload,
routing_key=routing_key,
exchange=exchange,
**kwargs
)
return publish
python类Connection()的实例源码
def publish_message(rabbit_config):
def publish(
exchange, payload, routing_key=None, serializer="json", **kwargs
):
conn = Connection(rabbit_config[AMQP_URI_CONFIG_KEY])
with connections[conn].acquire(block=True) as connection:
exchange.maybe_bind(connection)
with producers[conn].acquire(block=True) as producer:
producer.publish(
payload,
exchange=exchange,
routing_key=routing_key,
serializer=serializer,
**kwargs
)
return publish
def deadlettering_exchange(self, rabbit_config, exchange, queue):
conn = Connection(rabbit_config[AMQP_URI_CONFIG_KEY])
with connections[conn].acquire(block=True) as connection:
deadletter_exchange = Exchange(name="deadletter", type="topic")
deadletter_exchange.maybe_bind(connection)
deadletter_exchange.declare()
deadletter_queue = Queue(
name="deadletter",
exchange=deadletter_exchange,
routing_key="#",
queue_arguments={
'x-dead-letter-exchange': exchange.name
}
)
deadletter_queue.maybe_bind(connection)
deadletter_queue.declare()
return deadletter_exchange
def _get_connection(self, connection):
"""Create connection strategy
:param connection: connection for broker
:type connection: str, None, kombu.connections.Connection, dict
:return: instance of kombu.connections.Connection
:rtype: Connection
"""
if not connection:
connection = self.default_connection # pragma: no cover
if isinstance(connection, str):
connection = {'hostname': connection}
if isinstance(connection, dict):
connection = Connection(**connection)
return connection
def _get_connection(self, connection):
"""Create connection strategy
:param connection: connection for broker
:type connection: str, None, kombu.connections.Connection, dict
:return: instance of kombu.connections.Connection
:rtype: Connection
"""
if not connection:
connection = self.connection # pragma: no cover
if isinstance(connection, str):
connection = {'hostname': connection}
if isinstance(connection, dict):
connection = Connection(**connection)
return connection
def _connect(self):
self._conn = kombu.Connection(
self._hostname,
connect_timeout=self._connect_timeout
)
self._channel = self._conn.channel()
self._exchange = kombu.Exchange(
name=self._exchange_name,
channel=self._channel,
durable=False
)
self._callback_queue = self._create_callback_queue(
self._channel,
self._exchange
)
def __init__(self, name, url="amqp://", maxsize=0, lazy_limit=True):
"""
Constructor for KombuQueue
url: http://kombu.readthedocs.org/en/latest/userguide/connections.html#urls
maxsize: an integer that sets the upperbound limit on the number of
items that can be placed in the queue.
"""
self.name = name
self.conn = Connection(url)
self.queue = self.conn.SimpleQueue(self.name, no_ack=True, serializer='umsgpack')
self.maxsize = maxsize
self.lazy_limit = lazy_limit
if self.lazy_limit and self.maxsize:
self.qsize_diff_limit = int(self.maxsize * 0.1)
else:
self.qsize_diff_limit = 0
self.qsize_diff = 0
def on_consume_ready(
self, connection, channel, consumers): # pragma: no cover
"""
Called when the service is ready to consume messages.
:param connection: The current connection instance.
:type connection: kombu.Connection
:param channel: The current channel.
:type channel: kombu.transport.*.Channel
:param consumers: A list of consumers.
:type consumers: list
"""
self.logger.info('Ready to consume')
if self.logger.level == logging.DEBUG:
queue_names = []
for consumer in consumers:
queue_names += [x.name for x in consumer.queues]
self.logger.debug(
'Consuming via connection "{}" and channel "{}" on '
'the following queues: "{}"'.format(
connection.as_uri(), channel, '", "'.join(queue_names)))
def consume(self):
def worker(event, message):
page_id = event['recipient']['id']
bot_class, bot_args = self.get_bot(page_id)
p = Process(target=spawn_bot_amqp, args=(bot_class, bot_args,
self.transport, self.send_exchange,
self.send_queue, event, message))
p.start()
def stop_worker(signum, frame):
p.terminate()
p.join()
signal.signal(signal.SIGTERM, stop_worker)
exchange = Exchange(self.exchange, 'direct', durable=True)
queue = Queue(self.queue, exchange=exchange, routing_key=self.queue)
with Connection(self.transport) as conn:
with conn.Consumer(queue, callbacks=[worker]) as consumer:
while True:
conn.drain_events()
def sender(self):
def worker(event, message):
p = Process(target=spawn_send_message_worker, args=(event, message))
p.start()
def stop_worker(signum, frame):
p.terminate()
p.join()
signal.signal(signal.SIGTERM, stop_worker)
exchange = Exchange(self.send_exchange, 'direct', durable=True)
queue = Queue(self.send_queue, exchange=exchange,
routing_key=self.send_queue)
with Connection(self.send_transport) as conn:
with conn.Consumer(queue, callbacks=[worker]) as consumer:
while True:
conn.drain_events()
def publish(config):
conn = Connection(config[AMQP_URI_CONFIG_KEY])
def publish(payload, routing_key, exchange=None, **kwargs):
"""Publish an AMQP message."""
with kombu_connections[conn].acquire(block=True) as connection:
if exchange is not None:
exchange.maybe_bind(connection)
with producers[conn].acquire(block=True) as producer:
producer.publish(
payload,
exchange=exchange,
serializer='json',
routing_key=routing_key,
**kwargs)
return publish
def connect(self):
if not self.connection:
self.connection = Connection(
hostname=self.settings.host,
port=self.settings.port,
userid=self.settings.user,
password=self.settings.password,
virtual_host=self.settings.vhost,
ssl=self.settings.ssl
)
def republish(self, backoff_exc, message, target_queue):
expiration = backoff_exc.next(message, self.exchange.name)
queue = self.make_queue(expiration)
# republish to appropriate backoff queue
amqp_uri = self.container.config[AMQP_URI_CONFIG_KEY]
with get_producer(amqp_uri) as producer:
properties = message.properties.copy()
headers = properties.pop('application_headers')
headers['backoff'] = expiration
expiration_seconds = float(expiration) / 1000
# force redeclaration; the publisher will skip declaration if
# the entity has previously been declared by the same connection
conn = Connection(amqp_uri)
maybe_declare(queue, conn, retry=True, **DEFAULT_RETRY_POLICY)
producer.publish(
message.body,
headers=headers,
exchange=self.exchange,
routing_key=target_queue,
expiration=expiration_seconds,
retry=True,
retry_policy=DEFAULT_RETRY_POLICY,
declare=[queue.exchange, queue],
**properties
)
test_weibo_harvester.py 文件源码
项目:sfm-weibo-harvester
作者: gwu-libraries
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def _create_connection():
return Connection(hostname="mq", userid=tests.mq_username, password=tests.mq_password)
def __init__(self, ip, port, user='rabbitmq', password='password'):
c = kombu.Connection("amqp://{0}:{1}@{2}:{3}//".format(user, password,
ip, port))
c.connect()
self.ch = c.channel()
def __init__(self, connection='amqp:///', name=None, logger=None, limit=None):
"""Initialization of Client instance
:param connection: connection for broker
:type connection: str, None, kombu.connections.Connection, dict
"""
self.connection = self._get_connection(connection)
self.exchanges = {}
if name is None:
try:
name = '<client: {}>'.format(self.connection.as_uri())
except: # pragma: no cover
# Errors with filesystem transport
name = '<client: {}>'.format(self.connection.transport_cls)
if logger is None:
logger = get_logger(__name__)
self.logger = InstanceLogger(self, logger)
self.name = name
self.logger.debug('%s built', self.name)
if limit is None:
# Set limit as global kombu limit.
limit = pools.get_limit()
self.limit = limit
self.connections = pools.Connections(self.limit)
def __init__(self, connection='amqp:///', logger=None, timeout=10, name=None):
"""Initialization
:param connection: connection for queues broker
:type connection: str, None, dict or Connection
:param logger: logging instance
:type logger: Logger
:param timeout: sleeping for loop, default = 0.1
:type timeout: None, int or float
"""
if logger is None:
logger = _logger
self.logger = InstanceLogger(self, logger)
self.connection = self._get_connection(connection)
self.timeout = timeout
self.consumers = []
if name is None:
try:
name = '<microservice: {}>'.format(self.connection.as_uri())
except: # pragma no cover
name = '<microservice: {}>'.format(
self.connection.transport_cls) # pragma: no cover
self.name = name
self._stop = False
self._stopped = False
def drain_events(self, infinity=True):
with nested(*self.consumers):
while not self._stop:
try:
self.connection.drain_events(timeout=self.timeout)
except socket.timeout:
if not infinity:
break
except Exception as e:
if not self.connection.connected and not self._stop:
self.logger.error(
'Connection to mq has broken off. Try to reconnect')
self.connect()
self.revive()
break
elif not self._stop and not isinstance(e, HandlerError):
self.logger.exception(
'Something wrong! Try to restart the loop')
self.revive()
break
elif isinstance(e, HandlerError):
pass
else: # pragma: no cover
self.logger.exception('Unknown error') # pragma: no cover
if self._stop:
self._stopped = True
self.logger.info('Stopped draining events.')
def _connection(self):
return kombu.Connection(self.url)
def onconnection_revived(self): # pragma: no cover
"""
Called when a reconnection occurs.
"""
self.logger.info('Connection (re)established')
def on_consume_end(self, connection, channel): # pragma: no cover
"""
Called when the service stops consuming.
:param connection: The current connection instance.
:type connection: kombu.Connection
:param channel: The current channel.
:type channel: kombu.transport.*.Channel
"""
self.logger.warn('Consuming has ended')
def __init__(self, aws_connection, transport='amqp'):
self.ec2_conn = aws_connection
self.broker_uri = \
"{transport}://{username}:{password}@{rabbit_host}:{rabbit_port}"\
.format(transport=transport,
username=CONF.rabbit_userid,
password=CONF.rabbit_password,
rabbit_host=CONF.rabbit_host,
rabbit_port=CONF.rabbit_port)
self.connection = Connection(self.broker_uri)
def sync_get(name, interval=0.5):
with Connection(BROKER_URI) as conn:
publish(conn, name)
while 1:
rs = r.get(name)
if rs and Backend.from_json(cPickle.loads(rs)).status == SUCCESS:
break
time.sleep(interval)
item = Backend.get(name)
return item.result
def main():
disconnect()
connect('zhihulive')
with Connection(BROKER_URI) as conn:
consumer(conn, [process_task])
def action_proc_remove(config):
log.warning(" - Trying to connect with server...")
url = '%s://%s' % (config.broker_type, config.target)
with Connection(url) as conn:
in_queue = conn.SimpleQueue('celery')
# Get remote process
for _ in get_remote_messages(config, in_queue, False):
pass
log.error(" - All tasks removed from '%s'" % config.target)
def _make_publisher(self):
bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**self.config)
bus_connection = Connection(bus_url)
bus_exchange = Exchange(self.config['exchange_name'], type=self.config['exchange_type'])
bus_producer = Producer(bus_connection, exchange=bus_exchange, auto_declare=True)
bus_marshaler = Marshaler(self._uuid)
return Publisher(bus_producer, bus_marshaler)
def run(self):
logger.info("Running AMQP consumer")
with Connection(self._bus_url) as connection:
self.connection = connection
super(CoreBusConsumer, self).run()
def _make_publisher(self):
bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**self.config)
bus_connection = Connection(bus_url)
same_exchange_arguments_as_collectd = {'arguments': {'auto_delete': True}, 'durable': False}
bus_exchange = Exchange(self.config['exchange_name'],
type=self.config['exchange_type'],
**same_exchange_arguments_as_collectd)
bus_producer = Producer(bus_connection, exchange=bus_exchange, auto_declare=True)
bus_marshaler = CollectdMarshaler(self._uuid)
return Publisher(bus_producer, bus_marshaler)
def listen_events(self, routing_key, exchange=BUS_EXCHANGE_XIVO):
with Connection(self._url) as conn:
queue = Queue(BUS_QUEUE_NAME, exchange=exchange, routing_key=routing_key, channel=conn.channel())
queue.declare()
queue.purge()
self.bus_queue = queue
def _drain_events(self, on_event):
if not hasattr(self, 'bus_queue'):
raise Exception('You must listen for events before consuming them')
with Connection(self._url) as conn:
with Consumer(conn, self.bus_queue, callbacks=[on_event]):
try:
while True:
conn.drain_events(timeout=0.5)
except TimeoutError:
pass