def test_partial_header_read(self, *args):
    """
    Deliver a message to the connection in two reads, the first of which
    contains only a single byte, and check the connection's state after
    each step.

    Sequence exercised:
      1. recv yields only the first byte; the input buffer must then hold
         exactly that byte.
      2. recv yields the remainder; the input buffer must then be empty.
      3. handle_write is invoked once.
      4. recv yields a message built from a ReadyMessage header with
         stream_id=1; afterwards connected_event must be set and the
         connection must not be defunct.

    NOTE(review): `_socket.recv` is presumably a mock, since its
    `return_value` is assigned here -- confirm against make_connection().
    """
    conn = self.make_connection()

    supported_header = self.make_header_prefix(SupportedMessage)
    supported_msg = self.make_msg(supported_header, self.make_options_body())
    first_byte = supported_msg[0:1]

    # read in the first byte
    conn._socket.recv.return_value = first_byte
    conn.handle_read(None, 0)
    self.assertEqual(conn._iobuf.getvalue(), first_byte)

    # read in everything after the first byte; the buffer should end up empty
    remainder = supported_msg[1:]
    conn._socket.recv.return_value = remainder
    conn.handle_read(None, 0)
    self.assertEqual(six.binary_type(), conn._iobuf.getvalue())

    # let it write out a StartupMessage
    conn.handle_write(None, 0)

    # feed back a message carrying a ReadyMessage header on stream 1
    ready_header = self.make_header_prefix(ReadyMessage, stream_id=1)
    ready_msg = self.make_msg(ready_header)
    conn._socket.recv.return_value = ready_msg
    conn.handle_read(None, 0)

    self.assertTrue(conn.connected_event.is_set())
    self.assertFalse(conn.is_defunct)
test_libevreactor.py 文件源码
python
阅读 50
收藏 0
点赞 0
评论 0
评论列表
文章目录