async def test_stream_reconnection_disconnection(stream):
    """Check what ``stream`` yields while every connection attempt fails.

    ``stream._connect`` is patched with ``response_disconnection`` as its
    side effect and ``peony.stream.asyncio.sleep`` is patched with a
    coroutine that returns immediately, so the loop never really waits.

    Expected sequence of yielded items (``turn`` counts them from 0):

    * turn 0: ``{'connected': True}``
    * odd turns: ``{'reconnecting_in': timeout, 'error': None}`` where
      ``timeout`` is ``DISCONNECTION_TIMEOUT * (turn + 1) / 2``
    * even turns after 0: ``{'stream_restart': True}``

    Once the computed timeout exceeds ``MAX_DISCONNECTION_TIMEOUT`` the
    reported delay must equal that maximum, and the test stops.

    NOTE(review): this has to be a coroutine function because ``async for``
    is only valid inside ``async def``. It therefore needs an asyncio-aware
    test runner/marker (e.g. pytest-asyncio) -- confirm one is applied
    above this definition.
    """
    async def instant_sleep(*args, **kwargs):
        """Stand-in for ``asyncio.sleep`` that accepts anything and returns."""

    # Patchers are inert until entered by the ``with`` statement below.
    connect_patch = patch.object(stream, '_connect',
                                 side_effect=response_disconnection)
    sleep_patch = patch.object(peony.stream.asyncio, 'sleep',
                               side_effect=instant_sleep)

    turn = -1

    with connect_patch, sleep_patch:
        async for data in stream:
            # The state is checked on every yielded item, including the first.
            assert stream._state == DISCONNECTION
            turn += 1

            if turn == 0:
                assert data == {'connected': True}
            elif turn % 2 == 1:
                # Odd turns 1, 3, 5, ... map to multipliers 1, 2, 3, ...
                timeout = DISCONNECTION_TIMEOUT * (turn + 1) / 2

                if timeout > MAX_DISCONNECTION_TIMEOUT:
                    # Past the cap only the delay is checked, then we stop;
                    # otherwise the stream would keep reconnecting forever.
                    actual = data['reconnecting_in']
                    assert actual == MAX_DISCONNECTION_TIMEOUT
                    break

                assert data == {'reconnecting_in': timeout, 'error': None}
            else:
                assert data == {'stream_restart': True}
# Scraped page-navigation residue ("comment list", "table of contents");
# not part of the test module.