def test_send_signal(self):
sig = signal.SIGKILL if POSIX else signal.SIGTERM
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.send_signal(sig)
exit_sig = p.wait()
self.assertFalse(psutil.pid_exists(p.pid))
if POSIX:
self.assertEqual(exit_sig, sig)
#
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.send_signal(sig)
with mock.patch('psutil.os.kill',
side_effect=OSError(errno.ESRCH, "")):
with self.assertRaises(psutil.NoSuchProcess):
p.send_signal(sig)
#
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.send_signal(sig)
with mock.patch('psutil.os.kill',
side_effect=OSError(errno.EPERM, "")):
with self.assertRaises(psutil.AccessDenied):
psutil.Process().send_signal(sig)
# Sending a signal to process with PID 0 is not allowed as
# it would affect every process in the process group of
# the calling process (os.getpid()) instead of PID 0").
if 0 in psutil.pids():
p = psutil.Process(0)
self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
python类pids()的实例源码
def test_pid_0(self):
# Process(0) is supposed to work on all platforms except Linux
if 0 not in psutil.pids():
self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
return
# test all methods
p = psutil.Process(0)
for name in psutil._as_dict_attrnames:
if name == 'pid':
continue
meth = getattr(p, name)
try:
ret = meth()
except psutil.AccessDenied:
pass
else:
if name in ("uids", "gids"):
self.assertEqual(ret.real, 0)
elif name == "username":
if POSIX:
self.assertEqual(p.username(), 'root')
elif WINDOWS:
self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM')
elif name == "name":
assert name, name
if hasattr(p, 'rlimit'):
try:
p.rlimit(psutil.RLIMIT_FSIZE)
except psutil.AccessDenied:
pass
p.as_dict()
if not OPENBSD:
self.assertIn(0, psutil.pids())
self.assertTrue(psutil.pid_exists(0))
def test_pid_exists(self):
sproc = get_test_subprocess()
self.assertTrue(psutil.pid_exists(sproc.pid))
p = psutil.Process(sproc.pid)
p.kill()
p.wait()
self.assertFalse(psutil.pid_exists(sproc.pid))
self.assertFalse(psutil.pid_exists(-1))
self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())
# pid 0
psutil.pid_exists(0) == 0 in psutil.pids()
def test_pid_exists_2(self):
reap_children()
pids = psutil.pids()
for pid in pids:
try:
assert psutil.pid_exists(pid)
except AssertionError:
# in case the process disappeared in meantime fail only
# if it is no longer in psutil.pids()
time.sleep(.1)
if pid in psutil.pids():
self.fail(pid)
pids = range(max(pids) + 5000, max(pids) + 6000)
for pid in pids:
self.assertFalse(psutil.pid_exists(pid), msg=pid)
def test_pids(self):
plist = [x.pid for x in psutil.process_iter()]
pidlist = psutil.pids()
self.assertEqual(plist.sort(), pidlist.sort())
# make sure every pid is unique
self.assertEqual(len(pidlist), len(set(pidlist)))
def test_pids(self):
# Note: this test might fail if the OS is starting/killing
# other processes in the meantime
if SUNOS:
cmd = ["ps", "-A", "-o", "pid"]
else:
cmd = ["ps", "ax", "-o", "pid"]
p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
output = p.communicate()[0].strip()
assert p.poll() == 0
if PY3:
output = str(output, sys.stdout.encoding)
pids_ps = []
for line in output.split('\n')[1:]:
if line:
pid = int(line.split()[0].strip())
pids_ps.append(pid)
# remove ps subprocess pid which is supposed to be dead in meantime
pids_ps.remove(p.pid)
pids_psutil = psutil.pids()
pids_ps.sort()
pids_psutil.sort()
# on OSX ps doesn't show pid 0
if OSX and 0 not in pids_ps:
pids_ps.insert(0, 0)
if pids_ps != pids_psutil:
difference = [x for x in pids_psutil if x not in pids_ps] + \
[x for x in pids_ps if x not in pids_psutil]
self.fail("difference: " + str(difference))
# for some reason ifconfig -a does not report all interfaces
# returned by psutil
def pids():
return psutil.pids()
def find_pids_by_name(p_name):
arr = []
for pid in psutil.pids():
if psutil.Process(id).name() == p_name:
arr.append(pid)
return arr
def kill_by_pids(pids):
for pid in pids:
psutil.Process(pid).kill()
def ancestors(pid):
"""ancestors(pid) -> int list
Arguments:
pid (int): PID of the process.
Returns:
List of PIDs of whose parent process is `pid` or an ancestor of `pid`.
"""
pids = []
while pid != 0:
pids.append(pid)
pid = parent(pid)
return pids
def check_process():
print 'Checking WoW is running'
wow_process_names = ["World of Warcraft"]
running = False
for pid in psutil.pids():
p = psutil.Process(pid)
if any(p.name() in s for s in wow_process_names):
print(p.name())
running = True
if not running and not dev:
print 'WoW is not running'
exit()
print 'WoW is running'
# raw_input('Pleas e put your fishing-rod on key 1, zoom-in to max, move camera on fishing-float and press any key')
def process_count(self):
p_count = 0
pids = psutil.pids()
for pid in pids:
try:
p = psutil.Process(pid)
exe = p.exe()
cmd = p.cmdline()
cmd = " ".join(cmd)
if "jd.py" in cmd:
# print pid, cmd[:30]
p_count += 1
except:
pass
return p_count
def cleanScratch():
""" Clean scratch directory.
It checks whether any other gcMapExplorer process is running. In case,
when only one process (i.e. current) is running, all files with "gcx"
prefix will be deleted from default scratch directory.
"""
config = getConfig()
count = 0
for pid in psutil.pids():
p = None
try:
p = psutil.Process(pid)
except psutil.NoSuchProcess:
pass
if p is not None:
if 'gcMapExplorer' in p.name():
count += 1
# If only one gcMapExplorer is running, it is the current one
if count == 1:
for f in os.listdir(config['Dirs']['WorkingDirectory']):
if not os.path.isfile(f):
continue
basename = os.path.basename(f)
base = os.path.splitext(basename)[0]
if "gcx" in base:
try:
print(' Removing File: {0}'.format(f))
os.remove(f)
except IOError:
pass
print(' ... Finished Cleaning')
def get_system_status(memory_total=False, memory_total_actual=False,
memory_total_usage=False, memory_total_free=False,
all_pids=False, swap_memory=False, pid=False):
"""
Parameters
----------
threads: bool
return dict {id: (user_time, system_time)}
memory_maps: bool
return dict {path: rss}
Note
----
All memory is returned in `MiB`
To calculate memory_percent:
get_system_status(memory_usage=True) / get_system_status(memory_total=True) * 100
"""
import psutil
# ====== general system query ====== #
if memory_total:
return psutil.virtual_memory().total / float(2 ** 20)
if memory_total_actual:
return psutil.virtual_memory().available / float(2 ** 20)
if memory_total_usage:
return psutil.virtual_memory().used / float(2 ** 20)
if memory_total_free:
return psutil.virtual_memory().free / float(2 ** 20)
if swap_memory:
tmp = psutil.swap_memory()
tmp.total /= float(2 ** 20)
tmp.used /= float(2 ** 20)
tmp.free /= float(2 ** 20)
tmp.sin /= float(2**20)
tmp.sout /= float(2**20)
return tmp
if all_pids:
return psutil.pids()
if pid:
return os.getpid()
def quit():
mypid = os.getpid()
for pid in sorted(psutil.pids(), reverse=True):
if pid == mypid:
continue
try:
p = psutil.Process(pid)
if p.exe() == sys.executable:
p.send_signal(signal.CTRL_C_EVENT)
except:
pass
def _get_process_by_path(self, path):
"""????????"""
lower_path = str(path).lower()
for pid in psutil.pids():
try:
process = psutil.Process(pid)
tmp = process.exe()
if str(tmp).lower() == lower_path:
return process
except:
continue
return None
def test_pids(self):
self.execute(psutil.pids)
# --- net
def test_wait_for_pid(self):
wait_for_pid(os.getpid())
nopid = max(psutil.pids()) + 99999
with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid)
def test_send_signal(self):
sig = signal.SIGKILL if POSIX else signal.SIGTERM
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.send_signal(sig)
exit_sig = p.wait()
self.assertFalse(psutil.pid_exists(p.pid))
if POSIX:
self.assertEqual(exit_sig, -sig)
#
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.send_signal(sig)
with mock.patch('psutil.os.kill',
side_effect=OSError(errno.ESRCH, "")):
with self.assertRaises(psutil.NoSuchProcess):
p.send_signal(sig)
#
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.send_signal(sig)
with mock.patch('psutil.os.kill',
side_effect=OSError(errno.EPERM, "")):
with self.assertRaises(psutil.AccessDenied):
psutil.Process().send_signal(sig)
# Sending a signal to process with PID 0 is not allowed as
# it would affect every process in the process group of
# the calling process (os.getpid()) instead of PID 0").
if 0 in psutil.pids():
p = psutil.Process(0)
self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
def test_pid_0(self):
# Process(0) is supposed to work on all platforms except Linux
if 0 not in psutil.pids():
self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
return
# test all methods
p = psutil.Process(0)
for name in psutil._as_dict_attrnames:
if name == 'pid':
continue
meth = getattr(p, name)
try:
ret = meth()
except psutil.AccessDenied:
pass
else:
if name in ("uids", "gids"):
self.assertEqual(ret.real, 0)
elif name == "username":
if POSIX:
self.assertEqual(p.username(), 'root')
elif WINDOWS:
self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM')
elif name == "name":
assert name, name
if hasattr(p, 'rlimit'):
try:
p.rlimit(psutil.RLIMIT_FSIZE)
except psutil.AccessDenied:
pass
p.as_dict()
if not OPENBSD:
self.assertIn(0, psutil.pids())
self.assertTrue(psutil.pid_exists(0))