futures.py 文件源码

python
阅读 25 收藏 0 点赞 0 评论 0

项目:squishy 作者: tmehlinger 项目源码 文件源码
def process_messages(self, messages):
        """Run ``self.func`` on every message via ``self.pool``.

        Each message is submitted to the pool as its own task; the call then
        waits for the tasks to finish, giving up after ``self.timeout``.

        Returns the list of messages whose task finished without raising,
        in completion order (not submission order). Messages whose task
        raised are logged and left out of the result.

        Raises ``RuntimeError`` (re-raised after logging) when the pool
        refuses a submission. Whatever ``futures.as_completed`` raises when
        the timeout expires is not handled here and propagates to the caller
        -- presumably ``concurrent.futures.TimeoutError``; confirm against
        the module's ``futures`` import.
        """
        self.logger.debug('processing %d messages', len(messages))

        # Maps every submitted task back to the message it is working on.
        pending = {}
        for msg in messages:
            # An executor that is shutting down rejects new work with a
            # RuntimeError. Abort the whole batch right away in that case;
            # unprocessed messages get requeued after the interval configured
            # on the queue's policy.
            try:
                task = self.pool.submit(self.func, msg)
            except RuntimeError:
                self.logger.exception('cannot submit jobs to pool')
                raise
            pending[task] = msg

        succeeded = []
        finished = futures.as_completed(pending, timeout=self.timeout)
        for task in finished:
            msg = pending.pop(task)
            # Deliberately catches everything the task raised: a failing
            # message is logged and skipped rather than aborting the batch.
            try:
                task.result()
            except:
                self.logger.exception('exception processing message %s',
                                      msg.message_id)
                continue
            succeeded.append(msg)

        return succeeded
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号