async def test_stream_reconnection_reconnect(stream):
    """Check the sequence of items yielded by ``stream`` while reconnecting.

    ``stream._connect`` is patched to use ``response_reconnection`` as its
    side effect and ``peony.stream.asyncio.sleep`` is replaced by a no-op
    coroutine so the test never actually waits.

    Expected sequence, indexed by ``turn`` (starting at 0):

    * turn 0: ``{'connected': True}``
    * odd turns: ``{'reconnecting_in': timeout, 'error': None}`` where
      ``timeout`` is ``RECONNECTION_TIMEOUT`` doubled at each successive
      odd turn
    * other even turns: ``{'stream_restart': True}``

    The loop stops at the first odd turn whose computed timeout exceeds
    ``MAX_RECONNECTION_TIMEOUT``; at that point the reported delay must
    equal ``MAX_RECONNECTION_TIMEOUT``.

    NOTE(review): this is a coroutine function because the body uses
    ``async for``; it has to be driven by an asyncio-aware test runner
    (decorator/marker not visible in this chunk -- confirm it is present).
    """

    async def dummy(*args, **kwargs):
        # No-op stand-in for asyncio.sleep: accepts anything, waits for nothing.
        pass

    turn = -1

    with patch.object(stream, '_connect', side_effect=response_reconnection):
        with patch.object(peony.stream.asyncio, 'sleep', side_effect=dummy):
            async for data in stream:
                # The state is checked on every yielded item.
                assert stream._state == RECONNECTION
                turn += 1

                if turn == 0:
                    assert data == {'connected': True}
                elif turn % 2 == 1:
                    # Odd turns 1, 3, 5, ... map to exponents 0, 1, 2, ...
                    timeout = RECONNECTION_TIMEOUT * 2**(turn // 2)

                    if timeout > MAX_RECONNECTION_TIMEOUT:
                        # The reported delay must be capped at the maximum;
                        # once the cap is observed the test is done.
                        actual = data['reconnecting_in']
                        assert actual == MAX_RECONNECTION_TIMEOUT
                        break

                    assert data == {'reconnecting_in': timeout, 'error': None}
                else:
                    assert data == {'stream_restart': True}
# 评论列表 (comment list)
# 文章目录 (table of contents)