test_stream.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:peony-twitter 作者: odrling 项目源码 文件源码
def test_stream_reconnection_enhance_your_calm(stream):
    async def dummy(*args, **kwargs):
        pass

    turn = -1

    with patch.object(stream, '_connect', side_effect=response_calm):
        with patch.object(peony.stream.asyncio, 'sleep', side_effect=dummy):
            async for data in stream:
                assert stream._state == ENHANCE_YOUR_CALM
                turn += 1

                if turn >= 100:
                    break

                if turn == 0:
                    assert data == {'connected': True}
                elif turn % 2 == 1:
                    timeout = ENHANCE_YOUR_CALM_TIMEOUT * 2**(turn // 2)
                    assert data == {'reconnecting_in': timeout, 'error': None}
                else:
                    assert data == {'stream_restart': True}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号