def test_kill(self):
sproc = get_test_subprocess()
test_pid = sproc.pid
p = psutil.Process(test_pid)
p.kill()
sig = p.wait()
self.assertFalse(psutil.pid_exists(test_pid))
if POSIX:
self.assertEqual(sig, -signal.SIGKILL)
python类POSIX的实例源码
def test_terminate(self):
sproc = get_test_subprocess()
test_pid = sproc.pid
p = psutil.Process(test_pid)
p.terminate()
sig = p.wait()
self.assertFalse(psutil.pid_exists(test_pid))
if POSIX:
self.assertEqual(sig, -signal.SIGTERM)
def test_send_signal(self):
sig = signal.SIGKILL if POSIX else signal.SIGTERM
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.send_signal(sig)
exit_sig = p.wait()
self.assertFalse(psutil.pid_exists(p.pid))
if POSIX:
self.assertEqual(exit_sig, -sig)
#
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.send_signal(sig)
with mock.patch('psutil.os.kill',
side_effect=OSError(errno.ESRCH, "")):
with self.assertRaises(psutil.NoSuchProcess):
p.send_signal(sig)
#
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.send_signal(sig)
with mock.patch('psutil.os.kill',
side_effect=OSError(errno.EPERM, "")):
with self.assertRaises(psutil.AccessDenied):
psutil.Process().send_signal(sig)
# Sending a signal to process with PID 0 is not allowed as
# it would affect every process in the process group of
# the calling process (os.getpid()) instead of PID 0").
if 0 in psutil.pids():
p = psutil.Process(0)
self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
def test_memory_maps(self):
p = psutil.Process()
maps = p.memory_maps()
paths = [x for x in maps]
self.assertEqual(len(paths), len(set(paths)))
ext_maps = p.memory_maps(grouped=False)
for nt in maps:
if not nt.path.startswith('['):
assert os.path.isabs(nt.path), nt.path
if POSIX:
try:
assert os.path.exists(nt.path) or \
os.path.islink(nt.path), nt.path
except AssertionError:
if not LINUX:
raise
else:
# https://github.com/giampaolo/psutil/issues/759
with open('/proc/self/smaps') as f:
data = f.read()
if "%s (deleted)" % nt.path not in data:
raise
else:
# XXX - On Windows we have this strange behavior with
# 64 bit dlls: they are visible via explorer but cannot
# be accessed via os.stat() (wtf?).
if '64' not in os.path.basename(nt.path):
assert os.path.exists(nt.path), nt.path
for nt in ext_maps:
for fname in nt._fields:
value = getattr(nt, fname)
if fname == 'path':
continue
elif fname in ('addr', 'perms'):
assert value, value
else:
self.assertIsInstance(value, (int, long))
assert value >= 0, value
def test_pid_0(self):
# Process(0) is supposed to work on all platforms except Linux
if 0 not in psutil.pids():
self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
return
# test all methods
p = psutil.Process(0)
for name in psutil._as_dict_attrnames:
if name == 'pid':
continue
meth = getattr(p, name)
try:
ret = meth()
except psutil.AccessDenied:
pass
else:
if name in ("uids", "gids"):
self.assertEqual(ret.real, 0)
elif name == "username":
if POSIX:
self.assertEqual(p.username(), 'root')
elif WINDOWS:
self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM')
elif name == "name":
assert name, name
if hasattr(p, 'rlimit'):
try:
p.rlimit(psutil.RLIMIT_FSIZE)
except psutil.AccessDenied:
pass
p.as_dict()
if not OPENBSD:
self.assertIn(0, psutil.pids())
self.assertTrue(psutil.pid_exists(0))
def setUp(self):
if POSIX:
import pwd
import grp
users = pwd.getpwall()
groups = grp.getgrall()
self.all_uids = set([x.pw_uid for x in users])
self.all_usernames = set([x.pw_name for x in users])
self.all_gids = set([x.gr_gid for x in groups])
def exe(self, ret, proc):
if not ret:
self.assertEqual(ret, '')
else:
assert os.path.isabs(ret), ret
# Note: os.stat() may return False even if the file is there
# hence we skip the test, see:
# http://stackoverflow.com/questions/3112546/os-path-exists-lies
if POSIX and os.path.isfile(ret):
if hasattr(os, 'access') and hasattr(os, "X_OK"):
# XXX may fail on OSX
self.assertTrue(os.access(ret, os.X_OK))
def memory_info(self, ret, proc):
for name in ret._fields:
self.assertGreaterEqual(getattr(ret, name), 0)
if POSIX and ret.vms != 0:
# VMS is always supposed to be the highest
for name in ret._fields:
if name != 'vms':
value = getattr(ret, name)
assert ret.vms > value, ret
elif WINDOWS:
assert ret.peak_wset >= ret.wset, ret
assert ret.peak_paged_pool >= ret.paged_pool, ret
assert ret.peak_nonpaged_pool >= ret.nonpaged_pool, ret
assert ret.peak_pagefile >= ret.pagefile, ret
def nice(self, ret, proc):
if POSIX:
assert -20 <= ret <= 20, ret
else:
priorities = [getattr(psutil, x) for x in dir(psutil)
if x.endswith('_PRIORITY_CLASS')]
self.assertIn(ret, priorities)
def create_exe(outpath, c_code=None):
"""Creates an executable file in the given location."""
assert not os.path.exists(outpath), outpath
if which("gcc"):
if c_code is None:
c_code = textwrap.dedent(
"""
#include <unistd.h>
int main() {
pause();
return 1;
}
""")
with tempfile.NamedTemporaryFile(
suffix='.c', delete=False, mode='wt') as f:
f.write(c_code)
try:
subprocess.check_call(["gcc", f.name, "-o", outpath])
finally:
safe_rmpath(f.name)
else:
# fallback - use python's executable
if c_code is not None:
raise ValueError(
"can't specify c_code arg as gcc is not installed")
shutil.copyfile(sys.executable, outpath)
if POSIX:
st = os.stat(outpath)
os.chmod(outpath, st.st_mode | stat.S_IEXEC)
# ===================================================================
# --- testing
# ===================================================================
def print_(a, b):
if sys.stdout.isatty() and psutil.POSIX:
fmt = '\x1b[1;32m%-13s\x1b[0m %s' % (a, b)
else:
fmt = '%-11s %s' % (a, b)
print(fmt)
def test_wait(self):
# check exit code signal
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.kill()
code = p.wait()
if POSIX:
self.assertEqual(code, signal.SIGKILL)
else:
self.assertEqual(code, 0)
self.assertFalse(p.is_running())
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.terminate()
code = p.wait()
if POSIX:
self.assertEqual(code, signal.SIGTERM)
else:
self.assertEqual(code, 0)
self.assertFalse(p.is_running())
# check sys.exit() code
code = "import time, sys; time.sleep(0.01); sys.exit(5);"
sproc = get_test_subprocess([PYTHON, "-c", code])
p = psutil.Process(sproc.pid)
self.assertEqual(p.wait(), 5)
self.assertFalse(p.is_running())
# Test wait() issued twice.
# It is not supposed to raise NSP when the process is gone.
# On UNIX this should return None, on Windows it should keep
# returning the exit code.
sproc = get_test_subprocess([PYTHON, "-c", code])
p = psutil.Process(sproc.pid)
self.assertEqual(p.wait(), 5)
self.assertIn(p.wait(), (5, None))
# test timeout
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.name()
self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
# timeout < 0 not allowed
self.assertRaises(ValueError, p.wait, -1)
# XXX why is this skipped on Windows?
def test_disk_partitions(self):
# all = False
ls = psutil.disk_partitions(all=False)
# on travis we get:
# self.assertEqual(p.cpu_affinity(), [n])
# AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0]
self.assertTrue(ls, msg=ls)
for disk in ls:
if WINDOWS and 'cdrom' in disk.opts:
continue
if not POSIX:
assert os.path.exists(disk.device), disk
else:
# we cannot make any assumption about this, see:
# http://goo.gl/p9c43
disk.device
if SUNOS:
# on solaris apparently mount points can also be files
assert os.path.exists(disk.mountpoint), disk
else:
assert os.path.isdir(disk.mountpoint), disk
assert disk.fstype, disk
self.assertIsInstance(disk.opts, str)
# all = True
ls = psutil.disk_partitions(all=True)
self.assertTrue(ls, msg=ls)
for disk in psutil.disk_partitions(all=True):
if not WINDOWS:
try:
os.stat(disk.mountpoint)
except OSError as err:
if TRAVIS and OSX and err.errno == errno.EIO:
continue
# http://mail.python.org/pipermail/python-dev/
# 2012-June/120787.html
if err.errno not in (errno.EPERM, errno.EACCES):
raise
else:
if SUNOS:
# on solaris apparently mount points can also be files
assert os.path.exists(disk.mountpoint), disk
else:
assert os.path.isdir(disk.mountpoint), disk
self.assertIsInstance(disk.fstype, str)
self.assertIsInstance(disk.opts, str)
def find_mount_point(path):
path = os.path.abspath(path)
while not os.path.ismount(path):
path = os.path.dirname(path)
return path.lower()
mount = find_mount_point(__file__)
mounts = [x.mountpoint.lower() for x in
psutil.disk_partitions(all=True)]
self.assertIn(mount, mounts)
psutil.disk_usage(mount)
def test_wait(self):
# check exit code signal
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.kill()
code = p.wait()
if POSIX:
self.assertEqual(code, -signal.SIGKILL)
else:
self.assertEqual(code, 0)
self.assertFalse(p.is_running())
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.terminate()
code = p.wait()
if POSIX:
self.assertEqual(code, -signal.SIGTERM)
else:
self.assertEqual(code, 0)
self.assertFalse(p.is_running())
# check sys.exit() code
code = "import time, sys; time.sleep(0.01); sys.exit(5);"
sproc = get_test_subprocess([PYTHON, "-c", code])
p = psutil.Process(sproc.pid)
self.assertEqual(p.wait(), 5)
self.assertFalse(p.is_running())
# Test wait() issued twice.
# It is not supposed to raise NSP when the process is gone.
# On UNIX this should return None, on Windows it should keep
# returning the exit code.
sproc = get_test_subprocess([PYTHON, "-c", code])
p = psutil.Process(sproc.pid)
self.assertEqual(p.wait(), 5)
self.assertIn(p.wait(), (5, None))
# test timeout
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.name()
self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
# timeout < 0 not allowed
self.assertRaises(ValueError, p.wait, -1)
# XXX why is this skipped on Windows?
def test_disk_partitions(self):
# all = False
ls = psutil.disk_partitions(all=False)
# on travis we get:
# self.assertEqual(p.cpu_affinity(), [n])
# AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0]
self.assertTrue(ls, msg=ls)
for disk in ls:
self.assertIsInstance(disk.device, (str, unicode))
self.assertIsInstance(disk.mountpoint, (str, unicode))
self.assertIsInstance(disk.fstype, (str, unicode))
self.assertIsInstance(disk.opts, (str, unicode))
if WINDOWS and 'cdrom' in disk.opts:
continue
if not POSIX:
assert os.path.exists(disk.device), disk
else:
# we cannot make any assumption about this, see:
# http://goo.gl/p9c43
disk.device
if SUNOS:
# on solaris apparently mount points can also be files
assert os.path.exists(disk.mountpoint), disk
else:
assert os.path.isdir(disk.mountpoint), disk
assert disk.fstype, disk
# all = True
ls = psutil.disk_partitions(all=True)
self.assertTrue(ls, msg=ls)
for disk in psutil.disk_partitions(all=True):
if not WINDOWS:
try:
os.stat(disk.mountpoint)
except OSError as err:
if TRAVIS and OSX and err.errno == errno.EIO:
continue
# http://mail.python.org/pipermail/python-dev/
# 2012-June/120787.html
if err.errno not in (errno.EPERM, errno.EACCES):
raise
else:
if SUNOS:
# on solaris apparently mount points can also be files
assert os.path.exists(disk.mountpoint), disk
else:
assert os.path.isdir(disk.mountpoint), disk
self.assertIsInstance(disk.fstype, str)
self.assertIsInstance(disk.opts, str)
def find_mount_point(path):
path = os.path.abspath(path)
while not os.path.ismount(path):
path = os.path.dirname(path)
return path.lower()
mount = find_mount_point(__file__)
mounts = [x.mountpoint.lower() for x in
psutil.disk_partitions(all=True)]
self.assertIn(mount, mounts)
psutil.disk_usage(mount)
def test_wait(self):
# check exit code signal
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.kill()
code = p.wait()
if POSIX:
self.assertEqual(code, -signal.SIGKILL)
else:
self.assertEqual(code, 0)
self.assertFalse(p.is_running())
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.terminate()
code = p.wait()
if POSIX:
self.assertEqual(code, -signal.SIGTERM)
else:
self.assertEqual(code, 0)
self.assertFalse(p.is_running())
# check sys.exit() code
code = "import time, sys; time.sleep(0.01); sys.exit(5);"
sproc = get_test_subprocess([PYTHON, "-c", code])
p = psutil.Process(sproc.pid)
self.assertEqual(p.wait(), 5)
self.assertFalse(p.is_running())
# Test wait() issued twice.
# It is not supposed to raise NSP when the process is gone.
# On UNIX this should return None, on Windows it should keep
# returning the exit code.
sproc = get_test_subprocess([PYTHON, "-c", code])
p = psutil.Process(sproc.pid)
self.assertEqual(p.wait(), 5)
self.assertIn(p.wait(), (5, None))
# test timeout
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.name()
self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
# timeout < 0 not allowed
self.assertRaises(ValueError, p.wait, -1)
# XXX why is this skipped on Windows?
def test_disk_partitions(self):
# all = False
ls = psutil.disk_partitions(all=False)
# on travis we get:
# self.assertEqual(p.cpu_affinity(), [n])
# AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0]
self.assertTrue(ls, msg=ls)
for disk in ls:
self.assertIsInstance(disk.device, (str, unicode))
self.assertIsInstance(disk.mountpoint, (str, unicode))
self.assertIsInstance(disk.fstype, (str, unicode))
self.assertIsInstance(disk.opts, (str, unicode))
if WINDOWS and 'cdrom' in disk.opts:
continue
if not POSIX:
assert os.path.exists(disk.device), disk
else:
# we cannot make any assumption about this, see:
# http://goo.gl/p9c43
disk.device
if SUNOS:
# on solaris apparently mount points can also be files
assert os.path.exists(disk.mountpoint), disk
else:
assert os.path.isdir(disk.mountpoint), disk
assert disk.fstype, disk
# all = True
ls = psutil.disk_partitions(all=True)
self.assertTrue(ls, msg=ls)
for disk in psutil.disk_partitions(all=True):
if not WINDOWS:
try:
os.stat(disk.mountpoint)
except OSError as err:
if TRAVIS and OSX and err.errno == errno.EIO:
continue
# http://mail.python.org/pipermail/python-dev/
# 2012-June/120787.html
if err.errno not in (errno.EPERM, errno.EACCES):
raise
else:
if SUNOS:
# on solaris apparently mount points can also be files
assert os.path.exists(disk.mountpoint), disk
else:
assert os.path.isdir(disk.mountpoint), disk
self.assertIsInstance(disk.fstype, str)
self.assertIsInstance(disk.opts, str)
def find_mount_point(path):
path = os.path.abspath(path)
while not os.path.ismount(path):
path = os.path.dirname(path)
return path.lower()
mount = find_mount_point(__file__)
mounts = [x.mountpoint.lower() for x in
psutil.disk_partitions(all=True)]
self.assertIn(mount, mounts)
psutil.disk_usage(mount)
def test_wait_procs(self):
def callback(p):
l.append(p.pid)
l = []
sproc1 = get_test_subprocess()
sproc2 = get_test_subprocess()
sproc3 = get_test_subprocess()
procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)
t = time.time()
gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)
self.assertLess(time.time() - t, 0.5)
self.assertEqual(gone, [])
self.assertEqual(len(alive), 3)
self.assertEqual(l, [])
for p in alive:
self.assertFalse(hasattr(p, 'returncode'))
@retry_before_failing(30)
def test(procs, callback):
gone, alive = psutil.wait_procs(procs, timeout=0.03,
callback=callback)
self.assertEqual(len(gone), 1)
self.assertEqual(len(alive), 2)
return gone, alive
sproc3.terminate()
gone, alive = test(procs, callback)
self.assertIn(sproc3.pid, [x.pid for x in gone])
if POSIX:
self.assertEqual(gone.pop().returncode, signal.SIGTERM)
else:
self.assertEqual(gone.pop().returncode, 1)
self.assertEqual(l, [sproc3.pid])
for p in alive:
self.assertFalse(hasattr(p, 'returncode'))
@retry_before_failing(30)
def test(procs, callback):
gone, alive = psutil.wait_procs(procs, timeout=0.03,
callback=callback)
self.assertEqual(len(gone), 3)
self.assertEqual(len(alive), 0)
return gone, alive
sproc1.terminate()
sproc2.terminate()
gone, alive = test(procs, callback)
self.assertEqual(set(l), set([sproc1.pid, sproc2.pid, sproc3.pid]))
for p in gone:
self.assertTrue(hasattr(p, 'returncode'))
def test_wait_procs(self):
def callback(p):
l.append(p.pid)
l = []
sproc1 = get_test_subprocess()
sproc2 = get_test_subprocess()
sproc3 = get_test_subprocess()
procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)
t = time.time()
gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)
self.assertLess(time.time() - t, 0.5)
self.assertEqual(gone, [])
self.assertEqual(len(alive), 3)
self.assertEqual(l, [])
for p in alive:
self.assertFalse(hasattr(p, 'returncode'))
@retry_before_failing(30)
def test(procs, callback):
gone, alive = psutil.wait_procs(procs, timeout=0.03,
callback=callback)
self.assertEqual(len(gone), 1)
self.assertEqual(len(alive), 2)
return gone, alive
sproc3.terminate()
gone, alive = test(procs, callback)
self.assertIn(sproc3.pid, [x.pid for x in gone])
if POSIX:
self.assertEqual(gone.pop().returncode, -signal.SIGTERM)
else:
self.assertEqual(gone.pop().returncode, 1)
self.assertEqual(l, [sproc3.pid])
for p in alive:
self.assertFalse(hasattr(p, 'returncode'))
@retry_before_failing(30)
def test(procs, callback):
gone, alive = psutil.wait_procs(procs, timeout=0.03,
callback=callback)
self.assertEqual(len(gone), 3)
self.assertEqual(len(alive), 0)
return gone, alive
sproc1.terminate()
sproc2.terminate()
gone, alive = test(procs, callback)
self.assertEqual(set(l), set([sproc1.pid, sproc2.pid, sproc3.pid]))
for p in gone:
self.assertTrue(hasattr(p, 'returncode'))
def test_wait_procs(self):
def callback(p):
pids.append(p.pid)
pids = []
sproc1 = get_test_subprocess()
sproc2 = get_test_subprocess()
sproc3 = get_test_subprocess()
procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)
t = time.time()
gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)
self.assertLess(time.time() - t, 0.5)
self.assertEqual(gone, [])
self.assertEqual(len(alive), 3)
self.assertEqual(pids, [])
for p in alive:
self.assertFalse(hasattr(p, 'returncode'))
@retry_before_failing(30)
def test(procs, callback):
gone, alive = psutil.wait_procs(procs, timeout=0.03,
callback=callback)
self.assertEqual(len(gone), 1)
self.assertEqual(len(alive), 2)
return gone, alive
sproc3.terminate()
gone, alive = test(procs, callback)
self.assertIn(sproc3.pid, [x.pid for x in gone])
if POSIX:
self.assertEqual(gone.pop().returncode, -signal.SIGTERM)
else:
self.assertEqual(gone.pop().returncode, 1)
self.assertEqual(pids, [sproc3.pid])
for p in alive:
self.assertFalse(hasattr(p, 'returncode'))
@retry_before_failing(30)
def test(procs, callback):
gone, alive = psutil.wait_procs(procs, timeout=0.03,
callback=callback)
self.assertEqual(len(gone), 3)
self.assertEqual(len(alive), 0)
return gone, alive
sproc1.terminate()
sproc2.terminate()
gone, alive = test(procs, callback)
self.assertEqual(set(pids), set([sproc1.pid, sproc2.pid, sproc3.pid]))
for p in gone:
self.assertTrue(hasattr(p, 'returncode'))