def test_dealloc_warn(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
r = repr(sock)
with self.assertWarns(ResourceWarning) as cm:
sock = None
support.gc_collect()
self.assertIn(r, str(cm.warning.args[0]))
# An open socket file object gets dereferenced after the socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
f = sock.makefile('rb')
r = repr(sock)
sock = None
support.gc_collect()
with self.assertWarns(ResourceWarning):
f = None
support.gc_collect()
python类makefile()的实例源码
def test_dealloc_warn(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
r = repr(sock)
with self.assertWarns(ResourceWarning) as cm:
sock = None
support.gc_collect()
self.assertIn(r, str(cm.warning.args[0]))
# An open socket file object gets dereferenced after the socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
f = sock.makefile('rb')
r = repr(sock)
sock = None
support.gc_collect()
with self.assertWarns(ResourceWarning):
f = None
support.gc_collect()
def test_dealloc_warn(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
r = repr(sock)
with self.assertWarns(ResourceWarning) as cm:
sock = None
support.gc_collect()
self.assertIn(r, str(cm.warning.args[0]))
# An open socket file object gets dereferenced after the socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
f = sock.makefile('rb')
r = repr(sock)
sock = None
support.gc_collect()
with self.assertWarns(ResourceWarning):
f = None
support.gc_collect()
def test_dealloc_warn(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
r = repr(sock)
with self.assertWarns(ResourceWarning) as cm:
sock = None
support.gc_collect()
self.assertIn(r, str(cm.warning.args[0]))
# An open socket file object gets dereferenced after the socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
f = sock.makefile('rb')
r = repr(sock)
sock = None
support.gc_collect()
with self.assertWarns(ResourceWarning):
f = None
support.gc_collect()
def worker(sock):
"""
Called by a worker process after the fork().
"""
signal.signal(SIGHUP, SIG_DFL)
signal.signal(SIGCHLD, SIG_DFL)
signal.signal(SIGTERM, SIG_DFL)
# restore the handler for SIGINT,
# it's useful for debugging (show the stacktrace before exit)
signal.signal(SIGINT, signal.default_int_handler)
# Read the socket using fdopen instead of socket.makefile() because the latter
# seems to be very slow; note that we need to dup() the file descriptor because
# otherwise writes also cause a seek that makes us miss data on the read side.
infile = os.fdopen(os.dup(sock.fileno()), "rb", 65536)
outfile = os.fdopen(os.dup(sock.fileno()), "wb", 65536)
exit_code = 0
try:
worker_main(infile, outfile)
except SystemExit as exc:
exit_code = compute_real_exit_code(exc.code)
finally:
try:
outfile.flush()
except Exception:
pass
return exit_code
def _makefile(sock, mode):
return sock.makefile(mode)
def _makefile(sock, mode):
return sock.makefile(mode)
def test_name_closed_socketio(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
fp = sock.makefile("rb")
fp.close()
self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>")
def setUp(self):
self.evt1, self.evt2, self.serv_finished, self.cli_finished = [
threading.Event() for i in range(4)]
SocketConnectedTest.setUp(self)
self.read_file = self.cli_conn.makefile(
self.read_mode, self.bufsize,
encoding = self.encoding,
errors = self.errors,
newline = self.newline)
def clientSetUp(self):
SocketConnectedTest.clientSetUp(self)
self.write_file = self.serv_conn.makefile(
self.write_mode, self.bufsize,
encoding = self.encoding,
errors = self.errors,
newline = self.newline)
def testCloseAfterMakefile(self):
# The file returned by makefile should keep the socket open.
self.cli_conn.close()
# read until EOF
msg = self.read_file.read()
self.assertEqual(msg, self.read_msg)
def testMakefileClose(self):
# The file returned by makefile should keep the socket open...
self.cli_conn.close()
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, self.read_msg)
# ...until the file is itself closed
self.read_file.close()
self.assertRaises(socket.error, self.cli_conn.recv, 1024)
def _makefile(sock, mode):
return sock.makefile(mode)
def _makefile(sock, mode):
return sock.makefile(mode)
def _makefile(sock, mode):
return sock.makefile(mode)
def process_request(self,
request,
client_address):
"""
Start a new thread to process the request.
"""
if self._ngamsServer.serving_count >= self._ngamsServer.getCfg().getMaxSimReqs():
logger.error("Maximum number of serving threads reached, rejecting request")
wfile = request.makefile('wb')
wfile.write(b'HTTP/1.0 503 Service Unavailable\r\n\r\n')
return
SocketServer.ThreadingMixIn.process_request(self, request, client_address)
def test_name_closed_socketio(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
fp = sock.makefile("rb")
fp.close()
self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>")
def test_unusable_closed_socketio(self):
with socket.socket() as sock:
fp = sock.makefile("rb", buffering=0)
self.assertTrue(fp.readable())
self.assertFalse(fp.writable())
self.assertFalse(fp.seekable())
fp.close()
self.assertRaises(ValueError, fp.readable)
self.assertRaises(ValueError, fp.writable)
self.assertRaises(ValueError, fp.seekable)
def setUp(self):
self.evt1, self.evt2, self.serv_finished, self.cli_finished = [
threading.Event() for i in range(4)]
SocketConnectedTest.setUp(self)
self.read_file = self.cli_conn.makefile(
self.read_mode, self.bufsize,
encoding = self.encoding,
errors = self.errors,
newline = self.newline)
def clientSetUp(self):
SocketConnectedTest.clientSetUp(self)
self.write_file = self.serv_conn.makefile(
self.write_mode, self.bufsize,
encoding = self.encoding,
errors = self.errors,
newline = self.newline)
def testUnbufferedReadline(self):
# Read a line, create a new file object, read another line with it
line = self.read_file.readline() # first line
self.assertEqual(line, b"A. " + self.write_msg) # first line
self.read_file = self.cli_conn.makefile('rb', 0)
line = self.read_file.readline() # second line
self.assertEqual(line, b"B. " + self.write_msg) # second line
def testMakefileClose(self):
# The file returned by makefile should keep the socket open...
self.cli_conn.close()
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, self.read_msg)
# ...until the file is itself closed
self.read_file.close()
self.assertRaises(socket.error, self.cli_conn.recv, 1024)
def _makefile(sock, mode):
return sock.makefile(mode)
def _makefile(sock, mode):
return sock.makefile(mode)
connections.py 文件源码
项目:ServerlessCrawler-VancouverRealState
作者: MarcelloLins
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def _makefile(sock, mode):
return sock.makefile(mode)
def worker(sock):
"""
Called by a worker process after the fork().
"""
signal.signal(SIGHUP, SIG_DFL)
signal.signal(SIGCHLD, SIG_DFL)
signal.signal(SIGTERM, SIG_DFL)
# restore the handler for SIGINT,
# it's useful for debugging (show the stacktrace before exit)
signal.signal(SIGINT, signal.default_int_handler)
# Read the socket using fdopen instead of socket.makefile() because the latter
# seems to be very slow; note that we need to dup() the file descriptor because
# otherwise writes also cause a seek that makes us miss data on the read side.
infile = os.fdopen(os.dup(sock.fileno()), "rb", 65536)
outfile = os.fdopen(os.dup(sock.fileno()), "wb", 65536)
exit_code = 0
try:
worker_main(infile, outfile)
except SystemExit as exc:
exit_code = compute_real_exit_code(exc.code)
finally:
try:
outfile.flush()
except Exception:
pass
return exit_code
def test_name_closed_socketio(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
fp = sock.makefile("rb")
fp.close()
self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>")
def test_unusable_closed_socketio(self):
with socket.socket() as sock:
fp = sock.makefile("rb", buffering=0)
self.assertTrue(fp.readable())
self.assertFalse(fp.writable())
self.assertFalse(fp.seekable())
fp.close()
self.assertRaises(ValueError, fp.readable)
self.assertRaises(ValueError, fp.writable)
self.assertRaises(ValueError, fp.seekable)
def setUp(self):
self.evt1, self.evt2, self.serv_finished, self.cli_finished = [
threading.Event() for i in range(4)]
SocketConnectedTest.setUp(self)
self.read_file = self.cli_conn.makefile(
self.read_mode, self.bufsize,
encoding = self.encoding,
errors = self.errors,
newline = self.newline)
def clientSetUp(self):
SocketConnectedTest.clientSetUp(self)
self.write_file = self.serv_conn.makefile(
self.write_mode, self.bufsize,
encoding = self.encoding,
errors = self.errors,
newline = self.newline)