def skip_on_access_denied(only_if=None):
    """Build a decorator that converts psutil.AccessDenied into a skipped test.

    With ``only_if=None`` (the default) the conversion always happens.
    With any other value the conversion happens only when ``only_if`` is
    truthy; a falsy value lets the original AccessDenied propagate.
    """
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            except psutil.AccessDenied:
                # An explicit falsy condition means "do not skip".
                must_propagate = only_if is not None and not only_if
                if must_propagate:
                    raise
                raise unittest.SkipTest(
                    "%r was skipped because it raised AccessDenied"
                    % fun.__name__)
        return wrapper
    return decorator
# Example source code for the Python class AccessDenied()
def kill_proc(procname, _killcount=0):
    """Terminate running processes whose name equals *procname*.

    Each matching process is sent ``terminate()`` and then waited on for up
    to 3 seconds. If the wait times out, the whole scan is retried; after
    the third timed-out attempt an error is printed and the scan stops.

    Parameters:
        procname (string): exact process name to look for
        _killcount (int): internal retry counter, callers should not pass it

    Return:
        None. Progress and errors are reported with print.
    """
    done = False
    for proc in psutil.process_iter():
        try:
            if proc.name() != procname:
                continue
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The name cannot be read (process gone or protected), so it
            # cannot be matched against procname; move on.
            continue
        try:
            proc.terminate()
            proc.wait(timeout=3)
            done = True
        except psutil.AccessDenied:
            print("Error: Access Denied to terminate %s" % procname)
        except psutil.NoSuchProcess:
            # Already exited on its own; nothing left to do for it.
            pass
        except psutil.TimeoutExpired:
            if _killcount >= 2:
                print("Error: Terminating of %s failed! (tried 3 times)" % procname)
            else:
                print("Error: Terminating of %s took to long.. retrying" % procname)
                # The retry count travels with the recursion instead of
                # relying on an external mutable counter.
                kill_proc(procname, _killcount + 1)
            # The retry (or the final failure) supersedes this scan.
            break
    if done:
        print("%s terminated!" % procname)
def all_running_processes_for_case(self, _filter):
    """List running processes whose working directory contains *_filter*.

    Each entry is a dict with two keys: ``"run"`` (working directory,
    a space, then the process name) and ``"cmd"`` (the command line as
    returned by psutil). Processes that raise AccessDenied or
    NoSuchProcess while being inspected are left out.
    """
    matches = []
    for proc in psutil.process_iter():
        try:
            if not psutil.pid_exists(proc.pid):
                continue
            if _filter not in proc.cwd():
                continue
            matches.append({
                "run": proc.cwd() + " " + proc.name(),
                "cmd": proc.cmdline(),
            })
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            continue
    return matches
##
# Function to kill all running processes for one case
# @param _filter search filter <string>
# @retval list of all killed processes <list>
def kill_all_running_processes_for_case(self, _filter):
    """Kill every process whose working directory contains *_filter*.

    Each match is recorded as ``{"run": "<cwd> <name>"}`` and then handed
    to ``self.kill_proc_tree``. Processes that raise AccessDenied or
    NoSuchProcess along the way are skipped. Returns the list of recorded
    entries.
    """
    killed = []
    for proc in psutil.process_iter():
        try:
            if not psutil.pid_exists(proc.pid):
                continue
            if _filter not in proc.cwd():
                continue
            # Record first, then kill: the entry stays in the list even if
            # the kill itself is denied.
            killed.append({"run": proc.cwd() + " " + proc.name()})
            self.kill_proc_tree(proc.pid)
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            continue
    return killed
##
# Function to retrieve all processes for one case (running and completed)
# @param _case case <string>
# @retval list of all processes <list>
def find_test_instance(iss, tag):
    """Find a running process started with ``-i <iss>`` and ``-t <tag>``.

    Parameters:
        iss: value expected right after the ``-i`` argument
        tag: value expected right after the ``-t`` argument

    Return:
        the first matching psutil process object, or None when no process
        matches. Processes with fewer than 5 command-line arguments, and
        processes that cannot be inspected, are ignored.
    """
    wanted = {"-i": iss, "-t": tag}
    for proc in psutil.process_iter():
        try:
            cmd = proc.cmdline()
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # Not readable, or it exited while we were iterating.
            continue
        if len(cmd) < 5:
            continue
        for option, expected in wanted.items():
            try:
                pos = cmd.index(option)
            except ValueError:
                break
            # An option given as the very last argument has no value to
            # compare; treat it as a mismatch instead of indexing past the
            # end of the list.
            if pos + 1 >= len(cmd) or cmd[pos + 1] != expected:
                break
        else:
            # Every wanted option was present with the expected value.
            return proc
    return None
def get_free_port(address, initial_port):
    """Iterate over TCP ports that look unused on a given interface.

    Not suitable for mission-critical use: another program may grab a
    port between the moment it is yielded here and the moment the caller
    binds it.

    Parameters:
        address (string): IP address of the interface to check
        initial_port (int): first port number to try

    Return:
        an iterator yielding, in increasing order from initial_port, ports
        with no connection bound to (address, port). If the connection
        table cannot be read (AccessDenied), a plain counter starting at
        initial_port is returned instead.
    """
    try:
        # On OSX this function requires root privileges
        psutil.net_connections()
    except psutil.AccessDenied:
        return count(initial_port)

    def _is_taken(candidate):
        # Re-read the connection table for every candidate port.
        for conn in psutil.net_connections():
            if not hasattr(conn, 'laddr'):
                continue
            if conn.laddr[0] == address and conn.laddr[1] == candidate:
                return True
        return False

    # Lazy: ports are only probed as the caller pulls them.
    return (port for port in count(initial_port) if not _is_taken(port))
def _get_ports_in_use(cls):
    """
    Return the set of local ports taken by current IPv4 connections.

    Falls back to an empty set (after logging a notice) when the
    connection table cannot be read.
    """
    try:
        connections = psutil.net_connections(kind='inet4')
        return {conn.laddr[1] for conn in connections}
    except psutil.AccessDenied:
        # On some platforms (such as OS X), root privilege is required to get used ports.
        # In that case we avoid port conflicts to the best of our knowledge.
        _logger.info('ports in use cannot be obtained on this platform; ports will be assigned sequentially')
        return set()
def on_user_deny_process(self, evt):
    """Kill the suspended process that matches a denied event.

    The first entry of ``self.suspended`` whose pid equals ``evt.pid`` is
    killed and removed from the list; the event's command line is also
    dropped from ``self.whitelist`` if present. Nothing happens when no
    suspended process matches.
    """
    for proc in self.suspended:
        if proc.pid != evt.pid:
            continue
        logger.info('Killing PID %d (%s)' %
                    (proc.pid, evt.cmdline))
        proc.kill()
        # Safe to mutate the list here: we return before iterating again.
        self.suspended.remove(proc)
        try:
            cmdline = evt.cmdline
        except psutil.AccessDenied:
            return
        if cmdline in self.whitelist:
            self.whitelist.remove(cmdline)
        return
def to_absolute(pid, fd, path):
    """Resolve *path* as seen by process *pid* into an absolute path.

    An empty path yields None and a path starting with '/' is returned
    unchanged. Otherwise the path is joined onto the process's working
    directory and passed through ``os.path.realpath``. The working
    directory is cached in ``pid_cwd`` on every successful lookup; when
    the process cannot be queried the cached value is used, and None is
    returned if there is none. *fd* is accepted but not used.
    """
    if not path:
        return None
    if path[0] != '/':
        try:
            cwd = psutil.Process(pid).cwd()
            pid_cwd[pid] = cwd  # cache every pid's cwd
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            cwd = pid_cwd.get(pid)
            if not cwd:
                return None
        return os.path.realpath(os.path.join(cwd, path))
    return path
def find_launch_game_pid(process_name):
    """
    Find the pid of a running process named *process_name*.

    Returns the pid of the first match, or -1 if no process matches.
    A message is printed for every process whose name cannot be read
    because access is denied.
    """
    for proc in psutil.process_iter():
        try:
            if proc.name() == process_name:
                return proc.pid
        except psutil.AccessDenied:
            print("Permission error or access denied on process")
        except psutil.NoSuchProcess:
            # The process exited between being listed and being queried;
            # it cannot be the one we are looking for.
            continue
    return -1
def test_threads_mocked(self):
    """Check how threads() reacts to errors when opening task files."""
    real_open = open

    def failing_open(err):
        # Build an open() stand-in that raises IOError(err) for anything
        # under this process's /proc/<pid>/task and defers to the real
        # open() for every other path.
        def fake_open(name, *args, **kwargs):
            if not name.startswith('/proc/%s/task' % os.getpid()):
                return real_open(name, *args, **kwargs)
            raise IOError(err, "")
        return fake_open

    target = 'builtins.open' if PY3 else '__builtin__.open'

    # Test the case where os.listdir() returns a file (thread)
    # which no longer exists by the time we open() it (race
    # condition). threads() is supposed to ignore that instead
    # of raising NSP.
    with mock.patch(target, side_effect=failing_open(errno.ENOENT)) as m:
        ret = psutil.Process().threads()
        assert m.called
        self.assertEqual(ret, [])

    # ...but if it bumps into something != ENOENT we want an
    # exception.
    with mock.patch(target, side_effect=failing_open(errno.EPERM)):
        self.assertRaises(psutil.AccessDenied, psutil.Process().threads)
# not sure why (doesn't fail locally)
# https://travis-ci.org/giampaolo/psutil/jobs/108629915
def test_process__repr__(self, func=repr):
    """Check the text *func* produces for a Process in several states."""
    proc = psutil.Process()
    text = func(proc)
    self.assertIn("psutil.Process", text)
    self.assertIn("pid=%s" % proc.pid, text)
    self.assertIn("name=", text)
    self.assertIn(proc.name(), text)

    # (error raised by name(), word expected in the text or None)
    scenarios = (
        (psutil.ZombieProcess(os.getpid()), "zombie"),
        (psutil.NoSuchProcess(os.getpid()), "terminated"),
        (psutil.AccessDenied(os.getpid()), None),
    )
    for error, marker in scenarios:
        with mock.patch.object(psutil.Process, "name", side_effect=error):
            proc = psutil.Process()
            text = func(proc)
            self.assertIn("pid=%s" % proc.pid, text)
            if marker is not None:
                self.assertIn(marker, text)
            # With name() failing, the name must not be rendered at all.
            self.assertNotIn("name=", text)
def test_access_denied__repr__(self, func=repr):
    """Check the text *func* produces for AccessDenied exceptions.

    *func* is the rendering function under test (``repr`` by default), so
    a caller can pass another renderer to reuse the same expectations.
    """
    # Use the supplied renderer rather than a hard-coded repr(), otherwise
    # the parameter has no effect.
    self.assertEqual(
        func(psutil.AccessDenied(321)),
        "psutil.AccessDenied (pid=321)")
    self.assertEqual(
        func(psutil.AccessDenied(321, name='foo')),
        "psutil.AccessDenied (pid=321, name='foo')")
    self.assertEqual(
        func(psutil.AccessDenied(321, msg='foo')),
        "psutil.AccessDenied foo")
def assert_stdout(self, exe, args=None):
    """Run a script from SCRIPTS_DIR and return its stripped output.

    The script is run with the current interpreter, with *args* appended
    to the command line when given. The output must be non-empty. If the
    run fails with a RuntimeError mentioning 'AccessDenied', the error
    text is returned instead; any other RuntimeError propagates.
    """
    script = '"%s"' % os.path.join(SCRIPTS_DIR, exe)
    command = (script + ' ' + args) if args else script
    try:
        out = sh(sys.executable + ' ' + command).strip()
    except RuntimeError as err:
        if 'AccessDenied' not in str(err):
            raise
        return str(err)
    assert out, out
    return out
def test_issue_24(self):
    """kill() on the process with PID 0 is expected to raise AccessDenied."""
    proc = psutil.Process(0)
    with self.assertRaises(psutil.AccessDenied):
        proc.kill()
def test_handles_leak(self):
    """Verify that Process members do not leave handles open.

    Every public member of psutil.Process (except those that act on the
    process or are otherwise excluded below) is invoked twice, and the
    handle count after the second call must not exceed the count after
    the first.
    """
    # Call all Process methods and make sure no handles are left
    # open. This is here mainly to make sure functions using
    # OpenProcess() always call CloseHandle().
    def call(p, attr_name):
        # Look the member up by the name passed in, not by whatever the
        # enclosing loop variable happens to hold.
        member = getattr(p, attr_name, None)
        if callable(member):
            member()

    excluded = ('terminate', 'kill', 'suspend', 'resume',
                'nice', 'send_signal', 'wait', 'children',
                'as_dict')
    p = psutil.Process(self.pid)
    failures = []
    for name in dir(psutil.Process):
        if name.startswith('_') or name in excluded:
            continue
        try:
            call(p, name)
            num1 = p.num_handles()
            call(p, name)
            num2 = p.num_handles()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
        if num2 > num1:
            failures.append(
                "failure while processing Process.%s method "
                "(before=%s, after=%s)" % (name, num1, num2))
    if failures:
        self.fail('\n' + '\n'.join(failures))
def test_name_always_available(self):
    """name() must not raise AccessDenied for any listed process."""
    # On Windows name() is never supposed to raise AccessDenied,
    # see https://github.com/giampaolo/psutil/issues/627
    for proc in psutil.process_iter():
        try:
            proc.name()
        except psutil.NoSuchProcess:
            # A process that disappeared mid-scan is not a failure.
            continue
def test_compare_name_exe(self):
    """The basename of exe() must equal name() for each readable process."""
    for proc in psutil.process_iter():
        try:
            exe_base = os.path.basename(proc.exe())
            proc_name = proc.name()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Unreadable or vanished processes cannot be compared.
            continue
        self.assertEqual(exe_base, proc_name)
def test_name(self):
    """name() returns the same value when cext.proc_exe raises AccessDenied."""
    expected = psutil.Process(self.pid).name()
    denied = psutil.AccessDenied(os.getpid())
    with mock.patch("psutil._psplatform.cext.proc_exe",
                    side_effect=denied) as fun:
        self.assertEqual(psutil.Process(self.pid).name(), expected)
        # Make sure the patched function really was exercised.
        assert fun.called
def test_win_service_get(self):
    """Exercise win_service_get() lookups, error mapping and rendering."""
    cext = psutil._psplatform.cext
    name = next(psutil.win_service_iter()).name()

    # A made-up service name must raise NoSuchProcess carrying that name.
    bogus = name + '???'
    with self.assertRaises(psutil.NoSuchProcess) as cm:
        psutil.win_service_get(bogus)
    self.assertEqual(cm.exception.name, bogus)

    service = psutil.win_service_get(name)
    # (error code raised by the C extension, psutil exception expected)
    scenarios = (
        (cext.ERROR_SERVICE_DOES_NOT_EXIST, psutil.NoSuchProcess),
        (cext.ERROR_ACCESS_DENIED, psutil.AccessDenied),
    )
    for errcode, expected in scenarios:
        exc = WindowsError(errcode, "")
        with mock.patch("psutil._psplatform.cext.winservice_query_status",
                        side_effect=exc):
            self.assertRaises(expected, service.status)
        with mock.patch("psutil._psplatform.cext.winservice_query_config",
                        side_effect=exc):
            self.assertRaises(expected, service.username)

    # test __str__ and __repr__
    for render in (str, repr):
        self.assertIn(service.name(), render(service))
        self.assertIn(service.display_name(), render(service))