def test_parent_ppid(self):
this_parent = os.getpid()
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
self.assertEqual(p.ppid(), this_parent)
self.assertEqual(p.parent().pid, this_parent)
# no other process is supposed to have us as parent
reap_children(recursive=True)
if APPVEYOR:
# Occasional failures, see:
# https://ci.appveyor.com/project/giampaolo/psutil/build/
# job/0hs623nenj7w4m33
return
for p in psutil.process_iter():
if p.pid == sproc.pid:
continue
# XXX: sometimes this fails on Windows; not sure why.
self.assertNotEqual(p.ppid(), this_parent, msg=p)
python类process_iter()的实例源码
def test_children_duplicates(self):
# find the process which has the highest number of children
table = collections.defaultdict(int)
for p in psutil.process_iter():
try:
table[p.ppid()] += 1
except psutil.Error:
pass
# this is the one, now let's make sure there are no duplicates
pid = sorted(table.items(), key=lambda x: x[1])[-1][0]
p = psutil.Process(pid)
try:
c = p.children(recursive=True)
except psutil.AccessDenied: # windows
pass
else:
self.assertEqual(len(c), len(set(c)))
def test_ppid(self):
if hasattr(os, 'getppid'):
self.assertEqual(psutil.Process().ppid(), os.getppid())
this_parent = os.getpid()
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
self.assertEqual(p.ppid(), this_parent)
self.assertEqual(p.parent().pid, this_parent)
# no other process is supposed to have us as parent
reap_children(recursive=True)
if APPVEYOR:
# Occasional failures, see:
# https://ci.appveyor.com/project/giampaolo/psutil/build/
# job/0hs623nenj7w4m33
return
for p in psutil.process_iter():
if p.pid == sproc.pid:
continue
# XXX: sometimes this fails on Windows; not sure why.
self.assertNotEqual(p.ppid(), this_parent, msg=p)
def test_children_duplicates(self):
# find the process which has the highest number of children
table = collections.defaultdict(int)
for p in psutil.process_iter():
try:
table[p.ppid()] += 1
except psutil.Error:
pass
# this is the one, now let's make sure there are no duplicates
pid = sorted(table.items(), key=lambda x: x[1])[-1][0]
p = psutil.Process(pid)
try:
c = p.children(recursive=True)
except psutil.AccessDenied: # windows
pass
else:
self.assertEqual(len(c), len(set(c)))
def get_used_files():
"""Get files used by processes with name scanpy."""
import psutil
loop_over_scanpy_processes = (proc for proc in psutil.process_iter()
if proc.name() == 'scanpy')
filenames = []
for proc in loop_over_scanpy_processes:
try:
flist = proc.open_files()
for nt in flist:
filenames.append(nt.path)
# This catches a race condition where a process ends
# before we can examine its files
except psutil.NoSuchProcess as err:
pass
return set(filenames)
def kill_proc(procname):
done = False
for proc in psutil.process_iter():
process = psutil.Process(proc.pid)
if process.name() == procname:
try:
process.terminate()
process.wait(timeout=3)
done = True
except psutil.AccessDenied:
print "Error: Access Denied to terminate %s" % procname
except psutil.NoSuchProcess:
pass
except psutil.TimeoutExpired:
if proz['killcount'] == 2:
print "Error: Terminating of %s failed! (tried 3 times)" % procname
else:
print "Error: Terminating of %s took to long.. retrying" % procname
proz['killcount'] += 1
kill_proc(procname)
break
if done:
print "%s terminated!" % procname
def get_pid(name):
try:
pid = [p.pid for p in psutil.process_iter() if name in str(p.name)]
return pid[0]
except IndexError:
pass
try:
node_id = '/NODEINFO'
node_api = rosnode.get_api_uri(rospy.get_master(), name)
code, msg, pid = xmlrpclib.ServerProxy(node_api[2]).getPid(node_id)
except IOError:
pass
else:
return pid
try:
return int(check_output(["pidof", "-s", name]))
except CalledProcessError:
pass
rospy.logerr("Node '" + name + "' is not running!")
return None
def all_running_processes_for_case(self, _filter):
procs = []
for p in psutil.process_iter():
try:
if psutil.pid_exists(p.pid):
if _filter in p.cwd():
procs.append({"run": p.cwd() + " " + p.name(),"cmd":p.cmdline()})
except (psutil.AccessDenied):
pass
except (psutil.NoSuchProcess):
pass
return procs
##
# Function to kill all running processes for one case
# @param _filter search filter <string>
# @retval list of all killed processes <list>
def kill_all_running_processes_for_case(self, _filter):
procs = []
for p in psutil.process_iter():
try:
if psutil.pid_exists(p.pid):
if _filter in p.cwd():
procs.append({"run": p.cwd() + " " + p.name()})
self.kill_proc_tree(p.pid)
except (psutil.AccessDenied):
pass
except (psutil.NoSuchProcess):
pass
return procs
##
# Function to retrieve all processes for one case (running and completed)
# @param _case case <string>
# @retval list of all processes <list>
def process_for_run(self, _filter):
proc = None
for p in psutil.process_iter():
try:
if _filter in p.cwd():
proc = p
break
except:
pass
return proc
##
# Function to start process
# @param _case case <string>
# @param _run_id run id <string>
# @param _command command <list>
# @param create_console create a console <boolean>
# @param async program does not wait for process completion if set to True <boolean>
def pid_by_name(name):
"""pid_by_name(name) -> int list
Arguments:
name (str): Name of program.
Returns:
List of PIDs matching `name` sorted by lifetime, youngest to oldest.
Example:
>>> os.getpid() in pid_by_name(name(os.getpid()))
True
"""
def match(p):
if p.name() == name:
return True
try:
if p.exe() == name:
return True
except:
pass
return False
return [p.pid for p in psutil.process_iter() if match(p)]
def _check_virtualbox():
"""
Check if VirtualBox is running.
VirtualBox conflicts with S2E's requirement for KVM, so VirtualBox must
*not* be running together with S2E.
"""
# Adapted from https://github.com/giampaolo/psutil/issues/132#issuecomment-44017679
# to avoid race conditions
for proc in psutil.process_iter():
try:
if proc.name == 'VBoxHeadless':
raise CommandError('S2E uses KVM to build images. VirtualBox '
'is currently running, which is not '
'compatible with KVM. Please close all '
'VirtualBox VMs and try again')
except NoSuchProcess:
pass
def __running():
"""
Check if SafeEyes is already running.
"""
process_count = 0
for proc in psutil.process_iter():
if not proc.cmdline:
continue
try:
# Check if safeeyes is in process arguments
if callable(proc.cmdline):
# Latest psutil has cmdline function
cmd_line = proc.cmdline()
else:
# In older versions cmdline was a list object
cmd_line = proc.cmdline
if ('python3' in cmd_line[0] or 'python' in cmd_line[0]) and ('safeeyes' in cmd_line[1] or 'safeeyes' in cmd_line):
process_count += 1
if process_count > 1:
return True
# Ignore if process does not exist or does not have command line args
except (IndexError, psutil.NoSuchProcess):
pass
return False
def kill_windows_cassandra_procs():
# On Windows, forcefully terminate any leftover previously running cassandra processes. This is a temporary
# workaround until we can determine the cause of intermittent hung-open tests and file-handles.
if is_win():
try:
import psutil
for proc in psutil.process_iter():
try:
pinfo = proc.as_dict(attrs=['pid', 'name', 'cmdline'])
except psutil.NoSuchProcess:
pass
else:
if (pinfo['name'] == 'java.exe' and '-Dcassandra' in pinfo['cmdline']):
print 'Found running cassandra process with pid: ' + str(pinfo['pid']) + '. Killing.'
psutil.Process(pinfo['pid']).kill()
except ImportError:
debug("WARN: psutil not installed. Cannot detect and kill "
"running cassandra processes - you may see cascading dtest failures.")
def getRunningFuzzers():
proc_list = []
for proc in psutil.process_iter():
if any(fuzzer in s for s in proc.cmdline()):
proc_list.append(proc)
if any("crash-watch" in s for s in proc.cmdline()):
proc_list.append(proc)
if any(ROVING_CLIENT in s for s in proc.cmdline()):
proc_list.append(proc)
if any(ROVING_SERVER in s for s in proc.cmdline()):
proc_list.append(proc)
if any(FUZZDIR + "/target" in s for s in proc.cmdline()):
proc_list.append(proc)
if any(targ in s for s in proc.cmdline()):
if proc not in proc_list:
proc_list.append(proc)
# hard coded, in the future someone might use another fuzzer!
if any("afl-fuzz" in s for s in proc.cmdline()):
if proc not in proc_list:
proc_list.append(proc)
proc_list = set(proc_list) # easy way to filter duplicates ;)
proc_list = list(proc_list)
return proc_list
def pidof(pgname):
pids = []
for proc in psutil.process_iter():
# search for matches in the process name and cmdline
try:
name = proc.name()
except psutil.Error:
pass
else:
if name == pgname:
pids.append(str(proc.pid))
continue
try:
cmdline = proc.cmdline()
except psutil.Error:
pass
else:
if cmdline and cmdline[0] == pgname:
pids.append(str(proc.pid))
return pids
def main():
templ = "%-5s %-30s %-30s %-13s %-6s %s"
print(templ % (
"Proto", "Local address", "Remote address", "Status", "PID",
"Program name"))
proc_names = {}
for p in psutil.process_iter():
try:
proc_names[p.pid] = p.name()
except psutil.Error:
pass
for c in psutil.net_connections(kind='inet'):
laddr = "%s:%s" % (c.laddr)
raddr = ""
if c.raddr:
raddr = "%s:%s" % (c.raddr)
print(templ % (
proto_map[(c.family, c.type)],
laddr,
raddr or AD,
c.status,
c.pid or AD,
proc_names.get(c.pid, '?')[:15],
))
def poll(interval):
# sleep some time
time.sleep(interval)
procs = []
procs_status = {}
for p in psutil.process_iter():
try:
p.dict = p.as_dict(['username', 'nice', 'memory_info',
'memory_percent', 'cpu_percent',
'cpu_times', 'name', 'status'])
try:
procs_status[p.dict['status']] += 1
except KeyError:
procs_status[p.dict['status']] = 1
except psutil.NoSuchProcess:
pass
else:
procs.append(p)
# return processes sorted by CPU percent usage
processes = sorted(procs, key=lambda p: p.dict['cpu_percent'],
reverse=True)
return (processes, procs_status)
def kill_pid(pid, procname='', daemon=True):
'''Find a PID with optional qualifications and kill it.'''
if pid not in psutil.pids():
return False
for p in psutil.process_iter():
if p.pid != pid:
continue
if procname and p.name() != procname:
continue
if daemon and p.ppid() != 1:
continue
_kill_pid_object(p)
return True
return False
###########################################################################
# Some packages (nscd, nslcd) start their daemons after installation.
# If I'm in an ARM chroot, they live after the chroot exits. Kill them.
# In a cascade situation, sometimes the /proc/<PID>/root link is changed
# to '/some/path (deleted)'. This routine still works.
def ownhandle() :
my_regex = r".*" + re.escape(sys.argv[1]) + r".*"
regex = re.compile(my_regex, re.IGNORECASE)
for proc in psutil.process_iter():
try:
pinfo = proc.as_dict(attrs=['pid'])
except psutil.NoSuchProcess:
pass
else:
#print pinfo['pid']
try:
proci = psutil.Process(pinfo['pid'])
for files in proci.open_files() :
#print files
#handles = re.match(my_regex, files, re.IGNORECASE)
match = regex.search(str(files))
#print match
if match is not None and pinfo['pid'] not in safepids :
return(pinfo['pid'])
except :
pass