def tearDown(self):
for inst in popen2._active:
inst.wait()
popen2._cleanup()
self.assertFalse(popen2._active, "popen2._active not empty")
# The os.popen*() API delegates to the subprocess module (on Unix)
import subprocess
for inst in subprocess._active:
inst.wait()
subprocess._cleanup()
self.assertFalse(subprocess._active, "subprocess._active not empty")
reap_children()
python类_cleanup()的实例源码
def hack_subprocess():
"""subprocess functions may throw exceptions when used in multiple threads.
See http://bugs.python.org/issue1731717 for more information.
"""
global SUBPROCESS_CLEANUP_HACKED
if not SUBPROCESS_CLEANUP_HACKED and threading.activeCount() != 1:
# Only hack if there is ever multiple threads.
# There is no point to leak with only one thread.
subprocess._cleanup = lambda: None
SUBPROCESS_CLEANUP_HACKED = True
def hack_subprocess():
"""subprocess functions may throw exceptions when used in multiple threads.
See http://bugs.python.org/issue1731717 for more information.
"""
global SUBPROCESS_CLEANUP_HACKED
if not SUBPROCESS_CLEANUP_HACKED and threading.activeCount() != 1:
# Only hack if there is ever multiple threads.
# There is no point to leak with only one thread.
subprocess._cleanup = lambda: None
SUBPROCESS_CLEANUP_HACKED = True
def tearDown(self):
for inst in subprocess._active:
inst.wait()
subprocess._cleanup()
self.assertFalse(subprocess._active, "subprocess._active not empty")
def _assert_python(expected_success, *args, **env_vars):
if '__isolated' in env_vars:
isolated = env_vars.pop('__isolated')
else:
isolated = not env_vars
cmd_line = [sys.executable, '-X', 'faulthandler']
if isolated:
# isolated mode: ignore Python environment variables, ignore user
# site-packages, and don't add the current directory to sys.path
cmd_line.append('-I')
elif not env_vars:
# ignore Python environment variables
cmd_line.append('-E')
# Need to preserve the original environment, for in-place testing of
# shared library builds.
env = os.environ.copy()
# But a special flag that can be set to override -- in this case, the
# caller is responsible to pass the full environment.
if env_vars.pop('__cleanenv', None):
env = {}
env.update(env_vars)
cmd_line.extend(args)
p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=env)
try:
out, err = p.communicate()
finally:
subprocess._cleanup()
p.stdout.close()
p.stderr.close()
rc = p.returncode
err = strip_python_stderr(err)
if (rc and expected_success) or (not rc and not expected_success):
raise AssertionError(
"Process return code is %d, "
"stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
return rc, out, err
def kill_python(p):
"""Run the given Popen process until completion and return stdout."""
p.stdin.close()
data = p.stdout.read()
p.stdout.close()
# try to cleanup the child so we don't appear to leak when running
# with regrtest -R.
p.wait()
subprocess._cleanup()
return data
def hack_subprocess():
"""subprocess functions may throw exceptions when used in multiple threads.
See http://bugs.python.org/issue1731717 for more information.
"""
global SUBPROCESS_CLEANUP_HACKED
if not SUBPROCESS_CLEANUP_HACKED and threading.activeCount() != 1:
# Only hack if there is ever multiple threads.
# There is no point to leak with only one thread.
subprocess._cleanup = lambda: None
SUBPROCESS_CLEANUP_HACKED = True
def _assert_python(expected_success, *args, **env_vars):
cmd_line = [sys.executable]
if not env_vars:
cmd_line.append('-E')
# Need to preserve the original environment, for in-place testing of
# shared library builds.
env = os.environ.copy()
# But a special flag that can be set to override -- in this case, the
# caller is responsible to pass the full environment.
if env_vars.pop('__cleanenv', None):
env = {}
env.update(env_vars)
cmd_line.extend(args)
p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=env)
try:
out, err = p.communicate()
finally:
subprocess._cleanup()
p.stdout.close()
p.stderr.close()
rc = p.returncode
err = strip_python_stderr(err)
if (rc and expected_success) or (not rc and not expected_success):
raise AssertionError(
"Process return code is %d, "
"stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
return rc, out, err
def do_task(**post_data):
callback = post_data.get('callback_url')
acceptkey = post_data.get('accept_key')
task_id = post_data.get('task_id')
playbook = post_data.get('playbook')
extra_vars = post_data.get('extra_vars')
hosts = post_data.get('hosts')
p = Popen(
"/usr/bin/ansible-playbook -i %s %s --extra-vars='%s' -s" %
(hosts, playbook, extra_vars),
shell=True,
stdout=PIPE,
stderr=PIPE)
try:
stdout, stderr = p.communicate()
finally:
subprocess._cleanup()
p.stdout.close()
p.stderr.close()
rc = p.returncode
log_debug(
'task id %d in hosts %s playbook %s return stdout %s ,stderr %s!' %
(task_id, hosts, playbook, stdout, stderr))
return {
'task_id': task_id,
'callback_url': callback,
'accept_key': acceptkey,
'hosts': hosts,
'playbook': playbook,
'stdout': stdout,
'stderr': stderr,
'returncode': rc
}