test_stream.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:peony-twitter 作者: odrling 项目源码 文件源码
async def test_stream_reconnection_disconnection(stream):
    """Check the messages yielded by ``stream`` across repeated disconnections.

    ``stream._connect`` is patched to use ``response_disconnection`` as its
    side effect and ``asyncio.sleep`` (as seen from ``peony.stream``) is
    replaced by a coroutine that returns immediately, so iterating never
    actually waits.

    Expected sequence of yielded items:

    * first item: ``{'connected': True}``
    * every odd turn: ``{'reconnecting_in': timeout, 'error': None}`` where
      ``timeout`` grows by ``DISCONNECTION_TIMEOUT / 2`` per turn
    * every other even turn: ``{'stream_restart': True}``

    The loop stops once the computed timeout exceeds
    ``MAX_DISCONNECTION_TIMEOUT``, after asserting that the reported delay
    equals that maximum.

    This must be a coroutine function because the body uses ``async for``.
    NOTE(review): running it under pytest presumably needs an asyncio-aware
    marker/plugin (e.g. ``@pytest.mark.asyncio``) -- confirm against the
    rest of the test module.
    """
    async def dummy(*args, **kwargs):
        # Stand-in for asyncio.sleep: accepts anything, returns at once.
        pass

    # Starts at -1 so that the first yielded item is turn 0.
    turn = -1

    with patch.object(stream, '_connect',
                      side_effect=response_disconnection), \
            patch.object(peony.stream.asyncio, 'sleep', side_effect=dummy):
        async for data in stream:
            assert stream._state == DISCONNECTION
            turn += 1

            if turn == 0:
                assert data == {'connected': True}
            elif turn % 2 == 1:
                timeout = DISCONNECTION_TIMEOUT * (turn + 1) / 2

                if timeout > MAX_DISCONNECTION_TIMEOUT:
                    # Once the computed value passes the maximum, the
                    # reported delay must equal the maximum; this ends
                    # the test.
                    actual = data['reconnecting_in']
                    assert actual == MAX_DISCONNECTION_TIMEOUT
                    break

                assert data == {'reconnecting_in': timeout, 'error': None}
            else:
                assert data == {'stream_restart': True}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号