test_libevreactor.py 文件源码

python
阅读 50 收藏 0 点赞 0 评论 0

项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码
def test_partial_header_read(self, *args):
        """Deliver a message whose header arrives split across two reads.

        The first ``handle_read`` receives only the first byte of the
        message, which must remain in the connection's input buffer. The
        second ``handle_read`` receives the rest, after which the buffer
        must be empty. A write is then triggered and a ``ReadyMessage``
        (stream id 1) is fed in; the connection must end up with its
        ``connected_event`` set and must not be defunct.

        ``*args`` is accepted but not used by the test body.
        """
        conn = self.make_connection()

        supported_msg = self.make_msg(
            self.make_header_prefix(SupportedMessage),
            self.make_options_body(),
        )
        first_byte, remainder = supported_msg[:1], supported_msg[1:]

        # First read delivers just one byte; it must stay buffered.
        conn._socket.recv.return_value = first_byte
        conn.handle_read(None, 0)
        self.assertEqual(conn._iobuf.getvalue(), first_byte)

        # Second read delivers the remainder; the buffer must be drained.
        conn._socket.recv.return_value = remainder
        conn.handle_read(None, 0)
        self.assertEqual(six.binary_type(), conn._iobuf.getvalue())

        # let it write out a StartupMessage
        conn.handle_write(None, 0)

        # Feed a ReadyMessage on stream 1 as the next incoming data.
        conn._socket.recv.return_value = self.make_msg(
            self.make_header_prefix(ReadyMessage, stream_id=1)
        )
        conn.handle_read(None, 0)

        self.assertTrue(conn.connected_event.is_set())
        self.assertFalse(conn.is_defunct)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号