async def test_stream_reconnection_enhance_your_calm(stream):
    """Check the messages yielded by ``stream`` while it keeps reconnecting.

    ``stream._connect`` is patched with ``response_calm`` as its side effect,
    and ``asyncio.sleep`` (as referenced from ``peony.stream``) is patched
    with a coroutine that returns immediately.

    The items yielded by the stream are expected to follow this pattern:

    * turn 0: ``{'connected': True}``
    * odd turns: ``{'reconnecting_in': timeout, 'error': None}`` where
      ``timeout`` is ``ENHANCE_YOUR_CALM_TIMEOUT`` doubled once for every
      two turns elapsed
    * other even turns: ``{'stream_restart': True}``

    On every item the stream state must be ``ENHANCE_YOUR_CALM``.  The loop
    stops once 100 items have been verified.

    This has to be a coroutine function because ``async for`` is only valid
    inside ``async def``.
    NOTE(review): running it requires an async-aware test runner or
    decorator (possibly applied above this definition) -- confirm.
    """
    async def dummy(*args, **kwargs):
        """Stand-in for ``asyncio.sleep`` that does nothing."""
        pass

    # Starts at -1 so that the first yielded item is turn 0.
    turn = -1

    with patch.object(stream, '_connect', side_effect=response_calm):
        with patch.object(peony.stream.asyncio, 'sleep', side_effect=dummy):
            async for data in stream:
                assert stream._state == ENHANCE_YOUR_CALM
                turn += 1

                # The stream is iterated without an end of its own here, so
                # the test bounds the number of items it inspects.
                if turn >= 100:
                    break

                if turn == 0:
                    assert data == {'connected': True}
                elif turn % 2 == 1:
                    # The expected delay doubles after each restart cycle.
                    timeout = ENHANCE_YOUR_CALM_TIMEOUT * 2**(turn // 2)
                    assert data == {'reconnecting_in': timeout,
                                    'error': None}
                else:
                    assert data == {'stream_restart': True}
# NOTE: web-page residue, not part of the test module:
# "评论列表" (comment list) / "文章目录" (table of contents)