def testUnbufferedReadline(self):
# Read a line, create a new file object, read another line with it
line = self.read_file.readline() # first line
self.assertEqual(line, b"A. " + self.write_msg) # first line
self.read_file = self.cli_conn.makefile('rb', 0)
line = self.read_file.readline() # second line
self.assertEqual(line, b"B. " + self.write_msg) # second line
python类makefile()的实例源码
def testMakefileClose(self):
# The file returned by makefile should keep the socket open...
self.cli_conn.close()
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, self.read_msg)
# ...until the file is itself closed
self.read_file.close()
self.assertRaises(OSError, self.cli_conn.recv, 1024)
def _makefile(sock, mode):
return sock.makefile(mode)
def _makefile(sock, mode):
return sock.makefile(mode)
def _makefile(sock, mode):
return sock.makefile(mode)
def _makefile(sock, mode):
return sock.makefile(mode)
def _makefile(sock, mode):
return sock.makefile(mode)
def test_name_closed_socketio(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
fp = sock.makefile("rb")
fp.close()
self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>")
def test_unusable_closed_socketio(self):
with socket.socket() as sock:
fp = sock.makefile("rb", buffering=0)
self.assertTrue(fp.readable())
self.assertFalse(fp.writable())
self.assertFalse(fp.seekable())
fp.close()
self.assertRaises(ValueError, fp.readable)
self.assertRaises(ValueError, fp.writable)
self.assertRaises(ValueError, fp.seekable)
def setUp(self):
self.evt1, self.evt2, self.serv_finished, self.cli_finished = [
threading.Event() for i in range(4)]
SocketConnectedTest.setUp(self)
self.read_file = self.cli_conn.makefile(
self.read_mode, self.bufsize,
encoding = self.encoding,
errors = self.errors,
newline = self.newline)
def clientSetUp(self):
SocketConnectedTest.clientSetUp(self)
self.write_file = self.serv_conn.makefile(
self.write_mode, self.bufsize,
encoding = self.encoding,
errors = self.errors,
newline = self.newline)
def testUnbufferedReadline(self):
# Read a line, create a new file object, read another line with it
line = self.read_file.readline() # first line
self.assertEqual(line, b"A. " + self.write_msg) # first line
self.read_file = self.cli_conn.makefile('rb', 0)
line = self.read_file.readline() # second line
self.assertEqual(line, b"B. " + self.write_msg) # second line
def testMakefileClose(self):
# The file returned by makefile should keep the socket open...
self.cli_conn.close()
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, self.read_msg)
# ...until the file is itself closed
self.read_file.close()
self.assertRaises(OSError, self.cli_conn.recv, 1024)
def __init__(self, conn):
self.conn = conn
self.conn._really_makefile = self.conn.makefile
self.conn.makefile = self
self.armed = False
self.file_reg = []
def unwrap(self):
self.conn.makefile = self.conn._really_makefile
del self.conn._really_makefile
def _makefile(sock, mode):
return sock.makefile(mode)
def _makefile(sock, mode):
return sock.makefile(mode)
def _makefile(sock, mode):
return sock.makefile(mode)
def _makefile(sock, mode):
return sock.makefile(mode)
def _makefile(sock, mode):
return sock.makefile(mode)