def pids_active(pids_computer):
"""
This function find pids of computer and return the valid.
"""
pid_valid = {}
for pid in pids_computer:
data = None
try:
process = psutil.Process(pid)
data = {"pid": process.pid,
"status": process.status(),
"percent_cpu_used": process.cpu_percent(interval=0.0),
"percent_memory_used": process.memory_percent()}
except (psutil.ZombieProcess, psutil.AccessDenied, psutil.NoSuchProcess):
data = None
if data is not None:
pid_valid[process.name()] = data
return pid_valid
python类pids()的实例源码
def test_procfs_path(self):
tdir = tempfile.mkdtemp()
try:
psutil.PROCFS_PATH = tdir
self.assertRaises(IOError, psutil.virtual_memory)
self.assertRaises(IOError, psutil.cpu_times)
self.assertRaises(IOError, psutil.cpu_times, percpu=True)
self.assertRaises(IOError, psutil.boot_time)
# self.assertRaises(IOError, psutil.pids)
self.assertRaises(IOError, psutil.net_connections)
self.assertRaises(IOError, psutil.net_io_counters)
self.assertRaises(IOError, psutil.net_if_stats)
self.assertRaises(IOError, psutil.disk_io_counters)
self.assertRaises(IOError, psutil.disk_partitions)
self.assertRaises(psutil.NoSuchProcess, psutil.Process)
finally:
psutil.PROCFS_PATH = "/proc"
os.rmdir(tdir)
def getSidToken(token_sid):
# trying to get system privileges
if token_sid == "S-1-5-18":
sids = ListSids()
for sid in sids:
if "winlogon" in sid[1].lower():
hToken = gethTokenFromPid(sid[0])
if hToken:
print "\t[+] Using PID: " + str(sid[0])
return hToken
else:
return None
# trying to impersonate a token
else:
pids = [int(x) for x in psutil.pids() if int(x)>4]
for pid in pids:
hToken = gethTokenFromPid(pid)
if hToken:
if GetTokenSid( hToken ) == token_sid:
print "\t[+] Using PID: " + str(pid)
return hToken
def run_module(self):
# To access user provided attributes, use self.options dictionary
Thread(target=self.monitoring).start()
print "\n[*] STARTING PROGRAMS EXECUTION IN 5 SECONDS...\n"
time.sleep(5)
for b in self.binaries():
self.print_info(" [+] Executing %s ..." % b)
previous_pid = psutil.pids()
self.execute(b)
time.sleep(int(self.args["sleep_time"]))
new_pid = psutil.pids()
print " [-] Killing the process"
self.kill(b.split('.')[0], previous_pid, new_pid)
print "\n[*] PLEASE CLOSE PROCMON PROCESS\n"
self.parsing_results()
print "\n[*] RESULTS PARSED TO XML\n"
def pids_active(pids_computer):
"""
This function find pids of computer and return the valid.
"""
pid_valid = {}
for pid in pids_computer:
data = None
try:
process = psutil.Process(pid)
data = {"pid": process.pid,
"status": process.status(),
"percent_cpu_used": process.cpu_percent(interval=0.0),
"percent_memory_used": process.memory_percent()}
except (psutil.ZombieProcess, psutil.AccessDenied, psutil.NoSuchProcess):
data = None
if data is not None:
pid_valid[process.name()] = data
return pid_valid
def ReStart(groupname):
'''??mysqlrouter'''
restart_stat = None
pids = psutil.pids()
for pid in pids:
p = psutil.Process(pid)
cmdline = p.cmdline()
if groupname+'.conf' in cmdline:
try:
os.kill(pid, 9)
os.popen('cd /etc/mysqlrouter;nohup mysqlrouter -c mysqlrouter.conf -a %s.conf &' % groupname)
return True
except Exception,e:
logging.error(traceback.format_exc())
return False
restart_stat = True
break
if restart_stat is None:
os.popen('cd /etc/mysqlrouter;nohup mysqlrouter -c mysqlrouter.conf -a %s.conf &' % groupname)
def getSidToken(token_sid):
# trying to get system privileges
if token_sid == "S-1-5-18":
sids = ListSids()
for sid in sids:
if "winlogon" in sid[1].lower():
hToken = gethTokenFromPid(sid[0])
if hToken:
print "\t[+] Using PID: " + str(sid[0])
return hToken
else:
return None
# trying to impersonate a token
else:
pids = [int(x) for x in psutil.pids() if int(x)>4]
for pid in pids:
hToken = gethTokenFromPid(pid)
if hToken:
if GetTokenSid( hToken ) == token_sid:
print "\t[+] Using PID: " + str(pid)
return hToken
def test_exe_mocked(self):
with mock.patch('psutil._pslinux.os.readlink',
side_effect=OSError(errno.ENOENT, "")) as m:
# No such file error; might be raised also if /proc/pid/exe
# path actually exists for system processes with low pids
# (about 0-20). In this case psutil is supposed to return
# an empty string.
ret = psutil.Process().exe()
assert m.called
self.assertEqual(ret, "")
# ...but if /proc/pid no longer exist we're supposed to treat
# it as an alias for zombie process
with mock.patch('psutil._pslinux.os.path.lexists',
return_value=False):
self.assertRaises(psutil.ZombieProcess, psutil.Process().exe)
def POST(self):
cpuused = psutil.cpu_percent(interval=None, percpu=False)
memused = psutil.virtual_memory()[2]
diskused = psutil.disk_usage('/')[3]
pidcount = len(psutil.pids())
uptimeshell = subprocess.Popen(['uptime'], stderr=subprocess.PIPE,stdout=subprocess.PIPE)
uptimeshell.wait()
uptime = uptimeshell.communicate()
return json.dumps(
{
"code": 0,
"current": {
"cpuused": cpuused,
"memused": memused,
"diskused": diskused,
"pidcount": pidcount,
"uptime": uptime
}
}
)
def test_procfs_path(self):
tdir = tempfile.mkdtemp()
try:
psutil.PROCFS_PATH = tdir
self.assertRaises(IOError, psutil.virtual_memory)
self.assertRaises(IOError, psutil.cpu_times)
self.assertRaises(IOError, psutil.cpu_times, percpu=True)
self.assertRaises(IOError, psutil.boot_time)
# self.assertRaises(IOError, psutil.pids)
self.assertRaises(IOError, psutil.net_connections)
self.assertRaises(IOError, psutil.net_io_counters)
self.assertRaises(IOError, psutil.net_if_stats)
self.assertRaises(IOError, psutil.disk_io_counters)
self.assertRaises(IOError, psutil.disk_partitions)
self.assertRaises(psutil.NoSuchProcess, psutil.Process)
finally:
psutil.PROCFS_PATH = "/proc"
os.rmdir(tdir)
def test_issue_687(self):
# In case of thread ID:
# - pid_exists() is supposed to return False
# - Process(tid) is supposed to work
# - pids() should not return the TID
# See: https://github.com/giampaolo/psutil/issues/687
t = ThreadTask()
t.start()
try:
p = psutil.Process()
tid = p.threads()[1].id
assert not psutil.pid_exists(tid), tid
pt = psutil.Process(tid)
pt.as_dict()
self.assertNotIn(tid, psutil.pids())
finally:
t.stop()
def test_exe_mocked(self):
with mock.patch('psutil._pslinux.os.readlink',
side_effect=OSError(errno.ENOENT, "")) as m:
# No such file error; might be raised also if /proc/pid/exe
# path actually exists for system processes with low pids
# (about 0-20). In this case psutil is supposed to return
# an empty string.
ret = psutil.Process().exe()
assert m.called
self.assertEqual(ret, "")
# ...but if /proc/pid no longer exist we're supposed to treat
# it as an alias for zombie process
with mock.patch('psutil._pslinux.os.path.lexists',
return_value=False):
self.assertRaises(psutil.ZombieProcess, psutil.Process().exe)
def test_procfs_path(self):
tdir = tempfile.mkdtemp()
try:
psutil.PROCFS_PATH = tdir
self.assertRaises(IOError, psutil.virtual_memory)
self.assertRaises(IOError, psutil.cpu_times)
self.assertRaises(IOError, psutil.cpu_times, percpu=True)
self.assertRaises(IOError, psutil.boot_time)
# self.assertRaises(IOError, psutil.pids)
self.assertRaises(IOError, psutil.net_connections)
self.assertRaises(IOError, psutil.net_io_counters)
self.assertRaises(IOError, psutil.net_if_stats)
self.assertRaises(IOError, psutil.disk_io_counters)
self.assertRaises(IOError, psutil.disk_partitions)
self.assertRaises(psutil.NoSuchProcess, psutil.Process)
finally:
psutil.PROCFS_PATH = "/proc"
os.rmdir(tdir)
def test_issue_687(self):
# In case of thread ID:
# - pid_exists() is supposed to return False
# - Process(tid) is supposed to work
# - pids() should not return the TID
# See: https://github.com/giampaolo/psutil/issues/687
t = ThreadTask()
t.start()
try:
p = psutil.Process()
tid = p.threads()[1].id
assert not psutil.pid_exists(tid), tid
pt = psutil.Process(tid)
pt.as_dict()
self.assertNotIn(tid, psutil.pids())
finally:
t.stop()
# =====================================================================
# --- sensors
# =====================================================================
def test_exe_mocked(self):
with mock.patch('psutil._pslinux.os.readlink',
side_effect=OSError(errno.ENOENT, "")) as m:
# No such file error; might be raised also if /proc/pid/exe
# path actually exists for system processes with low pids
# (about 0-20). In this case psutil is supposed to return
# an empty string.
ret = psutil.Process().exe()
assert m.called
self.assertEqual(ret, "")
# ...but if /proc/pid no longer exist we're supposed to treat
# it as an alias for zombie process
with mock.patch('psutil._pslinux.os.path.lexists',
return_value=False):
self.assertRaises(psutil.ZombieProcess, psutil.Process().exe)
def kill_pid(pid, procname='', daemon=True):
'''Find a PID with optional qualifications and kill it.'''
if pid not in psutil.pids():
return False
for p in psutil.process_iter():
if p.pid != pid:
continue
if procname and p.name() != procname:
continue
if daemon and p.ppid() != 1:
continue
_kill_pid_object(p)
return True
return False
###########################################################################
# Some packages (nscd, nslcd) start their daemons after installation.
# If I'm in an ARM chroot, they live after the chroot exits. Kill them.
# In a cascade situation, sometimes the /proc/<PID>/root link is changed
# to '/some/path (deleted)'. This routine still works.
def last_process_created(self, prev_pids, new_pids):
pids = []
for p in new_pids:
if p not in prev_pids:
pids.append(p)
return pids
def last_process_created(self, prev_pids, new_pids):
pids = []
for p in new_pids:
if p not in prev_pids:
pids.append(p)
return pids
def is_cmd_open(self, prev_pids):
created_pids = []
for pid in psutil.pids():
if pid not in prev_pids:
created_pids.append(pid)
try:
return 'cmd.exe' in [psutil.Process(p).name() for p in created_pids]
except:
pass
def handle_dll_local(self, subpath, binary):
path = subpath + "\\x86_microsoft.windows.common-controls_6595b64144ccf1df_6.0.15063.0_none_583b8639f462029f\\"
try:
try:
subprocess.check_call(
["powershell", "-C", "rm", "-r", "-Force", subpath, "-erroraction", "'silentlycontinue'"])
except:
pass
print "[+] Creating: " + path
subprocess.check_call(
["powershell", "-C", "mkdir", path, ">", "$null"])
print "[+] Copying the malicious dll to the path"
subprocess.check_call(
["powershell", "-C", "cp", self.args["malicious_dll"], path])
prev_pids = psutil.pids()
print "[*] Executing the binary"
subprocess.check_call(["powershell", "-C", binary])
time.sleep(1)
if self.is_cmd_open(prev_pids):
print colored("[*] THIS BINARY IS VULNERABLE TO DLL HIJACKING UAC BYPASS!", 'cyan', attrs=['bold'])
if binary not in self._results["vulnerables"]:
self._results["vulnerables"].append(binary)
else:
if binary not in self._results['sospechosos']:
self._results['sospechosos'].append(binary)
new_pids = psutil.pids()
self.kill(binary, prev_pids, new_pids)
print "[-] Deleting the path and cleaning up\n"
subprocess.check_call(
["powershell", "-C", "rm", "-r", "-Force", subpath])
except subprocess.CalledProcessError as error:
print "ERROR: COPYING THE FILE"
def is_cmd_open(self, prev_pids):
created_pids = []
for pid in psutil.pids():
if pid not in prev_pids:
created_pids.append(pid)
try:
return 'cmd.exe' in [psutil.Process(p).name() for p in created_pids]
except:
pass
def handle_dll_local(self, subpath, binary, clean):
path = subpath + "\\x86_microsoft.windows.common-controls_6595b64144ccf1df_6.0.15063.0_none_583b8639f462029f\\"
try:
print "[+] Creating: " + path
subprocess.check_call(
["powershell", "-C", "mkdir", path, ">", "$null"])
print "[+] Copying the malicious dll to the path"
subprocess.check_call(
["powershell", "-C", "cp", self.args["malicious_dll"], path])
prev_pids = psutil.pids()
print "[*] Executing the binary"
subprocess.check_call(["powershell", "-C", binary])
except subprocess.CalledProcessError as error:
self.print_ko(str(error) + "\n")
def clean_path(self, subpath, binary):
print "[-] Deleting the path and cleaning up\n"
for pid in psutil.pids():
if binary in psutil.Process(pid).name():
try:
print "Killing the process..."
subprocess.check_call(["taskkill", "/t", "/f", "/pid", str(pid)])
except subprocess.CalledProcessError:
self.print_ko("[!!] The process %s can't be killed" % binary)
subprocess.check_call(
["powershell", "-C", "rm", "-r", "-Force", subpath])
def find(prefix, suffixes=None, local_bin=False):
current_pid = os.getpid()
for pid in psutil.pids():
if pid == current_pid or not psutil.pid_exists(pid):
continue
process = psutil.Process(pid)
if process.name() == prefix:
cmd = process.cmdline()
if local_bin:
check = False
for word in cmd:
if '/usr/local/bin' in word:
check = True
if not check:
continue
if suffixes is not None:
check = False
for word in cmd:
if word in suffixes:
check = True
if not check:
continue
log.warning('Already existing') # : %s' % cmd)
return True
return False
def process_send_data(socket, context):
"""
Send all memory, cpu, disk, network data of computer to server(master node)
"""
while True:
try:
cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
memory_info = psutil.virtual_memory()
disk_info = psutil.disk_usage('/')
pids_computer = psutil.pids()
info_to_send = json.dumps({
"computer_utc_clock": str(datetime.datetime.utcnow()),
"computer_clock": str(datetime.datetime.now()),
"hostname": platform.node(),
"mac_address": mac_address(),
"ipv4_interfaces": internet_addresses(),
"cpu": {
"percent_used": cpu_percent
},
"memory": {
"total_bytes": memory_info.total,
"total_bytes_used": memory_info.used,
"percent_used": memory_info.percent
},
"disk": {
"total_bytes": disk_info.total,
"total_bytes_used": disk_info.used,
"total_bytes_free": disk_info.free,
"percent_used": disk_info.percent
},
"process": pids_active(pids_computer)
}).encode()
#send json data in the channel 'status', although is not necessary to send.
socket.send_multipart([b"status", info_to_send])
#time.sleep(0.500)
except (KeyboardInterrupt, SystemExit):
socket.close()
context.term()
def process_send_data(socket, context):
"""
Send all memory, cpu, disk, network data of computer to server(master node)
"""
while True:
try:
cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
memory_info = psutil.virtual_memory()
disk_info = psutil.disk_usage('/')
pids_computer = psutil.pids()
info_to_send = json.dumps({
"computer_utc_clock": str(datetime.datetime.utcnow()),
"computer_clock": str(datetime.datetime.now()),
"hostname": platform.node(),
"mac_address": mac_address(),
"ipv4_interfaces": internet_addresses(),
"cpu": {
"percent_used": cpu_percent
},
"memory": {
"total_bytes": memory_info.total,
"total_bytes_used": memory_info.used,
"percent_used": memory_info.percent
},
"disk": {
"total_bytes": disk_info.total,
"total_bytes_used": disk_info.used,
"total_bytes_free": disk_info.free,
"percent_used": disk_info.percent
},
"process": pids_active(pids_computer)
}).encode()
#send json data in the channel 'status', although is not necessary to send.
socket.send_multipart([b"status", info_to_send])
#time.sleep(0.500)
except (KeyboardInterrupt, SystemExit):
socket.close()
context.term()
def lock_or_exit(self, lock_filename, message="process %s is already running"):
pid = psutil.Process().pid
if os.path.exists(lock_filename):
with open(lock_filename) as lockfile:
old_pid = int(lockfile.read())
if old_pid in psutil.pids():
logging.info(message % old_pid)
sys.exit(1)
else:
os.unlink(lock_filename)
def running_emacs_count(self):
"""Count how many instances of Emacs are currently open"""
try:
import psutil
return [psutil.Process(p).name() for p in psutil.pids()].count('emacs')
except ImportError:
Logger.warning('Cannot determine how many instances of Emacs are running. This may cause sync issues')
return 0
def test_wait_for_pid(self):
wait_for_pid(os.getpid())
nopid = max(psutil.pids()) + 99999
with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid)
def test_pids(self):
# Note: this test might fail if the OS is starting/killing
# other processes in the meantime
w = wmi.WMI().Win32_Process()
wmi_pids = set([x.ProcessId for x in w])
psutil_pids = set(psutil.pids())
self.assertEqual(wmi_pids, psutil_pids)