def pids_active(pids_computer):
    """
    Collect basic statistics for every PID that can still be inspected.

    :param pids_computer: iterable of process IDs to look up with psutil.
    :return: dict mapping each process name to a dict with the keys
        ``"pid"``, ``"status"``, ``"percent_cpu_used"`` and
        ``"percent_memory_used"``.  PIDs whose process no longer exists,
        is a zombie, or denies access are skipped.  Because the result is
        keyed by process name, a later PID replaces an earlier one that
        has the same name.
    """
    pid_valid = {}
    for pid in pids_computer:
        try:
            process = psutil.Process(pid)
            data = {"pid": process.pid,
                    "status": process.status(),
                    "percent_cpu_used": process.cpu_percent(interval=0.0),
                    "percent_memory_used": process.memory_percent()}
            # Read the name inside the ``try`` as well: the process can
            # exit (or deny access) between any two psutil calls.
            name = process.name()
        except (psutil.ZombieProcess, psutil.AccessDenied,
                psutil.NoSuchProcess):
            continue
        pid_valid[name] = data
    return pid_valid
# Example source code for the Python class AccessDenied()
def on_ask_user_allow_or_deny(self, evt):
    """
    Ask the user whether a suspected ransomware process should be blocked.

    Logs a highlighted report (PID, executable, command line, file), then
    prompts on stdin.  ``EventUserAllowProcess`` is fired when the answer
    contains the letter ``n`` (case-insensitive); any other answer,
    including an empty one, fires ``EventUserDenyProcess``.

    :param evt: event exposing ``process`` (a psutil process), ``pid``
        and ``path``.
    """
    # Pre-set placeholders so the report can still be printed when the
    # process details cannot be read (previously this raised NameError).
    exe = None
    cmdline = None
    try:
        exe = evt.process.exe()
        cmdline = evt.process.cmdline()
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        logger.warn('Ransomware process is caught, but the process does '
                    'not exist (PID: %d)' % evt.pid)
    # ANSI escape: switch the terminal to bright red for the report.
    logger.critical('\033[91m')
    logger.critical('*** [Crypto ransom detected] ***')
    logger.critical('[PID]: %d' % evt.process.pid)
    logger.critical('[EXE]: %r' % exe)
    logger.critical('[Command]: %r' % cmdline)
    logger.critical('[File]: %s' % evt.path)
    # ANSI escape: reset the terminal colour.
    logger.critical('********************************\033[0m')
    flush_stdin()
    yes_no = raw_input('> Block it? (Y/n) ')
    allow = 'n' in yes_no.lower()
    if allow:
        event.EventUserAllowProcess(evt.process).fire()
    else:
        event.EventUserDenyProcess(evt.process).fire()
def find_test_instances(prog):
    """
    Find running Python processes that execute the script *prog*.

    A process qualifies when its name is one of the known Python
    interpreter names, its command line has more than five items and the
    second item ends with *prog*.  The values following the ``-i``,
    ``-t`` and ``-p`` flags are read from the command line.

    :param prog: suffix that the second command-line item must end with.
    :return: dict mapping PID to a dict with the keys ``'iss'``,
        ``'tag'``, ``'port'`` and ``'since'`` (process creation time
        formatted as ``YYYY-mm-dd HH:MM:SS``).  Processes that vanish,
        deny access, or lack one of the three flags are skipped.
    """
    def _flag_value(cmd, flag):
        # Item following *flag*; raises ValueError/IndexError if absent.
        return cmd[cmd.index(flag) + 1]

    pid = {}
    for proc in psutil.process_iter():
        try:
            if proc.name() not in ('python', 'python3', 'Python', 'Python3'):
                continue
            cmd = proc.cmdline()
            if len(cmd) <= 5 or not cmd[1].endswith(prog):
                continue
            iss = _flag_value(cmd, '-i')
            tag = _flag_value(cmd, '-t')
            port = _flag_value(cmd, '-p')
            since = datetime.datetime.fromtimestamp(
                proc.create_time()).strftime("%Y-%m-%d %H:%M:%S")
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The process exited or is not ours to inspect.
            continue
        except (ValueError, IndexError):
            # A matching script started without the expected flags must
            # not abort the scan of the remaining processes.
            continue
        pid[proc.pid] = {'iss': iss, 'tag': tag, 'port': port,
                         'since': since}
    return pid
def _maybe_get_running_openvpn():
    """
    Looks for previously running openvpn instances.

    Returns the first process whose command line has an argument
    containing ``LEAPOPENVPN``, or None when there is no such process.
    Processes that deny access or disappear while being inspected are
    skipped.

    :rtype: psutil Process
    """
    for p in psutil.process_iter():
        try:
            # This needs more work, see #3268, but for the moment
            # we need to be able to filter out arguments in the form
            # --openvpn-foo, since otherwise we are shooting ourselves
            # in the feet.
            cmdline = p.cmdline()
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # NoSuchProcess also covers ZombieProcess (its subclass).
            continue
        if any("LEAPOPENVPN" in s for s in cmdline):
            return p
    return None
def pids_active(pids_computer):
    """
    Return statistics for the given PIDs that are still inspectable.

    :param pids_computer: iterable of process IDs to look up with psutil.
    :return: dict keyed by process name; each value is a dict holding
        ``"pid"``, ``"status"``, ``"percent_cpu_used"`` and
        ``"percent_memory_used"``.  A PID is left out when its process is
        gone, is a zombie, or denies access.  Two PIDs sharing a process
        name collapse into one entry (the later one wins).
    """
    pid_valid = {}
    for pid in pids_computer:
        try:
            process = psutil.Process(pid)
            data = {"pid": process.pid,
                    "status": process.status(),
                    "percent_cpu_used": process.cpu_percent(interval=0.0),
                    "percent_memory_used": process.memory_percent()}
            # name() may raise exactly like the calls above, so it has to
            # be covered by the same exception handler.
            name = process.name()
        except (psutil.ZombieProcess, psutil.AccessDenied,
                psutil.NoSuchProcess):
            continue
        pid_valid[name] = data
    return pid_valid
def on_crypto_ransom(self, evt):
    """
    React to a crypto-ransom event by suspending the offending process.

    Events for processes already in ``self.suspended`` are ignored.  A
    process whose command line is in ``self.whitelist`` is only logged;
    any other process is suspended, remembered in ``self.suspended`` and
    an ``EventAskUserAllowOrDeny`` is fired for it.

    :param evt: event exposing ``pid`` and ``path``.
    """
    def _describe(proc):
        # exe() can fail for a suspended process that has since died or
        # is not accessible; that must not break a debug log line.
        try:
            exe = proc.exe()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            exe = None
        return {'pid': proc.pid, 'exe': exe}

    logger.debug('Whitelist: %s' % json.dumps(self.whitelist, indent=4))
    logger.debug('Suspended: %s' % json.dumps(
        [_describe(p) for p in self.suspended], indent=4))
    if any(suspended.pid == evt.pid for suspended in self.suspended):
        return  # ignore captured ransom events
    try:
        p = psutil.Process(evt.pid)
        cmdline = p.cmdline()
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        logger.warn('Suspicious process %d exited before being caught'
                    % evt.pid)
        return
    if cmdline in self.whitelist:
        logger.info('Allowed white-listed process: %d' % evt.pid)
        return
    try:
        p.suspend()
    except psutil.NoSuchProcess:
        # The process exited between the lookup above and the suspend.
        logger.warn('Suspicious process %d exited before being caught'
                    % evt.pid)
        return
    self.suspended.append(p)
    event.EventAskUserAllowOrDeny(p, evt.path).fire()
def get_absolute_path(event_raw):
    '''
    Resolve the path of a raw event to an absolute, canonical path.

    Keeps a cache of processes' cwds, in case that their events might come
    after they're terminated.

    :param event_raw: mapping that may contain ``'pid'`` and ``'path'``.
    :return: the real path, or None when the event carries no path or the
        working directory of the process is neither readable nor cached.
    '''
    pid = event_raw.get('pid')
    path = event_raw.get('path')
    if path is None:
        # Nothing to resolve; joining None onto a cwd would raise.
        return None
    if path.startswith('/'):
        return os.path.realpath(path)
    logger.debug('%r' % pid_cwd)
    cwd = None
    if pid is not None:
        # psutil.Process(None) means "the current process", which would
        # silently resolve the path against our own cwd.
        try:
            cwd = psutil.Process(pid).cwd()
            pid_cwd[pid] = cwd  # cache every pid's cwd
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            cwd = pid_cwd.get(pid)
    if not cwd:
        return None
    return os.path.realpath(os.path.join(cwd, path))
def find_current_steam_game_pid():
    """
    Find the process id of the game currently being played.

    The id is taken from the value following the ``-pid`` argument on the
    command line of the first ``GameOverlayUI.exe`` process found.

    :return: the game's pid, or -1 when no overlay process is found, the
        ``-pid`` argument is missing, or its value is not an integer.
    """
    for proc in psutil.process_iter():
        try:
            if proc.name() != 'GameOverlayUI.exe':
                continue
            cmds = proc.cmdline()
        except psutil.AccessDenied:
            print("Permission error or access denied on process")
            continue
        except psutil.NoSuchProcess:
            # The process exited while the process list was being walked.
            continue
        # Only the first overlay process is inspected.  Slicing off the
        # last item guarantees that cmds[index + 1] exists.
        for index, arg in enumerate(cmds[:-1]):
            if arg == '-pid':
                try:
                    return int(cmds[index + 1])
                except ValueError:
                    break
        break
    return -1
def test_ad_on_process_creation(self):
    # Instantiating Process is supposed to work even when create_time()
    # fails with AccessDenied or ZombieProcess.
    tolerated = (psutil.AccessDenied, psutil.ZombieProcess(1))
    for error in tolerated:
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=error) as patched:
            psutil.Process()
            assert patched.called
    # Any other exception raised by create_time() has to propagate.
    with mock.patch.object(psutil.Process, 'create_time',
                           side_effect=ValueError) as patched:
        with self.assertRaises(ValueError):
            psutil.Process()
        assert patched.called
def test_special_pid(self):
    # PID 4 is expected to be named 'System'.
    system_proc = psutil.Process(4)
    self.assertEqual(system_proc.name(), 'System')
    # __str__ accesses all common Process properties; calling it checks
    # that nothing strange happens.
    str(system_proc)
    system_proc.username()
    self.assertTrue(system_proc.create_time() >= 0.0)
    try:
        rss, vms = system_proc.memory_info()[:2]
    except psutil.AccessDenied:
        # expected on Windows Vista and Windows 7
        node = platform.uname()[1]
        if node not in ('vista', 'win-7', 'win7'):
            raise
    else:
        self.assertTrue(rss > 0)
def test_num_threads(self):
    # On some platforms (e.g. Linux) the exact thread number could be
    # tested because there is always 1 thread per process, but that does
    # not hold everywhere (OSX, Windows), so only the delta is checked.
    proc = psutil.Process()
    try:
        before = proc.num_threads()
    except psutil.AccessDenied:
        if not OPENBSD:
            raise
        raise unittest.SkipTest("on OpenBSD this requires root access")
    worker = ThreadTask()
    worker.start()
    try:
        after = proc.num_threads()
        self.assertEqual(after, before + 1)
        worker.stop()
    finally:
        # Stop the helper thread only if the try body did not get to it.
        if worker._running:
            worker.stop()
def test_children_duplicates(self):
    # Count, for every parent pid, how many processes report it.
    child_count = collections.defaultdict(int)
    for proc in psutil.process_iter():
        try:
            child_count[proc.ppid()] += 1
        except psutil.Error:
            pass
    # Pick the parent with the highest count and make sure its children
    # list holds no duplicates.
    ranked = sorted(child_count.items(), key=lambda item: item[1])
    busiest_pid = ranked[-1][0]
    parent = psutil.Process(busiest_pid)
    try:
        children = parent.children(recursive=True)
    except psutil.AccessDenied:  # windows
        pass
    else:
        self.assertEqual(len(children), len(set(children)))
def skip_on_access_denied(only_if=None):
    """Decorator turning AccessDenied into a skipped test.

    When the wrapped function raises psutil.AccessDenied, the exception
    is re-raised if *only_if* is given (not None) and falsy; otherwise
    unittest.SkipTest is raised instead.
    """
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            except psutil.AccessDenied:
                if only_if is not None and not only_if:
                    raise
                raise unittest.SkipTest(
                    "%r was skipped because it raised AccessDenied"
                    % fun.__name__)
        return wrapper
    return decorator
def update(self):
    """
    Update the list of BrewPi processes by receiving them from the system with psutil.

    A process matches when its name contains ``python`` and one of its
    command-line arguments contains ``brewpi.py``.  Each match is passed
    to ``self.parseProcess`` and kept when that returns a truthy value.
    The result is stored in ``self.list``.

    Returns: list of BrewPiProcess objects
    """
    matching = []
    for p in psutil.process_iter():
        # some OS's (OS X) do not allow processes to read info from other
        # processes.  Guard each process separately so that a single
        # inaccessible or vanished one does not discard the whole scan.
        try:
            if 'python' in p.name() and any(
                    'brewpi.py' in s for s in p.cmdline()):
                matching.append(p)
        except (psutil.AccessDenied, psutil.ZombieProcess,
                psutil.NoSuchProcess):
            continue
    bpList = []
    for p in matching:
        bp = self.parseProcess(p)
        if bp:
            bpList.append(bp)
    self.list = bpList
    return self.list
def set_highest_priority():
try:
import psutil
except ImportError:
return
proc = psutil.Process()
if not hasattr(proc, 'nice'):
return
# Want to set realtime on Windows.
# Fail hard for anything else right now, so it is obvious what to fix
# when adding other OS support.
try:
proc.nice(psutil.REALTIME_PRIORITY_CLASS)
return True
except psutil.AccessDenied:
pass
def closeDispyScheduler():
    ''' Close the Dispy Scheduler

    If the module-level ``popen`` handle is set, that process is
    terminated, waited for, and the handle is reset to None.  Otherwise
    every process with a command-line argument matching
    ``dispyscheduler.py`` is sent SIGTERM.
    '''
    global popen
    if popen is not None:
        popen.terminate()
        popen.wait()
        popen = None
        return
    for proc in psutil.process_iter():
        try:
            cmdline = proc.cmdline()
        except (PermissionError, psutil.AccessDenied, psutil.NoSuchProcess):
            continue
        # any() signals each matching process once, even when several of
        # its arguments mention the scheduler script.
        if any(re.search('dispyscheduler.py', arg) for arg in cmdline):
            try:
                proc.send_signal(psutil.signal.SIGTERM)
            except psutil.NoSuchProcess:
                # Already gone, which is the desired end state.
                pass
def KillAllAdb():
    """Kill every process whose name contains 'adb'.

    SIGTERM, SIGQUIT and SIGKILL are sent in turn to all matching
    processes.  Processes that still match afterwards are reported with
    logging.error.  Processes that vanish or deny access are skipped.
    """
    def _Value(attr):
        # psutil < 2.0 exposed name/cmdline as properties; later versions
        # made them methods.  Accept both forms.
        return attr() if callable(attr) else attr

    def GetAllAdb():
        for p in psutil.process_iter():
            try:
                if 'adb' in _Value(p.name):
                    yield p
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass

    for sig in [signal.SIGTERM, signal.SIGQUIT, signal.SIGKILL]:
        for p in GetAllAdb():
            try:
                logging.info('kill %d %d (%s [%s])', sig, p.pid,
                             _Value(p.name), ' '.join(_Value(p.cmdline)))
                p.send_signal(sig)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
    for p in GetAllAdb():
        try:
            logging.error('Unable to kill %d (%s [%s])', p.pid,
                          _Value(p.name), ' '.join(_Value(p.cmdline)))
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass
def test_ad_on_process_creation(self):
    # Process() is supposed to be constructible also for zombie processes
    # or when access is denied while create_time() is read.
    for benign in (psutil.AccessDenied, psutil.ZombieProcess(1)):
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=benign) as fake_ctime:
            psutil.Process()
            assert fake_ctime.called
    # Unrelated errors must not be swallowed by the constructor.
    with mock.patch.object(psutil.Process, 'create_time',
                           side_effect=ValueError) as fake_ctime:
        with self.assertRaises(ValueError):
            psutil.Process()
        assert fake_ctime.called
def test_special_pid(self):
    # The process with PID 4 is expected to be called 'System'.
    sysproc = psutil.Process(4)
    self.assertEqual(sysproc.name(), 'System')
    # str() goes through __str__, which reads all common Process
    # properties; this checks that nothing strange happens.
    str(sysproc)
    sysproc.username()
    self.assertTrue(sysproc.create_time() >= 0.0)
    try:
        rss, vms = sysproc.memory_info()[:2]
    except psutil.AccessDenied:
        # expected on Windows Vista and Windows 7
        hostname = platform.uname()[1]
        if hostname not in ('vista', 'win-7', 'win7'):
            raise
    else:
        self.assertTrue(rss > 0)
def test_num_threads(self):
    # An exact thread count could be asserted on platforms such as Linux
    # (always 1 thread per process), but not on all platforms (OSX,
    # Windows); therefore only the increase by one is verified.
    proc = psutil.Process()
    try:
        before = proc.num_threads()
    except psutil.AccessDenied:
        if not OPENBSD:
            raise
        raise unittest.SkipTest("on OpenBSD this requires root access")
    worker = ThreadTask()
    worker.start()
    try:
        after = proc.num_threads()
        self.assertEqual(after, before + 1)
    finally:
        worker.stop()
def test_threads(self):
    # Starting one extra thread must add exactly one entry to threads().
    proc = psutil.Process()
    try:
        before = proc.threads()
    except psutil.AccessDenied:
        if not OPENBSD:
            raise
        raise unittest.SkipTest("on OpenBSD this requires root access")
    worker = ThreadTask()
    worker.start()
    try:
        after = proc.threads()
        self.assertEqual(len(after), len(before) + 1)
        first = after[0]
        # on Linux, first thread id is supposed to be this process
        if LINUX:
            self.assertEqual(first.id, os.getpid())
        # test named tuple
        self.assertEqual(first.id, first[0])
        self.assertEqual(first.user_time, first[1])
        self.assertEqual(first.system_time, first[2])
    finally:
        worker.stop()
def test_children_duplicates(self):
    # Tally how many processes name each pid as their parent.
    tally = collections.defaultdict(int)
    for proc in psutil.process_iter():
        try:
            tally[proc.ppid()] += 1
        except psutil.Error:
            pass
    # The pid with the highest tally is the one to check for duplicates.
    by_count = sorted(tally.items(), key=lambda pair: pair[1])
    top_pid = by_count[-1][0]
    top_proc = psutil.Process(top_pid)
    try:
        kids = top_proc.children(recursive=True)
    except psutil.AccessDenied:  # windows
        pass
    else:
        self.assertEqual(len(kids), len(set(kids)))
def test_process_iter(self):
    def listed_pids():
        # PIDs currently yielded by process_iter().
        return [x.pid for x in psutil.process_iter()]

    self.assertIn(os.getpid(), listed_pids())
    sproc = get_test_subprocess()
    self.assertIn(sproc.pid, listed_pids())
    child = psutil.Process(sproc.pid)
    child.kill()
    child.wait()
    self.assertNotIn(sproc.pid, listed_pids())
    # NoSuchProcess raised by Process() yields an empty iteration...
    gone = psutil.NoSuchProcess(os.getpid())
    with mock.patch('psutil.Process', side_effect=gone):
        self.assertEqual(list(psutil.process_iter()), [])
    # ...whereas AccessDenied is propagated to the caller.
    denied = psutil.AccessDenied(os.getpid())
    with mock.patch('psutil.Process', side_effect=denied):
        with self.assertRaises(psutil.AccessDenied):
            list(psutil.process_iter())
def skip_on_access_denied(only_if=None):
    """Decorator converting AccessDenied into unittest.SkipTest.

    If the decorated callable raises psutil.AccessDenied and *only_if* is
    not None but falsy, the original exception is re-raised; in every
    other case the test is reported as skipped.
    """
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            except psutil.AccessDenied:
                must_reraise = only_if is not None and not only_if
                if must_reraise:
                    raise
                reason = "%r was skipped because it raised AccessDenied" \
                    % fun.__name__
                raise unittest.SkipTest(reason)
        return wrapper
    return decorator
def test_ad_on_process_creation(self):
    # Creating a Process object is supposed to succeed even if reading
    # create_time() hits AccessDenied or a zombie process.
    harmless_errors = [psutil.AccessDenied, psutil.ZombieProcess(1)]
    for side_effect in harmless_errors:
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=side_effect) as ctime_mock:
            psutil.Process()
            assert ctime_mock.called
    # A ValueError from create_time() is not tolerated and must surface.
    with mock.patch.object(psutil.Process, 'create_time',
                           side_effect=ValueError) as ctime_mock:
        with self.assertRaises(ValueError):
            psutil.Process()
        assert ctime_mock.called
def test_special_pid(self):
    # PID 4 should resolve to the process named 'System'.
    proc4 = psutil.Process(4)
    self.assertEqual(proc4.name(), 'System')
    # Converting to str uses __str__, which accesses all common Process
    # properties, to check that nothing strange happens.
    str(proc4)
    proc4.username()
    self.assertTrue(proc4.create_time() >= 0.0)
    try:
        rss, vms = proc4.memory_info()[:2]
    except psutil.AccessDenied:
        # expected on Windows Vista and Windows 7
        known_hosts = ('vista', 'win-7', 'win7')
        if platform.uname()[1] not in known_hosts:
            raise
    else:
        self.assertTrue(rss > 0)
def test_num_threads(self):
    # Platforms such as Linux always have 1 thread per process, so the
    # exact number could be tested there; since that is not true across
    # all platforms (OSX, Windows) only the difference is compared.
    this_proc = psutil.Process()
    try:
        count_before = this_proc.num_threads()
    except psutil.AccessDenied:
        if not OPENBSD:
            raise
        raise unittest.SkipTest("on OpenBSD this requires root access")
    helper = ThreadTask()
    helper.start()
    try:
        count_after = this_proc.num_threads()
        self.assertEqual(count_after, count_before + 1)
    finally:
        helper.stop()
def test_threads(self):
    # threads() must report one more entry once a helper thread runs.
    this_proc = psutil.Process()
    try:
        threads_before = this_proc.threads()
    except psutil.AccessDenied:
        if not OPENBSD:
            raise
        raise unittest.SkipTest("on OpenBSD this requires root access")
    helper = ThreadTask()
    helper.start()
    try:
        threads_after = this_proc.threads()
        self.assertEqual(len(threads_after), len(threads_before) + 1)
        head = threads_after[0]
        # on Linux, first thread id is supposed to be this process
        if LINUX:
            self.assertEqual(head.id, os.getpid())
        # test named tuple
        self.assertEqual(head.id, head[0])
        self.assertEqual(head.user_time, head[1])
        self.assertEqual(head.system_time, head[2])
    finally:
        helper.stop()
def test_children_duplicates(self):
    # Build a parent-pid -> number-of-children table.
    children_per_parent = collections.defaultdict(int)
    for candidate in psutil.process_iter():
        try:
            children_per_parent[candidate.ppid()] += 1
        except psutil.Error:
            pass
    # Take the parent with the most children and verify that its
    # recursive children list has no duplicates.
    ordered = sorted(children_per_parent.items(),
                     key=lambda entry: entry[1])
    parent_pid = ordered[-1][0]
    parent = psutil.Process(parent_pid)
    try:
        descendants = parent.children(recursive=True)
    except psutil.AccessDenied:  # windows
        pass
    else:
        self.assertEqual(len(descendants), len(set(descendants)))
def test_process_iter(self):
    def current_pids():
        # Snapshot of the PIDs reported by process_iter().
        return [x.pid for x in psutil.process_iter()]

    self.assertIn(os.getpid(), current_pids())
    sproc = get_test_subprocess()
    self.assertIn(sproc.pid, current_pids())
    spawned = psutil.Process(sproc.pid)
    spawned.kill()
    spawned.wait()
    self.assertNotIn(sproc.pid, current_pids())
    # With Process() raising NoSuchProcess the iteration comes out empty.
    no_such = psutil.NoSuchProcess(os.getpid())
    with mock.patch('psutil.Process', side_effect=no_such):
        self.assertEqual(list(psutil.process_iter()), [])
    # With Process() raising AccessDenied the error reaches the caller.
    access_denied = psutil.AccessDenied(os.getpid())
    with mock.patch('psutil.Process', side_effect=access_denied):
        with self.assertRaises(psutil.AccessDenied):
            list(psutil.process_iter())