test_basic.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:socketshark 作者: closeio 项目源码 文件源码
def test_order_filter_invalid(self):
        """
        Test invalid message order.
        """
        shark = SocketShark(TEST_CONFIG)
        await shark.prepare()
        client = MockClient(shark)
        session = client.session

        subscription = 'simple.topic'

        await session.on_client_event({
            'event': 'subscribe',
            'subscription': subscription,
        })
        assert client.log.pop() == {
            'event': 'subscribe',
            'subscription': subscription,
            'status': 'ok',
        }

        # Test message from server to client
        redis_settings = TEST_CONFIG['REDIS']
        redis = await aioredis.create_redis((
            redis_settings['host'], redis_settings['port']))
        redis_topic = redis_settings['channel_prefix'] + subscription

        await redis.publish_json(redis_topic, {
            'subscription': subscription,
            '_order': 'invalid',
            'data': {'foo': 'invalid'},
        })

        await redis.publish_json(redis_topic, {
            'subscription': subscription,
            'data': {'foo': 'bar'},
        })

        redis.close()

        # Wait for Redis to propagate the messages
        await asyncio.sleep(0.1)

        has_messages = await shark.run_service_receiver(once=True)
        assert has_messages

        assert client.log == [{
            'event': 'message',
            'subscription': subscription,
            'data': {'foo': 'bar'},
        }]

        await shark.shutdown()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号