def test_partial_header_read(self, *args):
    """Check that a message delivered across two reads is handled.

    The first ``handle_read()`` is given only the first byte of the
    message and the second is given the remainder. After the first read
    the connection's ``_iobuf`` must hold exactly that one byte; after
    the second it must be empty. The test then lets the connection
    write, feeds it a ``ReadyMessage`` frame on stream 1, and expects
    ``connected_event`` to be set and the connection not to be defunct.

    ``*args`` is accepted but unused here.
    NOTE(review): presumably these are mocks injected by ``patch``
    decorators on the test -- confirm against the enclosing class.
    """
    c = self.make_connection()

    # Build the message that will be split across two reads.
    header = self.make_header_prefix(SupportedMessage)
    options = self.make_options_body()
    message = self.make_msg(header, options)

    # First read: only the first byte is available, so it has to stay
    # buffered in _iobuf.
    c.socket.recv.return_value = message[0:1]
    c.handle_read()
    self.assertEqual(c._iobuf.getvalue(), message[0:1])

    # Second read: the rest of the message arrives; nothing should be
    # left in the buffer afterwards.
    c.socket.recv.return_value = message[1:]
    c.handle_read()
    self.assertEqual(six.binary_type(), c._iobuf.getvalue())

    # let it write out a StartupMessage
    c.handle_write()

    # Reply with a ReadyMessage frame (header only) on stream 1.
    header = self.make_header_prefix(ReadyMessage, stream_id=1)
    c.socket.recv.return_value = self.make_msg(header)
    c.handle_read()

    self.assertTrue(c.connected_event.is_set())
    self.assertFalse(c.is_defunct)
# Source file: test_asyncorereactor.py
# Language: python
# Views: 23
# Favorites: 0
# Likes: 0
# Comments: 0
# Comment list
# Table of contents