test_rmq.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:plumpy 作者: aiidateam 项目源码 文件源码
def setUp(self):
        super(TestTaskControllerAndRunner, self).setUp()

        try:
            connection = pika.BlockingConnection()
        except pika.exceptions.ConnectionClosed:
            self.fail("Couldn't open connection.  Make sure rmq server is running")

        queue = "{}.{}.tasks".format(self.__class__, uuid.uuid4())
        self.sender = TaskController(connection, queue=queue)
        self.runner = TaskRunner(connection, queue=queue)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号