def process_messages(self, messages):
    """Run ``self.func`` on each message through ``self.pool``.

    Every message is submitted to the pool as ``self.func(message)``. The
    method then waits for the submitted futures (bounded by
    ``self.timeout``) and collects the messages whose call finished
    without raising. A message whose call raised is logged together with
    its ``message_id`` and left out of the result.

    :param messages: sized iterable of message objects; each one must
        expose a ``message_id`` attribute (used for error logging).
    :returns: list of the messages that were processed successfully, in
        completion order (not necessarily submission order).
    :raises RuntimeError: re-raised from ``self.pool.submit`` after being
        logged. Futures submitted earlier in the same call are not waited
        for.
    :raises TimeoutError: presumably propagated from
        ``futures.as_completed`` when the futures do not all finish
        within ``self.timeout`` (assumes ``futures`` is
        ``concurrent.futures`` -- confirm against the file's imports).
        Messages already collected are not returned in that case.
    """
    future_to_message = {}
    processed = []
    self.logger.debug('processing %d messages', len(messages))
    for message in messages:
        # ThreadPoolExecutor/ProcessPoolExecutor raise RuntimeError when a
        # task is submitted while the pool is shutting down. Log it and
        # re-raise so that no further submissions are attempted; this
        # aborts the whole batch. Per the original author's note, messages
        # that were not handled get requeued after the interval configured
        # on the queue's policy.
        try:
            future = self.pool.submit(self.func, message)
        except RuntimeError:
            self.logger.exception('cannot submit jobs to pool')
            raise
        future_to_message[future] = message
    for future in futures.as_completed(future_to_message,
                                       timeout=self.timeout):
        message = future_to_message.pop(future)
        try:
            future.result()
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must propagate instead of being recorded as an
            # ordinary per-message failure.
            self.logger.exception('exception processing message %s',
                                  message.message_id)
        else:
            processed.append(message)
    return processed
评论列表
文章目录