def test_retryable_error(self, m_select):
    """Check that retryable recv() errors produce no messages.

    The mocked socket's ``recv`` raises one error each for EAGAIN,
    EWOULDBLOCK and EINTR and then returns a msgpack-encoded status
    message.  For every error, a fresh ``new_messages`` generator must
    finish without yielding anything; the read after the last error must
    yield the status message.
    """
    # One socket.error per errno under test.  The errno is assigned on the
    # instance after construction rather than passed to the constructor.
    errors = []
    for no in (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR):
        err = socket.error()
        err.errno = no
        errors.append(err)

    exp_msg = {MSG_KEY_TYPE: MSG_TYPE_STATUS,
               MSG_KEY_STATUS: STATUS_RESYNC}

    # m_select (presumably the patched select call -- confirm against the
    # decorator) reports the socket as readable once per recv() outcome:
    # one per error plus one for the real message.
    m_select.side_effect = iter(
        [([self.sck], [], []) for _ in range(len(errors) + 1)]
    )
    self.sck.recv.side_effect = iter(errors + [msgpack.dumps(exp_msg)])

    # Each retryable error must end the generator without any message.
    for _ in errors:
        msg_gen = self.reader.new_messages(timeout=1)
        self.assertRaises(StopIteration, next, msg_gen)

    # With the errors exhausted, the queued message is delivered.
    msg_gen = self.reader.new_messages(timeout=1)
    msg_type, msg = next(msg_gen)
    self.assertEqual(msg_type, MSG_TYPE_STATUS)
    self.assertEqual(msg, exp_msg)
评论列表
文章目录