tests_channel.py source code

python

Project: Brightside  Author: BrighterCommand
def test_handle_receive_on_a_channel(self):
        """
        Given that I have a channel
        When I receive on that channel
        Then I should get a message via the consumer
        """

        body = BrightsideMessageBody("test message")
        header = BrightsideMessageHeader(uuid4(), "test topic", BrightsideMessageType.MT_COMMAND)
        message = BrightsideMessage(header, body)

        fake_queue = [message]
        consumer = FakeConsumer(fake_queue)

        channel = Channel("test", consumer, Pipeline())

        msg = channel.receive(1)

        self.assertEqual(message.body.value, msg.body.value)
        self.assertEqual(message.header.topic, msg.header.topic)
        self.assertEqual(message.header.message_type, msg.header.message_type)
        self.assertEqual(0, len(fake_queue))  # We have read the queue
        self.assertEqual(ChannelState.started, channel.state)  # We don't stop because we consume a message
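
The test above depends on FakeConsumer and Channel, neither of which is shown in this snippet. The following is a minimal, self-contained sketch of the same pattern: a test double that reads from a plain list, sitting behind a channel. The names SketchConsumer, SketchChannel, State, Header, Body and Message are hypothetical stand-ins, not Brightside classes. The state transition on receive is an assumption made for illustration, so the real library may behave differently.

```python
from collections import namedtuple
from enum import Enum
from uuid import uuid4

# Hypothetical stand-ins for illustration; the real Brightside classes may differ.
Header = namedtuple("Header", ["id", "topic", "message_type"])
Body = namedtuple("Body", ["value"])
Message = namedtuple("Message", ["header", "body"])


class State(Enum):
    initialized = 0
    started = 1
    stopping = 2


class SketchConsumer:
    """Test double that pops messages from a plain list instead of a broker."""

    def __init__(self, queue):
        self._queue = queue  # shared with the test so it can inspect the list

    def receive(self, timeout):
        # timeout is ignored in this sketch; a real consumer would wait on a broker
        return self._queue.pop(0) if self._queue else None


class SketchChannel:
    """Minimal channel that delegates receive() to its consumer."""

    def __init__(self, name, consumer):
        self.name = name
        self._consumer = consumer
        self.state = State.initialized

    def receive(self, timeout):
        # Assumption: receiving marks the channel as started
        self.state = State.started
        return self._consumer.receive(timeout)


def run_sketch():
    """Build one message, receive it through the channel, return what the test inspects."""
    message = Message(Header(uuid4(), "test topic", "command"), Body("test message"))
    fake_queue = [message]
    channel = SketchChannel("test", SketchConsumer(fake_queue))
    received = channel.receive(1)
    return received, fake_queue, channel.state
```

Because the consumer and the test share the same list, the test can confirm that the message was actually taken off the queue, which is what the `len(fake_queue)` assertion in the original checks.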