tests_dispatcher.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:Brightside 作者: BrighterCommand 项目源码 文件源码
def test_stop_performer(self):
        """
        Given a performer that has been started
        When the performer is told to stop
        Then the handle returned by ``run`` can be joined

        A single command message is queued on the pipeline before the
        performer starts. The test only checks that the start/stop/join
        sequence completes; it does not inspect any resulting state.
        :return:
        """
        command = MyCommand()
        work_queue = Queue()
        broker = Connection(config.broker_uri, "examples.perfomer.exchange")
        consumer_config = BrightsideConsumerConfiguration(
            work_queue,
            "performer.test.queue",
            "examples.tests.mycommand",
        )
        performer = Performer(
            "test_channel",
            broker,
            consumer_config,
            mock_consumer_factory,
            mock_command_processor_factory,
            map_my_command_to_request,
        )

        # Wrap the command in a message: header first, then the JSON body.
        command_name = command.__class__.__name__
        msg_header = BrightsideMessageHeader(uuid4(), command_name, BrightsideMessageType.MT_COMMAND)
        payload = JsonRequestSerializer(request=command).serialize_to_json()
        msg_body = BrightsideMessageBody(payload, BrightsideMessageBodyType.application_json)

        # Seed the pipeline so one message is waiting before the performer runs.
        work_queue.put(BrightsideMessage(msg_header, msg_body))

        # Start the performer and block until it signals that it has started.
        has_started = Event()
        worker = performer.run(has_started)
        has_started.wait()

        # Let it run for a second before requesting the stop.
        time.sleep(1)
        performer.stop()

        # join() has no timeout: if the stop never takes effect, the test
        # hangs here rather than failing.
        worker.join()

        # Placeholder assertion; reaching this line means join() returned.
        self.assertTrue(True)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号