def test_socket_error_on_read(self, *args):
    """Check that a socket error raised during a read defuncts the connection.

    ``*args`` is accepted and ignored (presumably values injected by patch
    decorators on the enclosing test case -- confirm against the class).
    """
    c = self.make_connection()

    # let it write the OptionsMessage
    c.handle_write()

    # make the next recv() raise EIO instead of delivering a response
    # (assumes c.socket is a mock that honours ``side_effect`` -- confirm
    # against make_connection)
    c.socket.recv.side_effect = socket_error(errno.EIO, "busy socket")
    c.handle_read()

    # make sure it errored correctly: the connection is flagged defunct, the
    # socket error is recorded as last_error, and connected_event is set
    self.assertTrue(c.is_defunct)
    self.assertIsInstance(c.last_error, socket_error)
    self.assertTrue(c.connected_event.is_set())
test_asyncorereactor.py 文件源码
python
阅读 25
收藏 0
点赞 0
评论 0
评论列表
文章目录