test_protocol.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:felix 作者: axbaretto 项目源码 文件源码
def test_retryable_error(self, m_select):
        m_select.side_effect = iter([
            ([self.sck], [], []),
            ([self.sck], [], []),
            ([self.sck], [], []),
            ([self.sck], [], []),
        ])
        errors = []
        for no in [errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR]:
            err = socket.error()
            err.errno = no
            errors.append(err)
        exp_msg = {MSG_KEY_TYPE: MSG_TYPE_STATUS,
                   MSG_KEY_STATUS: STATUS_RESYNC}
        self.sck.recv.side_effect = iter(errors + [msgpack.dumps(exp_msg)])
        for _ in errors:
            msg_gen = self.reader.new_messages(timeout=1)
            self.assertRaises(StopIteration, next, msg_gen)
        msg_gen = self.reader.new_messages(timeout=1)
        msg_type, msg = next(msg_gen)
        self.assertEqual(msg_type, MSG_TYPE_STATUS)
        self.assertEqual(msg, exp_msg)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号