def strip_python_stderr(stderr):
"""Strip the stderr of a Python process from potential debug output
emitted by the interpreter.
This will typically be run on the result of the communicate() method
of a subprocess.Popen object.
"""
stderr = re.sub(br"\[\d+ refs\]\r?\n?", b"", stderr).strip()
return stderr
python类Process()的实例源码
def temp_umask(umask):
"""Context manager that temporarily sets the process umask."""
oldmask = os.umask(umask)
try:
yield
finally:
os.umask(oldmask)
def _memory_watchdog(start_evt, finish_evt, period=10.0):
"""A function which periodically watches the process' memory consumption
and prints it out.
"""
# XXX: because of the GIL, and because the very long operations tested
# in most bigmem tests are uninterruptible, the loop below gets woken up
# much less often than expected.
# The polling code should be rewritten in raw C, without holding the GIL,
# and push results onto an anonymous pipe.
try:
page_size = os.sysconf('SC_PAGESIZE')
except (ValueError, AttributeError):
try:
page_size = os.sysconf('SC_PAGE_SIZE')
except (ValueError, AttributeError):
page_size = 4096
procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
try:
f = open(procfile, 'rb')
except IOError as e:
warnings.warn('/proc not available for stats: {}'.format(e),
RuntimeWarning)
sys.stderr.flush()
return
with f:
start_evt.set()
old_data = -1
while not finish_evt.wait(period):
f.seek(0)
statm = f.read().decode('ascii')
data = int(statm.split()[5])
if data != old_data:
old_data = data
print(" ... process data size: {data:.1f}G"
.format(data=data * page_size / (1024 ** 3)))
def strip_python_stderr(stderr):
"""Strip the stderr of a Python process from potential debug output
emitted by the interpreter.
This will typically be run on the result of the communicate() method
of a subprocess.Popen object.
"""
stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
return stderr
def get_multiprocessing_process__dangling(self):
if not multiprocessing:
return None
# This copies the weakrefs without making any strong reference
return multiprocessing.process._dangling.copy()
def bind_port(sock, host=HOST):
"""Bind the socket to a free port and return the port number. Relies on
ephemeral ports in order to ensure we are using an unbound port. This is
important as many tests may be running simultaneously, especially in a
buildbot environment. This method raises an exception if the sock.family
is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
or SO_REUSEPORT set on it. Tests should *never* set these socket options
for TCP/IP sockets. The only case for setting these options is testing
multicasting via multiple UDP sockets.
Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
on Windows), it will be set on the socket. This will prevent anyone else
from bind()'ing to our host/port for the duration of the test.
"""
if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
if hasattr(socket, 'SO_REUSEADDR'):
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
raise TestFailed("tests should never set the SO_REUSEADDR " \
"socket option on TCP/IP sockets!")
if hasattr(socket, 'SO_REUSEPORT'):
try:
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
raise TestFailed("tests should never set the SO_REUSEPORT " \
"socket option on TCP/IP sockets!")
except socket.error:
# Python's socket module was compiled using modern headers
# thus defining SO_REUSEPORT but this process is running
# under an older kernel that does not support SO_REUSEPORT.
pass
if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
sock.bind((host, 0))
port = sock.getsockname()[1]
return port
def temp_umask(umask):
"""Context manager that temporarily sets the process umask."""
oldmask = os.umask(umask)
try:
yield
finally:
os.umask(oldmask)
def strip_python_stderr(stderr):
"""Strip the stderr of a Python process from potential debug output
emitted by the interpreter.
This will typically be run on the result of the communicate() method
of a subprocess.Popen object.
"""
stderr = re.sub(br"\[\d+ refs\]\r?\n?", b"", stderr).strip()
return stderr
def bind_port(sock, host=HOST):
"""Bind the socket to a free port and return the port number. Relies on
ephemeral ports in order to ensure we are using an unbound port. This is
important as many tests may be running simultaneously, especially in a
buildbot environment. This method raises an exception if the sock.family
is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
or SO_REUSEPORT set on it. Tests should *never* set these socket options
for TCP/IP sockets. The only case for setting these options is testing
multicasting via multiple UDP sockets.
Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
on Windows), it will be set on the socket. This will prevent anyone else
from bind()'ing to our host/port for the duration of the test.
"""
if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
if hasattr(socket, 'SO_REUSEADDR'):
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
raise TestFailed("tests should never set the SO_REUSEADDR " \
"socket option on TCP/IP sockets!")
if hasattr(socket, 'SO_REUSEPORT'):
try:
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
raise TestFailed("tests should never set the SO_REUSEPORT " \
"socket option on TCP/IP sockets!")
except socket.error:
# Python's socket module was compiled using modern headers
# thus defining SO_REUSEPORT but this process is running
# under an older kernel that does not support SO_REUSEPORT.
pass
if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
sock.bind((host, 0))
port = sock.getsockname()[1]
return port
def temp_umask(umask):
"""Context manager that temporarily sets the process umask."""
oldmask = os.umask(umask)
try:
yield
finally:
os.umask(oldmask)
def strip_python_stderr(stderr):
"""Strip the stderr of a Python process from potential debug output
emitted by the interpreter.
This will typically be run on the result of the communicate() method
of a subprocess.Popen object.
"""
stderr = re.sub(br"\[\d+ refs\]\r?\n?", b"", stderr).strip()
return stderr
def get_multiprocessing_process__dangling(self):
if not multiprocessing:
return None
# This copies the weakrefs without making any strong reference
return multiprocessing.process._dangling.copy()
def restore_multiprocessing_process__dangling(self, saved):
if not multiprocessing:
return
multiprocessing.process._dangling.clear()
multiprocessing.process._dangling.update(saved)
def bind_port(sock, host=HOST):
"""Bind the socket to a free port and return the port number. Relies on
ephemeral ports in order to ensure we are using an unbound port. This is
important as many tests may be running simultaneously, especially in a
buildbot environment. This method raises an exception if the sock.family
is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
or SO_REUSEPORT set on it. Tests should *never* set these socket options
for TCP/IP sockets. The only case for setting these options is testing
multicasting via multiple UDP sockets.
Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
on Windows), it will be set on the socket. This will prevent anyone else
from bind()'ing to our host/port for the duration of the test.
"""
if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
if hasattr(socket, 'SO_REUSEADDR'):
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
raise TestFailed("tests should never set the SO_REUSEADDR " \
"socket option on TCP/IP sockets!")
if hasattr(socket, 'SO_REUSEPORT'):
try:
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
raise TestFailed("tests should never set the SO_REUSEPORT " \
"socket option on TCP/IP sockets!")
except OSError:
# Python's socket module was compiled using modern headers
# thus defining SO_REUSEPORT but this process is running
# under an older kernel that does not support SO_REUSEPORT.
pass
if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
sock.bind((host, 0))
port = sock.getsockname()[1]
return port
def temp_umask(umask):
"""Context manager that temporarily sets the process umask."""
oldmask = os.umask(umask)
try:
yield
finally:
os.umask(oldmask)
# TEST_HOME_DIR refers to the top level directory of the "test" package
# that contains Python's regression test suite
def bind_port(sock, host=HOST):
"""Bind the socket to a free port and return the port number. Relies on
ephemeral ports in order to ensure we are using an unbound port. This is
important as many tests may be running simultaneously, especially in a
buildbot environment. This method raises an exception if the sock.family
is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
or SO_REUSEPORT set on it. Tests should *never* set these socket options
for TCP/IP sockets. The only case for setting these options is testing
multicasting via multiple UDP sockets.
Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
on Windows), it will be set on the socket. This will prevent anyone else
from bind()'ing to our host/port for the duration of the test.
"""
if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
if hasattr(socket, 'SO_REUSEADDR'):
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
raise TestFailed("tests should never set the SO_REUSEADDR " \
"socket option on TCP/IP sockets!")
if hasattr(socket, 'SO_REUSEPORT'):
try:
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
raise TestFailed("tests should never set the SO_REUSEPORT " \
"socket option on TCP/IP sockets!")
except socket.error:
# Python's socket module was compiled using modern headers
# thus defining SO_REUSEPORT but this process is running
# under an older kernel that does not support SO_REUSEPORT.
pass
if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
sock.bind((host, 0))
port = sock.getsockname()[1]
return port
def temp_umask(umask):
"""Context manager that temporarily sets the process umask."""
oldmask = os.umask(umask)
try:
yield
finally:
os.umask(oldmask)
def strip_python_stderr(stderr):
"""Strip the stderr of a Python process from potential debug output
emitted by the interpreter.
This will typically be run on the result of the communicate() method
of a subprocess.Popen object.
"""
stderr = re.sub(br"\[\d+ refs\]\r?\n?", b"", stderr).strip()
return stderr
def bind_port(sock, host=HOST):
"""Bind the socket to a free port and return the port number. Relies on
ephemeral ports in order to ensure we are using an unbound port. This is
important as many tests may be running simultaneously, especially in a
buildbot environment. This method raises an exception if the sock.family
is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
or SO_REUSEPORT set on it. Tests should *never* set these socket options
for TCP/IP sockets. The only case for setting these options is testing
multicasting via multiple UDP sockets.
Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
on Windows), it will be set on the socket. This will prevent anyone else
from bind()'ing to our host/port for the duration of the test.
"""
if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
if hasattr(socket, 'SO_REUSEADDR'):
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
raise TestFailed("tests should never set the SO_REUSEADDR " \
"socket option on TCP/IP sockets!")
if hasattr(socket, 'SO_REUSEPORT'):
try:
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
raise TestFailed("tests should never set the SO_REUSEPORT " \
"socket option on TCP/IP sockets!")
except socket.error:
# Python's socket module was compiled using modern headers
# thus defining SO_REUSEPORT but this process is running
# under an older kernel that does not support SO_REUSEPORT.
pass
if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
sock.bind((host, 0))
port = sock.getsockname()[1]
return port
def temp_umask(umask):
"""Context manager that temporarily sets the process umask."""
oldmask = os.umask(umask)
try:
yield
finally:
os.umask(oldmask)
def strip_python_stderr(stderr):
"""Strip the stderr of a Python process from potential debug output
emitted by the interpreter.
This will typically be run on the result of the communicate() method
of a subprocess.Popen object.
"""
stderr = re.sub(br"\[\d+ refs\]\r?\n?", b"", stderr).strip()
return stderr
def bind_port(sock, host=HOST):
"""Bind the socket to a free port and return the port number. Relies on
ephemeral ports in order to ensure we are using an unbound port. This is
important as many tests may be running simultaneously, especially in a
buildbot environment. This method raises an exception if the sock.family
is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
or SO_REUSEPORT set on it. Tests should *never* set these socket options
for TCP/IP sockets. The only case for setting these options is testing
multicasting via multiple UDP sockets.
Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
on Windows), it will be set on the socket. This will prevent anyone else
from bind()'ing to our host/port for the duration of the test.
"""
if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
if hasattr(socket, 'SO_REUSEADDR'):
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
raise TestFailed("tests should never set the SO_REUSEADDR " \
"socket option on TCP/IP sockets!")
if hasattr(socket, 'SO_REUSEPORT'):
try:
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
raise TestFailed("tests should never set the SO_REUSEPORT " \
"socket option on TCP/IP sockets!")
except socket.error:
# Python's socket module was compiled using modern headers
# thus defining SO_REUSEPORT but this process is running
# under an older kernel that does not support SO_REUSEPORT.
pass
if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
sock.bind((host, 0))
port = sock.getsockname()[1]
return port
def temp_umask(umask):
"""Context manager that temporarily sets the process umask."""
oldmask = os.umask(umask)
try:
yield
finally:
os.umask(oldmask)
def strip_python_stderr(stderr):
"""Strip the stderr of a Python process from potential debug output
emitted by the interpreter.
This will typically be run on the result of the communicate() method
of a subprocess.Popen object.
"""
stderr = re.sub(br"\[\d+ refs\]\r?\n?", b"", stderr).strip()
return stderr
def get_multiprocessing_process__dangling(self):
if not multiprocessing:
return None
# Unjoined process objects can survive after process exits
multiprocessing.process._cleanup()
# This copies the weakrefs without making any strong reference
return multiprocessing.process._dangling.copy()
def restore_multiprocessing_process__dangling(self, saved):
if not multiprocessing:
return
multiprocessing.process._dangling.clear()
multiprocessing.process._dangling.update(saved)
def bind_port(sock, host=HOST):
"""Bind the socket to a free port and return the port number. Relies on
ephemeral ports in order to ensure we are using an unbound port. This is
important as many tests may be running simultaneously, especially in a
buildbot environment. This method raises an exception if the sock.family
is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
or SO_REUSEPORT set on it. Tests should *never* set these socket options
for TCP/IP sockets. The only case for setting these options is testing
multicasting via multiple UDP sockets.
Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
on Windows), it will be set on the socket. This will prevent anyone else
from bind()'ing to our host/port for the duration of the test.
"""
if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
if hasattr(socket, 'SO_REUSEADDR'):
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
raise TestFailed("tests should never set the SO_REUSEADDR " \
"socket option on TCP/IP sockets!")
if hasattr(socket, 'SO_REUSEPORT'):
try:
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
raise TestFailed("tests should never set the SO_REUSEPORT " \
"socket option on TCP/IP sockets!")
except OSError:
# Python's socket module was compiled using modern headers
# thus defining SO_REUSEPORT but this process is running
# under an older kernel that does not support SO_REUSEPORT.
pass
if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
sock.bind((host, 0))
port = sock.getsockname()[1]
return port
def temp_umask(umask):
"""Context manager that temporarily sets the process umask."""
oldmask = os.umask(umask)
try:
yield
finally:
os.umask(oldmask)
# TEST_HOME_DIR refers to the top level directory of the "test" package
# that contains Python's regression test suite
def bind_port(sock, host=HOST):
"""Bind the socket to a free port and return the port number. Relies on
ephemeral ports in order to ensure we are using an unbound port. This is
important as many tests may be running simultaneously, especially in a
buildbot environment. This method raises an exception if the sock.family
is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
or SO_REUSEPORT set on it. Tests should *never* set these socket options
for TCP/IP sockets. The only case for setting these options is testing
multicasting via multiple UDP sockets.
Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
on Windows), it will be set on the socket. This will prevent anyone else
from bind()'ing to our host/port for the duration of the test.
"""
if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
if hasattr(socket, 'SO_REUSEADDR'):
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
raise TestFailed("tests should never set the SO_REUSEADDR " \
"socket option on TCP/IP sockets!")
if hasattr(socket, 'SO_REUSEPORT'):
try:
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
raise TestFailed("tests should never set the SO_REUSEPORT " \
"socket option on TCP/IP sockets!")
except socket.error:
# Python's socket module was compiled using modern headers
# thus defining SO_REUSEPORT but this process is running
# under an older kernel that does not support SO_REUSEPORT.
pass
if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
sock.bind((host, 0))
port = sock.getsockname()[1]
return port
def temp_umask(umask):
"""Context manager that temporarily sets the process umask."""
oldmask = os.umask(umask)
try:
yield
finally:
os.umask(oldmask)