def _close_fds(self, keep):
# `keep` is a set of fds, so we
# use os.closerange from 3 to min(keep)
# and then from max(keep + 1) to MAXFD and
# loop through filling in the gaps.
# Under new python versions, we need to explicitly set
# passed fds to be inheritable or they will go away on exec
if hasattr(os, 'set_inheritable'):
set_inheritable = os.set_inheritable
else:
set_inheritable = lambda i, v: True
if hasattr(os, 'closerange'):
keep = sorted(keep)
min_keep = min(keep)
max_keep = max(keep)
os.closerange(3, min_keep)
os.closerange(max_keep + 1, MAXFD)
for i in xrange(min_keep, max_keep):
if i in keep:
set_inheritable(i, True)
continue
try:
os.close(i)
except:
pass
else:
for i in xrange(3, MAXFD):
if i in keep:
set_inheritable(i, True)
continue
try:
os.close(i)
except:
pass
python类set_inheritable()的实例源码
def set_inheritable(self, inheritable):
os.set_handle_inheritable(self.fileno(), inheritable)
def set_inheritable(self, inheritable):
os.set_inheritable(self.fileno(), inheritable)
def test_close_fds(self):
fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
fds = os.pipe()
self.addCleanup(os.close, fds[0])
self.addCleanup(os.close, fds[1])
open_fds = set(fds)
# add a bunch more fds
for _ in range(9):
fd = os.open(os.devnull, os.O_RDONLY)
self.addCleanup(os.close, fd)
open_fds.add(fd)
for fd in open_fds:
os.set_inheritable(fd, True)
p = subprocess.Popen([sys.executable, fd_status],
stdout=subprocess.PIPE, close_fds=False)
output, ignored = p.communicate()
remaining_fds = set(map(int, output.split(b',')))
self.assertEqual(remaining_fds & open_fds, open_fds,
"Some fds were closed")
p = subprocess.Popen([sys.executable, fd_status],
stdout=subprocess.PIPE, close_fds=True)
output, ignored = p.communicate()
remaining_fds = set(map(int, output.split(b',')))
self.assertFalse(remaining_fds & open_fds,
"Some fds were left open")
self.assertIn(1, remaining_fds, "Subprocess failed")
# Keep some of the fd's we opened open in the subprocess.
# This tests _posixsubprocess.c's proper handling of fds_to_keep.
fds_to_keep = set(open_fds.pop() for _ in range(8))
p = subprocess.Popen([sys.executable, fd_status],
stdout=subprocess.PIPE, close_fds=True,
pass_fds=())
output, ignored = p.communicate()
remaining_fds = set(map(int, output.split(b',')))
self.assertFalse(remaining_fds & fds_to_keep & open_fds,
"Some fds not in pass_fds were left open")
self.assertIn(1, remaining_fds, "Subprocess failed")
def test_close_fds(self):
fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
fds = os.pipe()
self.addCleanup(os.close, fds[0])
self.addCleanup(os.close, fds[1])
open_fds = set(fds)
# add a bunch more fds
for _ in range(9):
fd = os.open("/dev/null", os.O_RDONLY)
self.addCleanup(os.close, fd)
open_fds.add(fd)
for fd in open_fds:
os.set_inheritable(fd, True)
p = subprocess.Popen([sys.executable, fd_status],
stdout=subprocess.PIPE, close_fds=False)
output, ignored = p.communicate()
remaining_fds = set(map(int, output.split(b',')))
self.assertEqual(remaining_fds & open_fds, open_fds,
"Some fds were closed")
p = subprocess.Popen([sys.executable, fd_status],
stdout=subprocess.PIPE, close_fds=True)
output, ignored = p.communicate()
remaining_fds = set(map(int, output.split(b',')))
self.assertFalse(remaining_fds & open_fds,
"Some fds were left open")
self.assertIn(1, remaining_fds, "Subprocess failed")
# Keep some of the fd's we opened open in the subprocess.
# This tests _posixsubprocess.c's proper handling of fds_to_keep.
fds_to_keep = set(open_fds.pop() for _ in range(8))
p = subprocess.Popen([sys.executable, fd_status],
stdout=subprocess.PIPE, close_fds=True,
pass_fds=())
output, ignored = p.communicate()
remaining_fds = set(map(int, output.split(b',')))
self.assertFalse(remaining_fds & fds_to_keep & open_fds,
"Some fds not in pass_fds were left open")
self.assertIn(1, remaining_fds, "Subprocess failed")
def fork_exec(cmd,
exec_env=None,
logfile=None,
pass_fds=None):
"""Execute a command using fork/exec.
This is needed for programs system executions that need path
searching but cannot have a shell as their parent process, for
example: glare. When glare starts it sets itself as
the parent process for its own process group. Thus the pid that
a Popen process would have is not the right pid to use for killing
the process group. This patch gives the test env direct access
to the actual pid.
:param cmd: Command to execute as an array of arguments.
:param exec_env: A dictionary representing the environment with
which to run the command.
:param logfile: A path to a file which will hold the stdout/err of
the child process.
:param pass_fds: Sequence of file descriptors passed to the child.
"""
env = os.environ.copy()
if exec_env is not None:
for env_name, env_val in exec_env.items():
if callable(env_val):
env[env_name] = env_val(env.get(env_name))
else:
env[env_name] = env_val
pid = os.fork()
if pid == 0:
if logfile:
fds = [1, 2]
with open(logfile, 'r+b') as fptr:
for desc in fds: # close fds
try:
os.dup2(fptr.fileno(), desc)
except OSError:
pass
if pass_fds and hasattr(os, 'set_inheritable'):
# os.set_inheritable() is only available and needed
# since Python 3.4. On Python 3.3 and older, file descriptors are
# inheritable by default.
for fd in pass_fds:
os.set_inheritable(fd, True)
args = shlex.split(cmd)
os.execvpe(args[0], args, env)
else:
return pid