def setUp(self):
super(TestProcessController, self).setUp()
try:
self._connection = pika.BlockingConnection()
except pika.exceptions.ConnectionClosed:
self.fail("Couldn't open connection. Make sure rmq server is running")
self.exchange = '{}.{}.task_control'.format(
self.__class__, uuid.uuid4())
self.channel = self._connection.channel()
self.channel.exchange_declare(exchange=self.exchange, type='fanout')
self.manager = ProcessManager()
self.controller = ProcessController(
self._connection, exchange=self.exchange,
process_manager=self.manager)
评论列表
文章目录