test_asyncorereactor.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码
def test_partial_send(self, *args):
        c = self.make_connection()

        # only write the first four bytes of the OptionsMessage
        write_size = 4
        c.socket.send.side_effect = None
        c.socket.send.return_value = write_size
        c.handle_write()

        msg_size = 9  # v3+ frame header
        expected_writes = int(math.ceil(float(msg_size) / write_size))
        size_mod = msg_size % write_size
        last_write_size = size_mod if size_mod else write_size
        self.assertFalse(c.is_defunct)
        self.assertEqual(expected_writes, c.socket.send.call_count)
        self.assertEqual(last_write_size, len(c.socket.send.call_args[0][0]))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号