def run(self):
self._is_running = True
while self._is_running:
if self.consumer.is_connected():
producer = kombu.Producer(self.consumer._channel, on_return=self.consumer._handle_return)
try:
queued_request = self._out_queue.get(timeout=0.5)
try:
self._dispatch_request(queued_request, producer)
except Exception as e:
log.debug('Failed to dispatch request, re-enqueueing it, error was: {}'.format(str(e)))
self.enqueue(queued_request)
except Empty:
continue
else:
sleep(0.5)
log.debug('Waiting for consumer to be ready...')
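# Rather than constructing a new Producer on every pass through the loop above, kombu's
# producer pool can hand out pooled producers. A minimal sketch of that pattern follows;
# the broker URL and routing key are illustrative, not taken from the code above.
import kombu
from kombu.pools import producers

connection = kombu.Connection('amqp://guest:guest@localhost:5672//')

# Acquire a pooled producer, publish, and return it to the pool on exit.
with producers[connection].acquire(block=True) as producer:
    producer.publish({'hello': 'world'}, routing_key='example')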
def connect(self, exchange, channel): # pragma: no cover
"""
Readies the StorageNotify for publishing notification messages by
setting up a kombu.Producer.
:param exchange: The exchange for publishing notifications.
:type exchange: kombu.Exchange
:param channel: The channel to bind to.
:type channel: kombu.transport.base.StdChannel
"""
name = self.__class__.__name__
self.logger.debug('Connecting {}'.format(name))
self._queue = kombu.Queue(exchange=exchange, channel=channel)
self._queue.declare()
self._producer = kombu.Producer(channel, exchange)
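# A standalone sketch of the same declare-then-produce pattern used by connect() above.
# The broker URL, exchange, queue, and routing key names are illustrative.
import kombu

with kombu.Connection('amqp://guest:guest@localhost:5672//') as conn:
    channel = conn.channel()
    exchange = kombu.Exchange('notifications', type='topic')
    queue = kombu.Queue('storage-notify', exchange=exchange,
                        routing_key='notify.#', channel=channel)
    queue.declare()  # declares the exchange, the queue, and the binding
    producer = kombu.Producer(channel, exchange)
    producer.publish({'event': 'created'}, routing_key='notify.storage',
                     serializer='json')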
def send(self, topic, message):
"""Publishes a pulse message to the proper exchange."""
if not message:
Log.error("Expecting a message")
message._prepare()
if not self.connection:
self.connect()
producer = Producer(
channel=self.connection,
exchange=Exchange(self.settings.exchange, type='topic'),
routing_key=topic
)
# The message is actually a simple envelope format with a payload and
# some metadata.
final_data = Data(
payload=message.data,
_meta=set_default({
'exchange': self.settings.exchange,
'routing_key': message.routing_key,
'serializer': self.settings.serializer,
'sent': time_to_string(datetime.datetime.now(timezone(self.settings.broker_timezone))),
'count': self.count
}, message.metadata)
)
producer.publish(jsons.scrub(final_data), serializer=self.settings.serializer)
self.count += 1
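# For illustration only: the envelope published above has roughly the following shape.
# The field names come from the code above; every value here is hypothetical.
example_envelope = {
    'payload': {'build_id': '20240101', 'status': 'success'},
    '_meta': {
        'exchange': 'exchange/example',
        'routing_key': 'build.finished',
        'serializer': 'json',
        'sent': '2024-01-01T00:00:00+00:00',
        'count': 7,
    },
}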
def send(self, message: BrightsideMessage):
# we want to expose our logger to the functions defined in the inner scope, so put it in their enclosing scope
logger = self._logger
def _build_message_header(msg: BrightsideMessage) -> Dict:
return KombuMessageFactory(msg).create_message_header()
def _publish(sender: Producer) -> None:
logger.debug("Send message {body} to broker {amqpuri} with routing key {routing_key}"
.format(body=message, amqpuri=self._amqp_uri, routing_key=message.header.topic))
sender.publish(message.body.bytes,
headers=_build_message_header(message),
exchange=self._exchange,
content_type="text/plain",
routing_key=message.header.topic,
declare=[self._exchange])
def _error_callback(e, interval) -> None:
logger.debug('Publishing error: %s. Will retry in %s seconds', e, interval)
self._logger.debug("Connect to broker {amqpuri}".format(amqpuri=self._amqp_uri))
with connections[self._cnx].acquire(block=True) as conn:
with Producer(conn) as producer:
ensure_kwargs = self.RETRY_OPTIONS.copy()
ensure_kwargs['errback'] = _error_callback
safe_publish = conn.ensure(producer, _publish, **ensure_kwargs)
safe_publish(producer)
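# The send() above wraps the publish call in kombu's Connection.ensure() so it is retried
# on connection errors. A standalone sketch of that pattern; the URL, exchange name, and
# retry options are illustrative.
from kombu import Connection, Exchange, Producer

exchange = Exchange('example.exchange', type='direct')

def _on_error(exc, interval):
    print('Publishing error: %r, retrying in %s seconds' % (exc, interval))

with Connection('amqp://guest:guest@localhost:5672//') as conn:
    with Producer(conn) as producer:
        def _publish(sender):
            sender.publish(b'hello', exchange=exchange, routing_key='example',
                           declare=[exchange], content_type='text/plain')
        safe_publish = conn.ensure(producer, _publish,
                                   errback=_on_error, max_retries=3)
        safe_publish(producer)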
def publish_message_to_queue(self, queue):
uid = utils.generate_uuid()
producer = kombu.Producer(channel=self.ch, routing_key=queue)
producer.publish(uid)
return {'queue': queue, 'id': uid}
def _make_publisher(self):
bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**self.config)
bus_connection = Connection(bus_url)
bus_exchange = Exchange(self.config['exchange_name'], type=self.config['exchange_type'])
bus_producer = Producer(bus_connection, exchange=bus_exchange, auto_declare=True)
bus_marshaler = Marshaler(self._uuid)
return Publisher(bus_producer, bus_marshaler)
def _make_publisher(self):
bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**self.config)
bus_connection = Connection(bus_url)
same_exchange_arguments_as_collectd = {'arguments': {'auto_delete': True}, 'durable': False}
bus_exchange = Exchange(self.config['exchange_name'],
type=self.config['exchange_type'],
**same_exchange_arguments_as_collectd)
bus_producer = Producer(bus_connection, exchange=bus_exchange, auto_declare=True)
bus_marshaler = CollectdMarshaler(self._uuid)
return Publisher(bus_producer, bus_marshaler)
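# A hypothetical config for the _make_publisher() helpers above; the key names are
# inferred from the format string and the self.config lookups, the values are made up.
config = {
    'username': 'guest',
    'password': 'guest',
    'host': 'localhost',
    'port': 5672,
    'exchange_name': 'collectd',
    'exchange_type': 'topic',
}
bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**config)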
def send_event(self, event, routing_key):
with Connection(self._url) as connection:
producer = Producer(connection, exchange=BUS_EXCHANGE_XIVO, auto_declare=True)
producer.publish(json.dumps(event), routing_key=routing_key, content_type='application/json')
def connect(self):
"""
'Connects' to the bus.
:returns: The same instance.
:rtype: commissaire_http.bus.Bus
"""
if self.connection is not None:
self.logger.warn('Bus already connected.')
return self
self.connection = Connection(self.connection_url)
self._channel = self.connection.channel()
self._exchange = Exchange(
self.exchange_name, type='topic').bind(self._channel)
self._exchange.declare()
# Create queues
self._queues = []
for kwargs in self.qkwargs:
queue = Queue(**kwargs)
queue.exchange = self._exchange
queue = queue.bind(self._channel)
self._queues.append(queue)
self.logger.debug('Created queue %s', queue.as_dict())
# Create producer for publishing on topics
self.producer = Producer(self._channel, self._exchange)
self.logger.debug('Bus connection finished')
return self
def __init__(self, connection):
self.connection = connection
retry_exchange = self._configure_retry_exchanges(self.connection)
dead_exchange = self._configure_dead_exchange(self.connection)
self._retry_producer = Producer(self.connection, exchange=retry_exchange)
self._dead_producer = Producer(self.connection, exchange=dead_exchange)
def Producer(self, *args, **kwargs):
kwargs['deadline_pulse_url'] = self.app.conf.get('deadline_pulse_url')
print("deadline_pulse_url %s" % kwargs['deadline_pulse_url'])
kwargs['deadline_mongo_url'] = self.mongo_url
return DeadlineProducer(*args, **kwargs)
def producer_pool(self):
if self._producer_pool is None:
self._producer_pool = kombu.pools.producers[
self.app.connection_for_write()]
self._producer_pool.limit = self.app.pool.limit
# TODO: submit this patch to celery:
self._producer_pool.Producer = self.Producer
return self._producer_pool
# tasks ---
# FIXME: look into global tasks, which get added to all apps automatically
def start_connection(self):
"""
reset the connection to rabbit mq
:return:
"""
logger = logging.getLogger(self.__class__.__name__)
logger.info("starting new rabbit mq connection")
# todo: do we need to make confirm_publish configurable?
self._conn = Connection(self.get_config().rabbitmq_url,
transport_options={'confirm_publish': True})
# setup producer to push to error and dlqs
self._producer = Producer(channel=self._conn.channel(),
exchange=self._orchestrator.get_exchange())
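# The confirm_publish transport option above asks the py-amqp transport for publisher
# confirms. A standalone sketch of the same setup; the URL, exchange name, routing key,
# and retry policy are illustrative.
from kombu import Connection, Exchange, Producer

exchange = Exchange('orchestrator', type='topic')
conn = Connection('amqp://guest:guest@localhost:5672//',
                  transport_options={'confirm_publish': True})
producer = Producer(channel=conn.channel(), exchange=exchange)
producer.publish({'error': 'details'}, routing_key='error.example',
                 declare=[exchange], retry=True,
                 retry_policy={'max_retries': 3, 'interval_start': 1})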
# test_weibo_harvester.py, from project sfm-weibo-harvester (author: gwu-libraries)
def test_search_timeline(self):
self.path = "/sfm-data/collection_set/test_collection/test_3"
harvest_msg = {
"id": "test:3",
"type": "weibo_timeline",
"path": self.path,
"credentials": {
"access_token": tests.WEIBO_ACCESS_TOKEN
},
"collection_set": {
"id": "test_collection_set"
},
"collection": {
"id": "test_collection"
},
"options": {
"web_resources": True,
"image_sizes": [
"Thumbnail",
"Medium",
"Large"
]
}
}
with self._create_connection() as connection:
bound_exchange = self.exchange(connection)
producer = Producer(connection, exchange=bound_exchange)
producer.publish(harvest_msg, routing_key="harvest.start.weibo.weibo_timeline")
# Now wait for status message.
status_msg = self._wait_for_message(self.result_queue, connection)
# Matching ids
self.assertEqual("test:3", status_msg["id"])
# Running
self.assertEqual(STATUS_RUNNING, status_msg["status"])
# Another running message
status_msg = self._wait_for_message(self.result_queue, connection)
self.assertEqual(STATUS_RUNNING, status_msg["status"])
# Now wait for result message.
result_msg = self._wait_for_message(self.result_queue, connection)
# Matching ids
self.assertEqual("test:3", result_msg["id"])
# Success
self.assertEqual(STATUS_SUCCESS, result_msg["status"])
# Some weibo posts
self.assertTrue(result_msg["stats"][date.today().isoformat()]["weibos"])
# Web harvest message.
web_harvest_msg = self._wait_for_message(self.web_harvest_queue, connection)
# Some seeds
self.assertTrue(len(web_harvest_msg["seeds"]))
# Warc created message.
warc_msg = self._wait_for_message(self.warc_created_queue, connection)
# check that the path exists
self.assertTrue(os.path.isfile(warc_msg["warc"]["path"]))
def __init__(
self, exchange_name, connection_url, qkwargs, config_file=None):
"""
Initializes a new Service instance.
:param exchange_name: Name of the topic exchange.
:type exchange_name: str
:param connection_url: Kombu connection url.
:type connection_url: str
:param qkwargs: One or more dicts keyword arguments for queue creation
:type qkwargs: list
:param config_file: Path to the configuration file location.
:type config_file: str or None
"""
name = self.__class__.__name__
self.logger = logging.getLogger(name)
self.logger.debug('Initializing {}'.format(name))
# If we are given no default, use the global one
# Read the configuration file
self._config_data = read_config_file(
config_file, self._default_config_file)
if connection_url is None and 'bus_uri' in self._config_data:
connection_url = self._config_data.get('bus_uri')
self.logger.debug(
'Using connection_url=%s from config file', connection_url)
if exchange_name is None and 'bus_exchange' in self._config_data:
exchange_name = self._config_data.get('bus_exchange')
self.logger.debug(
'Using exchange_name=%s from config file', exchange_name)
self.connection = Connection(connection_url)
self._channel = self.connection.default_channel
self._exchange = Exchange(
exchange_name, type='topic').bind(self._channel)
self._exchange.declare()
# Set up queues
self._queues = []
for kwargs in qkwargs:
queue = Queue(**kwargs)
queue.exchange = self._exchange
queue = queue.bind(self._channel)
self._queues.append(queue)
self.logger.debug(queue.as_dict())
# Create producer for publishing on topics
self.producer = Producer(self._channel, self._exchange)
self.logger.debug('Initializing of {} finished'.format(name))
def setup_entities(self):
"""
declare all required entities
no advanced error handling yet (e.g. errors on declaration when properties have been altered)
"""
# return if already inited
if self._service_inited:
return
# setup exchange
self._booking_exchange = Exchange(self._config.get_mq_config(constants.EXCHANGE),
type='topic',
durable=True)
# setup durable queues
self.work_queue = Queue(self._config.get_mq_config(constants.WORK_QUEUE),
exchange=self._booking_exchange,
routing_key=constants.WORK_QUEUE + ".#",
durable=True)
self.retry_queue = Queue(self._config.get_mq_config(constants.RETRY_QUEUE),
exchange=self._booking_exchange,
routing_key=constants.RETRY_QUEUE + ".#",
durable=True)
self.dlq_queue = Queue(self._config.get_mq_config(constants.DEAD_LETTER_QUEUE),
exchange=self._booking_exchange,
routing_key=constants.DEAD_LETTER_QUEUE + ".#",
durable=True)
# a buffer queue is needed by the error-queue consumer to temporarily buffer messages for processing.
# this handles the retry loop that can otherwise occur between the retry queue and the work queue.
# todo: need to implement an alternative, as the copying overhead
# can be significant when the error queue is large
self.buffer_queue = Queue(name=self._config.get_mq_config(constants.BUFFER_QUEUE),
exchange=self._booking_exchange,
routing_key='buffer.#',
durable=True)
# todo: do we need to make confirm_publish configurable?
self._conn = Connection(self.get_config().rabbitmq_url,
transport_options={'confirm_publish': True})
# declare all the exchanges and queues needed (declare, not overwrite existing)
for entity in [self._booking_exchange, self.work_queue, self.retry_queue,
self.dlq_queue, self.buffer_queue]:
entity.maybe_bind(self._conn)
entity.declare()
# setup producer to push to error and dlqs
self._producer = Producer(channel=self._conn.channel(),
exchange=self._booking_exchange)
self._service_inited = True
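# A standalone sketch of the declare-then-produce pattern used by setup_entities() above
# (maybe_bind each entity to the connection, declare it, then publish through a Producer).
# The broker URL, exchange and queue names, and routing keys are illustrative.
from kombu import Connection, Exchange, Producer, Queue

with Connection('amqp://guest:guest@localhost:5672//',
                transport_options={'confirm_publish': True}) as conn:
    exchange = Exchange('booking', type='topic', durable=True)
    work_queue = Queue('work', exchange=exchange, routing_key='work.#', durable=True)
    for entity in (exchange, work_queue):
        entity.maybe_bind(conn)
        entity.declare()
    producer = Producer(channel=conn.channel(), exchange=exchange)
    producer.publish({'booking_id': 42}, routing_key='work.created', retry=True)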