def test_pass_fds(self):
fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
open_fds = set()
for x in range(5):
fds = os.pipe()
self.addCleanup(os.close, fds[0])
self.addCleanup(os.close, fds[1])
os.set_inheritable(fds[0], True)
os.set_inheritable(fds[1], True)
open_fds.update(fds)
for fd in open_fds:
p = subprocess.Popen([sys.executable, fd_status],
stdout=subprocess.PIPE, close_fds=True,
pass_fds=(fd, ))
output, ignored = p.communicate()
remaining_fds = set(map(int, output.split(b',')))
to_be_closed = open_fds - {fd}
self.assertIn(fd, remaining_fds, "fd to be passed not passed")
self.assertFalse(remaining_fds & to_be_closed,
"fd to be closed passed")
# pass_fds overrides close_fds with a warning.
with self.assertWarns(RuntimeWarning) as context:
self.assertFalse(subprocess.call(
[sys.executable, "-c", "import sys; sys.exit(0)"],
close_fds=False, pass_fds=(fd, )))
self.assertIn('overriding close_fds', str(context.warning))
评论列表
文章目录