test_rmq.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:plumpy 作者: aiidateam 项目源码 文件源码
def setUp(self):
        super(TestProcessController, self).setUp()
        try:
            self._connection = pika.BlockingConnection()
        except pika.exceptions.ConnectionClosed:
            self.fail("Couldn't open connection.  Make sure rmq server is running")

        self.exchange = '{}.{}.task_control'.format(
            self.__class__, uuid.uuid4())

        self.channel = self._connection.channel()
        self.channel.exchange_declare(exchange=self.exchange, type='fanout')

        self.manager = ProcessManager()
        self.controller = ProcessController(
            self._connection, exchange=self.exchange,
            process_manager=self.manager)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号