test_status.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:plumpy 作者: aiidateam 项目源码 文件源码
def setUp(self):
    """Build the RabbitMQ fixtures used by the status provider tests.

    Opens a blocking pika connection with default parameters, declares a
    uniquely named fanout exchange for requests, declares an exclusive
    response queue consumed by ``self._on_response``, and finally creates
    the ``ProcessManager`` and the ``StatusProvider`` under test.

    The test is failed (rather than erroring) when the broker connection
    cannot be opened.
    """
    super(TestStatusProvider, self).setUp()
    self._response = None
    self._corr_id = None

    try:
        self._connection = pika.BlockingConnection()
    except pika.exceptions.AMQPConnectionError:
        # AMQPConnectionError is the base class of ConnectionClosed, so the
        # originally handled case is still covered while other connection
        # failures also produce the helpful message below.
        self.fail("Couldn't open connection.  Make sure rmq server is running")

    self.channel = self._connection.channel()

    # Set up the request exchange.  Use the class *name*: formatting the
    # class object itself yields "<class '...'>", which puts spaces, quotes
    # and angle brackets into the exchange name.  The uuid keeps the name
    # unique per test.
    self.request_exchange = '{}.{}.task_control'.format(
        self.__class__.__name__, uuid.uuid4())
    # NOTE(review): `type=` here and `no_ack=` below look like the pre-1.0
    # pika keyword names -- confirm the pinned pika version before upgrading.
    self.channel.exchange_declare(
        exchange=self.request_exchange, type='fanout')

    # Set up the response queue; the queue name is read back from the
    # declare result.
    result = self.channel.queue_declare(exclusive=True)
    self.response_queue = result.method.queue
    self.channel.basic_consume(
        self._on_response, no_ack=True, queue=self.response_queue)

    self.manager = ProcessManager()
    self.provider = StatusProvider(
        self._connection, exchange=self.request_exchange,
        process_manager=self.manager)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号