def test_wait_non_children(self):
# test wait() against processes which are not our children
code = "import sys;"
code += "from subprocess import Popen, PIPE;"
code += "cmd = ['%s', '-c', 'import time; time.sleep(60)'];" % PYTHON
code += "sp = Popen(cmd, stdout=PIPE);"
code += "sys.stdout.write(str(sp.pid));"
sproc = get_test_subprocess([PYTHON, "-c", code],
stdout=subprocess.PIPE)
grandson_pid = int(sproc.stdout.read())
grandson_proc = psutil.Process(grandson_pid)
try:
self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01)
grandson_proc.kill()
ret = grandson_proc.wait()
self.assertEqual(ret, None)
finally:
reap_children(recursive=True)
python类Popen()的实例源码
def test_children_recursive(self):
# here we create a subprocess which creates another one as in:
# A (parent) -> B (child) -> C (grandchild)
s = "import subprocess, os, sys, time;"
s += "PYTHON = os.path.realpath(sys.executable);"
s += "cmd = [PYTHON, '-c', 'import time; time.sleep(60);'];"
s += "subprocess.Popen(cmd);"
s += "time.sleep(60);"
get_test_subprocess(cmd=[PYTHON, "-c", s])
p = psutil.Process()
self.assertEqual(len(p.children(recursive=False)), 1)
# give the grandchild some time to start
stop_at = time.time() + GLOBAL_TIMEOUT
while time.time() < stop_at:
children = p.children(recursive=True)
if len(children) > 1:
break
self.assertEqual(len(children), 2)
self.assertEqual(children[0].ppid(), os.getpid())
self.assertEqual(children[1].ppid(), children[0].pid)
def test_Popen(self):
# XXX this test causes a ResourceWarning on Python 3 because
# psutil.__subproc instance doesn't get propertly freed.
# Not sure what to do though.
cmd = [PYTHON, "-c", "import time; time.sleep(60);"]
proc = psutil.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
proc.name()
proc.cpu_times()
proc.stdin
self.assertTrue(dir(proc))
self.assertRaises(AttributeError, getattr, proc, 'foo')
finally:
proc.kill()
proc.wait()
def execute(cmd, stderr_to_stdout=False, stdin=None):
"""Execute a command in the shell and return a tuple (rc, stdout, stderr)"""
if stderr_to_stdout:
stderr = STDOUT
else:
stderr = PIPE
if stdin is None:
_stdin = None
else:
_stdin = PIPE
p = Popen(cmd, close_fds=True, stdin=_stdin, stdout=PIPE, stderr=stderr, preexec_fn=os.setsid)
stdout, stderr = p.communicate(input=stdin)
return p.returncode, stdout, stderr
def _fun_thread_daemon_watch(self):
time_sleep = 5
if self._close:
return
while True:
'''check api work'''
is_api_ok = self._is_api_socket_ok()
if is_api_ok is True:
time.sleep(time_sleep)
continue
self._loop_kill_futunn()
'''start new ftnn.exe process'''
process_new = psutil.Popen([self._exe_path, "type=python_auto"])
if process_new is not None:
print("FTApiDaemon new futnn process open ! pid={}".format(process_new.pid))
else:
print("FTApiDaemon open process fail ! ")
time.sleep(time_sleep)
def test_wait_non_children(self):
# test wait() against processes which are not our children
code = "import sys;"
code += "from subprocess import Popen, PIPE;"
code += "cmd = ['%s', '-c', 'import time; time.sleep(60)'];" % PYTHON
code += "sp = Popen(cmd, stdout=PIPE);"
code += "sys.stdout.write(str(sp.pid));"
sproc = get_test_subprocess([PYTHON, "-c", code],
stdout=subprocess.PIPE)
grandson_pid = int(sproc.stdout.read())
grandson_proc = psutil.Process(grandson_pid)
try:
self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01)
grandson_proc.kill()
ret = grandson_proc.wait()
self.assertEqual(ret, None)
finally:
reap_children(recursive=True)
def test_children_recursive(self):
# here we create a subprocess which creates another one as in:
# A (parent) -> B (child) -> C (grandchild)
s = "import subprocess, os, sys, time;"
s += "PYTHON = os.path.realpath(sys.executable);"
s += "cmd = [PYTHON, '-c', 'import time; time.sleep(60);'];"
s += "subprocess.Popen(cmd);"
s += "time.sleep(60);"
get_test_subprocess(cmd=[PYTHON, "-c", s])
p = psutil.Process()
self.assertEqual(len(p.children(recursive=False)), 1)
# give the grandchild some time to start
stop_at = time.time() + GLOBAL_TIMEOUT
while time.time() < stop_at:
children = p.children(recursive=True)
if len(children) > 1:
break
self.assertEqual(len(children), 2)
self.assertEqual(children[0].ppid(), os.getpid())
self.assertEqual(children[1].ppid(), children[0].pid)
def test_Popen(self):
# XXX this test causes a ResourceWarning on Python 3 because
# psutil.__subproc instance doesn't get propertly freed.
# Not sure what to do though.
cmd = [PYTHON, "-c", "import time; time.sleep(60);"]
proc = psutil.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
proc.name()
proc.cpu_times()
proc.stdin
self.assertTrue(dir(proc))
self.assertRaises(AttributeError, getattr, proc, 'foo')
finally:
proc.kill()
proc.wait()
def test_wait_non_children(self):
# test wait() against processes which are not our children
code = "import sys;"
code += "from subprocess import Popen, PIPE;"
code += "cmd = ['%s', '-c', 'import time; time.sleep(60)'];" % PYTHON
code += "sp = Popen(cmd, stdout=PIPE);"
code += "sys.stdout.write(str(sp.pid));"
sproc = get_test_subprocess([PYTHON, "-c", code],
stdout=subprocess.PIPE)
grandson_pid = int(sproc.stdout.read())
grandson_proc = psutil.Process(grandson_pid)
try:
self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01)
grandson_proc.kill()
ret = grandson_proc.wait()
self.assertEqual(ret, None)
finally:
reap_children(recursive=True)
def test_children_recursive(self):
# here we create a subprocess which creates another one as in:
# A (parent) -> B (child) -> C (grandchild)
s = "import subprocess, os, sys, time;"
s += "PYTHON = os.path.realpath(sys.executable);"
s += "cmd = [PYTHON, '-c', 'import time; time.sleep(60);'];"
s += "subprocess.Popen(cmd);"
s += "time.sleep(60);"
get_test_subprocess(cmd=[PYTHON, "-c", s])
p = psutil.Process()
self.assertEqual(len(p.children(recursive=False)), 1)
# give the grandchild some time to start
stop_at = time.time() + GLOBAL_TIMEOUT
while time.time() < stop_at:
children = p.children(recursive=True)
if len(children) > 1:
break
self.assertEqual(len(children), 2)
self.assertEqual(children[0].ppid(), os.getpid())
self.assertEqual(children[1].ppid(), children[0].pid)
def test_Popen(self):
# XXX this test causes a ResourceWarning on Python 3 because
# psutil.__subproc instance doesn't get propertly freed.
# Not sure what to do though.
cmd = [PYTHON, "-c", "import time; time.sleep(60);"]
proc = psutil.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
proc.name()
proc.cpu_times()
proc.stdin
self.assertTrue(dir(proc))
self.assertRaises(AttributeError, getattr, proc, 'foo')
finally:
proc.kill()
proc.wait()
def start_hoplite_server(port_num):
proc = psutil.Popen(
'{} -c "import hoplite.main; hoplite.main.server_main([\'--port={}\'])"'.format(sys.executable, port_num)
)
wait_for_hoplite('localhost', port_num)
return proc
def start_hoplite_server(port_num):
proc = psutil.Popen(
'{} -c "import hoplite.main; hoplite.main.server_main([\'--port={}\'])"'.format(sys.executable, port_num),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)
wait_for_hoplite('localhost', port_num)
return proc
def start_hoplite_server(port_num):
proc = psutil.Popen(
'{} -c "import hoplite.main; hoplite.main.server_main([\'--port={}\'])"'.format(sys.executable, port_num)
)
wait_for_hoplite('localhost', port_num)
return proc
def test_exe(self):
sproc = get_test_subprocess()
exe = psutil.Process(sproc.pid).exe()
try:
self.assertEqual(exe, PYTHON)
except AssertionError:
if WINDOWS and len(exe) == len(PYTHON):
# on Windows we don't care about case sensitivity
normcase = os.path.normcase
self.assertEqual(normcase(exe), normcase(PYTHON))
else:
# certain platforms such as BSD are more accurate returning:
# "/usr/local/bin/python2.7"
# ...instead of:
# "/usr/local/bin/python"
# We do not want to consider this difference in accuracy
# an error.
ver = "%s.%s" % (sys.version_info[0], sys.version_info[1])
try:
self.assertEqual(exe.replace(ver, ''),
PYTHON.replace(ver, ''))
except AssertionError:
# Tipically OSX. Really not sure what to do here.
pass
subp = subprocess.Popen([exe, '-c', 'import os; print("hey")'],
stdout=subprocess.PIPE)
out, _ = subp.communicate()
self.assertEqual(out.strip(), b'hey')
def test_Popen_ctx_manager(self):
with psutil.Popen([PYTHON, "-V"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE) as proc:
pass
assert proc.stdout.closed
assert proc.stderr.closed
assert proc.stdin.closed
def _execute(cmd, stdin=None):
"""Run command and return output"""
logger.warn('Running command (panel): %s', cmd) # Warning level because we want to see this in logs
proc = Popen(cmd, bufsize=0, close_fds=True, stdout=PIPE, stderr=PIPE, stdin=PIPE)
stdout, stderr = proc.communicate(input=stdin)
return {
'returncode': proc.returncode,
'stdout': stdout,
'stderr': stderr,
}
# noinspection PyUnusedLocal
def start(self, parent):
self._set_node_uuid()
super(FastDaemon, self).start(parent)
self.vm_status_queue = Queue()
self.vm_status_watcher = Popen(self.SYSEVENT, bufsize=0, close_fds=True, stdout=PIPE, stderr=STDOUT,
preexec_fn=os.setsid)
self.vm_status_monitor_thread = Thread(target=self._vm_status_monitor, name='VMStatusMonitor',
args=(self.vm_status_watcher.stdout,))
self.vm_status_monitor_thread.daemon = True
self.vm_status_monitor_thread.start()
self.vm_status_dispatcher_thread = Thread(target=self._vm_status_dispatcher, name='VMStatusDispatcher')
self.vm_status_dispatcher_thread.daemon = True
self.vm_status_dispatcher_thread.start()
def _fun_thread_daemon_restart(self):
if self._close:
return
while True:
if self._is_api_socket_ok() is False:
process_new = psutil.Popen([self._exe_path, "type=python_auto"])
if process_new is not None:
print("FTApiDaemon new futnn process open ! pid={}".format(process_new.pid))
else:
print("FTApiDaemon open process fail ! ")
time.sleep(self._time_restart)
self._loop_kill_futunn()
def RunCommandEx(self, cwd, cmd, cmdargs):
try:
cmdline = "{0} {1}".format(cmd, cmdargs)
p = psutil.Popen(cmdline, close_fds=True, cwd=cwd, creationflags=subprocess.CREATE_NEW_CONSOLE)
return p
except Exception:
return None
def RunCommandAsEx(self, username, password, cwd, cmd, cmdargs):
try:
cmdline = "PsExec -u {0} -p {1} {2} {3}".format(username, password, cwd, cmd, cmdargs)
p = psutil.Popen(cmdline, close_fds=True, cwd=cwd, creationflags=subprocess.CREATE_NEW_CONSOLE)
return p
except Exception:
return None
def DumpThreadStacks(pid):
sddir = os.path.dirname(sys.argv[0])
p = psutil.Popen([os.path.join(sddir, "stackdump.exe"), str(pid)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# wait for the process to terminate
out = p.communicate()[0]
return out
def test_exe(self):
sproc = get_test_subprocess()
exe = psutil.Process(sproc.pid).exe()
try:
self.assertEqual(exe, PYTHON)
except AssertionError:
if WINDOWS and len(exe) == len(PYTHON):
# on Windows we don't care about case sensitivity
normcase = os.path.normcase
self.assertEqual(normcase(exe), normcase(PYTHON))
else:
# certain platforms such as BSD are more accurate returning:
# "/usr/local/bin/python2.7"
# ...instead of:
# "/usr/local/bin/python"
# We do not want to consider this difference in accuracy
# an error.
ver = "%s.%s" % (sys.version_info[0], sys.version_info[1])
try:
self.assertEqual(exe.replace(ver, ''),
PYTHON.replace(ver, ''))
except AssertionError:
# Tipically OSX. Really not sure what to do here.
pass
subp = subprocess.Popen([exe, '-c', 'import os; print("hey")'],
stdout=subprocess.PIPE)
out, _ = subp.communicate()
self.assertEqual(out.strip(), b'hey')
def test_Popen_ctx_manager(self):
with psutil.Popen([PYTHON, "-V"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE) as proc:
pass
assert proc.stdout.closed
assert proc.stderr.closed
assert proc.stdin.closed
def test_exe(self):
sproc = get_test_subprocess()
exe = psutil.Process(sproc.pid).exe()
try:
self.assertEqual(exe, PYTHON)
except AssertionError:
if WINDOWS and len(exe) == len(PYTHON):
# on Windows we don't care about case sensitivity
normcase = os.path.normcase
self.assertEqual(normcase(exe), normcase(PYTHON))
else:
# certain platforms such as BSD are more accurate returning:
# "/usr/local/bin/python2.7"
# ...instead of:
# "/usr/local/bin/python"
# We do not want to consider this difference in accuracy
# an error.
ver = "%s.%s" % (sys.version_info[0], sys.version_info[1])
try:
self.assertEqual(exe.replace(ver, ''),
PYTHON.replace(ver, ''))
except AssertionError:
# Tipically OSX. Really not sure what to do here.
pass
subp = subprocess.Popen([exe, '-c', 'import os; print("hey")'],
stdout=subprocess.PIPE)
out, _ = subp.communicate()
self.assertEqual(out.strip(), b'hey')
def test_Popen_ctx_manager(self):
with psutil.Popen([PYTHON, "-V"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE) as proc:
pass
assert proc.stdout.closed
assert proc.stderr.closed
assert proc.stdin.closed
def main():
process_size = 4
for episode in range(5):
ret_values = [None for _ in range(process_size)]
results = [None for _ in range(process_size)]
envs = [os.environ.copy() for _ in range(process_size)]
for i, env in enumerate(envs):
env[str('THEANO_FLAGS')] = str('device=cpu,floatX=float{}'.format(32 if i % 2 == 0 else 16))
time_before = time.time()
pool = [
psutil.Popen(
['python', 'slave.py'],
stdout=subprocess.PIPE,
env=envs[i],
)
for i in range(process_size)
]
# time_after = time.time()
# Roll polling
while any(e is None for e in ret_values):
for i, process in enumerate(pool):
ret_values[i] = process.poll()
time.sleep(1.0)
time_after = time.time()
for i, process in enumerate(pool):
if ret_values[i] == 0:
results[i], _ = process.communicate()
print('Time: {:.6}s'.format(time_after - time_before))
print(*results, sep='')
def run_command(args, **kwargs):
kwargs = merge_dicts({ 'stdout': subprocess.PIPE }, kwargs)
command = Popen(args, **kwargs)
return command
def run_script(script, *args):
s = os.path.join(os.path.dirname(__file__), '../../scripts/' + script)
command = [executable, s]
command.extend(args)
with Popen([executable, s]) as p:
sleep(4)
p.send_signal(SIGINT)
p.wait(timeout=1)
assert p.poll() == 0, 'script failed'