python类AccessDenied()的实例源码

__init__.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def skip_on_access_denied(only_if=None):
    """Build a decorator that turns psutil.AccessDenied into a test skip.

    When the wrapped callable raises ``psutil.AccessDenied`` the error is
    replaced by ``unittest.SkipTest``.  If *only_if* is supplied (i.e. is
    not ``None``) and is falsy, the original exception is re-raised
    unchanged instead.
    """
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            except psutil.AccessDenied:
                # An explicit, falsy ``only_if`` disables the skip.
                if not (only_if is None or only_if):
                    raise
                raise unittest.SkipTest(
                    "%r was skipped because it raised AccessDenied"
                    % fun.__name__)
        return wrapper
    return decorator
interrupt_switch_prog.py 文件源码 项目:Sample-Code 作者: meigrafd 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def kill_proc(procname):
    """Terminate the first running process whose name equals *procname*.

    The process list is scanned with ``psutil.process_iter()``.  The first
    process whose ``name()`` matches is sent ``terminate()`` and waited on
    for up to 3 seconds; the scan stops after that first match.

    On a wait timeout the function retries by calling itself, using
    ``proz['killcount']`` as the retry counter and giving up once it
    reads 2.  ``proz`` is a mapping defined outside this function.

    Processes that exit or cannot be inspected while the list is being
    scanned are skipped instead of aborting the whole scan.

    Diagnostics are printed; nothing is returned.
    """
    done = False
    for proc in psutil.process_iter():
        # name() itself can fail for a process that vanished or is
        # protected, so the lookup has to be guarded as well.
        try:
            if proc.name() != procname:
                continue
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue

        try:
            proc.terminate()
            proc.wait(timeout=3)
            done = True
        except psutil.AccessDenied:
            print("Error: Access Denied to terminate %s" % procname)
        except psutil.NoSuchProcess:
            pass
        except psutil.TimeoutExpired:
            if proz['killcount'] == 2:
                print("Error: Terminating of %s failed! (tried 3 times)" % procname)
            else:
                print("Error: Terminating of %s took to long.. retrying" % procname)
                proz['killcount'] += 1
                kill_proc(procname)
        # Only the first matching process is handled.
        break
    if done:
        print("%s terminated!" % procname)
processes.py 文件源码 项目:ppapi 作者: PPAPI 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def all_running_processes_for_case(self, _filter):
        """Collect the running processes whose working directory contains *_filter*.

        Each match is reported as a dict with two keys: ``"run"`` (the
        working directory, a space, and the process name) and ``"cmd"``
        (the value of ``cmdline()``).  Processes that raise
        ``psutil.AccessDenied`` or ``psutil.NoSuchProcess`` while being
        inspected are left out.
        """
        procs = []
        for p in psutil.process_iter():
            try:
                # Query the working directory once so that the value tested
                # against the filter is the value that gets reported.  A
                # separate pid_exists() pre-check is unnecessary: a process
                # that is already gone raises NoSuchProcess here.
                cwd = p.cwd()
                if _filter in cwd:
                    procs.append({"run": cwd + " " + p.name(),
                                  "cmd": p.cmdline()})
            except (psutil.AccessDenied, psutil.NoSuchProcess):
                # Best effort: skip what cannot be inspected.
                continue
        return procs

    ##
    # Function to kill all running processes for one case
    #  @param _filter search filter <string>
    #  @retval list of all killed processes <list>
processes.py 文件源码 项目:ppapi 作者: PPAPI 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def kill_all_running_processes_for_case(self, _filter):
        """Kill every running process whose working directory contains *_filter*.

        For each match a dict ``{"run": "<cwd> <name>"}`` is recorded and
        ``self.kill_proc_tree`` is called with the process id.  Processes
        that raise ``psutil.AccessDenied`` or ``psutil.NoSuchProcess`` are
        skipped.  The list of recorded dicts is returned.
        """
        procs = []
        for p in psutil.process_iter():
            try:
                # Query the working directory once so that the value tested
                # against the filter is the value that gets reported.  A
                # separate pid_exists() pre-check is unnecessary: a process
                # that is already gone raises NoSuchProcess here.
                cwd = p.cwd()
                if _filter in cwd:
                    # The entry is recorded before the kill is attempted,
                    # so it stays in the list even if the kill then raises.
                    procs.append({"run": cwd + " " + p.name()})
                    self.kill_proc_tree(p.pid)
            except (psutil.AccessDenied, psutil.NoSuchProcess):
                # Best effort: skip what cannot be inspected or killed.
                continue
        return procs

    ##
    # Function to retrieve all processes for one case (running and completed)
    #  @param _case case <string>
    #  @retval list of all processes <list>
proc.py 文件源码 项目:otest 作者: rohe 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def find_test_instance(iss, tag):
    """Find a running process started with ``-i <iss>`` and ``-t <tag>``.

    Every process reported by ``psutil.process_iter()`` is examined.
    Command lines with fewer than five tokens are ignored.  A process
    matches when, for both flags, the token following the first
    occurrence of the flag equals the requested value.

    Returns the first matching process object, or ``None`` when there is
    no match.  Processes that cannot be inspected or that exit during the
    scan are skipped.
    """
    match = {"-i": iss, "-t": tag}

    for proc in psutil.process_iter():
        try:
            cmd = proc.cmdline()
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # Not readable, or gone since it was enumerated.
            continue

        if len(cmd) < 5:
            continue

        for flag, value in match.items():
            try:
                pos = cmd.index(flag)
            except ValueError:
                break

            # A flag that is the very last token has no value after it;
            # treat that as a mismatch rather than indexing past the end.
            if pos + 1 >= len(cmd) or cmd[pos + 1] != value:
                break
        else:
            # Loop finished without a break: every flag matched.
            return proc
    return None
utils.py 文件源码 项目:raiden 作者: raiden-network 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def get_free_port(address, initial_port):
    """Return an iterator over TCP ports that look unused on *address*.

    This should not be used in mission-critical applications: a race
    condition may occur if someone else grabs the port before the caller
    has had a chance to use it.

    Parameters:
        address       (string): ip address of the interface to check
        initial_port  (int)   : first port number to try
    Return:
        an iterator yielding, in increasing order starting at
        ``initial_port``, each port for which no connection with a
        matching local address and port is currently reported.  If the
        connection table cannot be read (``psutil.AccessDenied``), a
        plain counter starting at ``initial_port`` is returned and no
        usage check is performed.
    """

    try:
        # On OSX this function requires root privileges
        psutil.net_connections()
    except psutil.AccessDenied:
        return count(initial_port)

    def _is_taken(port):
        # The connection table is re-read for every candidate port.
        for conn in psutil.net_connections():
            if (hasattr(conn, 'laddr') and
                    conn.laddr[0] == address and
                    conn.laddr[1] == port):
                return True
        return False

    def _unused_ports():
        for candidate in count(initial_port):
            # only generate unused ports
            if not _is_taken(candidate):
                yield candidate

    return _unused_ports()
_process.py 文件源码 项目:jubakit 作者: jubatus 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get_ports_in_use(cls):
    """
    Collects the local port number of every connection that
    ``psutil.net_connections(kind='inet4')`` reports and returns them as a set.
    An empty set is returned when psutil raises ``AccessDenied``.
    """
    try:
      connections = psutil.net_connections(kind='inet4')
      return set(conn.laddr[1] for conn in connections)
    except psutil.AccessDenied:
      # Some platforms (OS X, for example) only expose the connection table
      # to root.  Without it, port conflicts are avoided on a best-effort
      # basis only.
      _logger.info('ports in use cannot be obtained on this platform; ports will be assigned sequentially')
      return set()
handlers.py 文件源码 项目:ransomcare 作者: Happyholic1203 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def on_user_deny_process(self, evt):
        """Kill the suspended process that matches ``evt.pid``.

        The first entry of ``self.suspended`` whose ``pid`` equals
        ``evt.pid`` is killed and removed from ``self.suspended``.  If the
        event's command line can be read and is present in
        ``self.whitelist`` it is removed from there too.  Nothing happens
        when no suspended process matches.  Returns ``None``.
        """
        for p in self.suspended:
            if p.pid != evt.pid:
                continue

            # Read the command line exactly once, under the guard, so that
            # an AccessDenied cannot escape from the log call and prevent
            # the kill below.
            try:
                cmdline = evt.cmdline
                cmdline_known = True
            except psutil.AccessDenied:
                cmdline = None
                cmdline_known = False

            logger.info('Killing PID %d (%s)' %
                        (p.pid, cmdline))
            p.kill()
            # Mutating the list is safe here: the loop is left right after.
            self.suspended.remove(p)
            if cmdline_known and cmdline in self.whitelist:
                self.whitelist.remove(cmdline)
            return
sniffers.py 文件源码 项目:ransomcare 作者: Happyholic1203 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def to_absolute(pid, fd, path):
    """Resolve *path* against the working directory of process *pid*.

    An empty or missing path yields ``None``; a path starting with ``/``
    is returned unchanged.  Otherwise the process' current working
    directory is looked up (and remembered in ``pid_cwd``); if the lookup
    fails with ``NoSuchProcess`` or ``AccessDenied`` the remembered value
    is used, and ``None`` is returned when there is none.  The *fd*
    argument is not used.
    """
    if not path:
        return None
    if path[0] == '/':
        return path

    try:
        cwd = psutil.Process(pid).cwd()
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        # Fall back to the last working directory seen for this pid.
        cwd = pid_cwd.get(pid)
        if not cwd:
            return None
    else:
        pid_cwd[pid] = cwd  # remember every pid's cwd

    return os.path.realpath(os.path.join(cwd, path))
process_helper.py 文件源码 项目:PyCloudGameSaving 作者: sd12582000 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def find_launch_game_pid(process_name):
    """
    Return the pid of the first running process named *process_name*.

    Returns -1 when no process with that name is found.  A message is
    printed for each process whose name cannot be read because access is
    denied; processes that exit while the list is being scanned are
    skipped silently.
    """
    for proc in psutil.process_iter():
        try:
            if proc.name() == process_name:
                return proc.pid
        except psutil.AccessDenied:
            print("Permission error or access denied on process")
        except psutil.NoSuchProcess:
            # The process ended between enumeration and inspection.
            continue
    return -1
test_linux.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_threads_mocked(self):
        # Simulates the race where os.listdir() returns a thread entry
        # that has vanished by the time it is open()ed.  threads() is
        # expected to ignore ENOENT (instead of raising NSP) but to
        # surface any other error.
        def failing_open(err):
            # Build an open() stand-in that fails with errno *err* for
            # this process' /proc/<pid>/task entries and defers to the
            # real open() for everything else.
            def fake_open(name, *args, **kwargs):
                if not name.startswith('/proc/%s/task' % os.getpid()):
                    return real_open(name, *args, **kwargs)
                raise IOError(err, "")
            return fake_open

        real_open = open
        patch_point = '__builtin__.open' if not PY3 else 'builtins.open'

        # ENOENT: the missing thread is skipped, leaving an empty result.
        with mock.patch(patch_point,
                        side_effect=failing_open(errno.ENOENT)) as m:
            ret = psutil.Process().threads()
            assert m.called
            self.assertEqual(ret, [])

        # Anything other than ENOENT must turn into an exception.
        with mock.patch(patch_point,
                        side_effect=failing_open(errno.EPERM)):
            self.assertRaises(psutil.AccessDenied, psutil.Process().threads)

    # not sure why (doesn't fail locally)
    # https://travis-ci.org/giampaolo/psutil/jobs/108629915
test_misc.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_process__repr__(self, func=repr):
        # Checks the text produced by func(Process) for a healthy process
        # and for the cases where name() raises one of psutil's errors.
        proc = psutil.Process()
        text = func(proc)
        self.assertIn("psutil.Process", text)
        self.assertIn("pid=%s" % proc.pid, text)
        self.assertIn("name=", text)
        self.assertIn(proc.name(), text)

        # (exception raised by name(), word expected in the text or None)
        failure_cases = (
            (psutil.ZombieProcess, "zombie"),
            (psutil.NoSuchProcess, "terminated"),
            (psutil.AccessDenied, None),
        )
        for exc_class, marker in failure_cases:
            with mock.patch.object(psutil.Process, "name",
                                   side_effect=exc_class(os.getpid())):
                proc = psutil.Process()
                text = func(proc)
                self.assertIn("pid=%s" % proc.pid, text)
                if marker is not None:
                    self.assertIn(marker, text)
                self.assertNotIn("name=", text)
test_misc.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_access_denied__repr__(self, func=repr):
        # Expected repr() of AccessDenied for the supported argument
        # combinations.
        # NOTE(review): ``func`` is accepted but never used here; repr()
        # is always called directly -- confirm whether that is intended.
        cases = (
            (psutil.AccessDenied(321),
             "psutil.AccessDenied (pid=321)"),
            (psutil.AccessDenied(321, name='foo'),
             "psutil.AccessDenied (pid=321, name='foo')"),
            (psutil.AccessDenied(321, msg='foo'),
             "psutil.AccessDenied foo"),
        )
        for exc, expected in cases:
            self.assertEqual(repr(exc), expected)
test_misc.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def assert_stdout(self, exe, args=None):
        """Run a script from SCRIPTS_DIR and return its stripped output.

        The script path is quoted, optionally followed by *args*, and run
        through ``sh`` with the current interpreter.  The output must be
        non-empty.  If ``sh`` raises a RuntimeError whose text mentions
        'AccessDenied', that text is returned instead; any other
        RuntimeError propagates.
        """
        script = '"%s"' % os.path.join(SCRIPTS_DIR, exe)
        command = (script + ' ' + args) if args else script
        try:
            out = sh(sys.executable + ' ' + command).strip()
        except RuntimeError as err:
            message = str(err)
            if 'AccessDenied' not in message:
                raise
            return message
        assert out, out
        return out
test_windows.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_issue_24(self):
        # kill() on PID 0 is expected to raise AccessDenied.
        pid_zero = psutil.Process(0)
        with self.assertRaises(psutil.AccessDenied):
            pid_zero.kill()
test_windows.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_handles_leak(self):
        # Call all Process methods and make sure no handles are left
        # open. This is here mainly to make sure functions using
        # OpenProcess() always call CloseHandle().
        def call(p, attr):
            # Look the member up through the *attr* argument.  The
            # original read the enclosing loop variable instead, which
            # only worked because every caller passed that same variable.
            member = getattr(p, attr, None)
            if member is not None and callable(member):
                member()

        # Methods that are not exercised by this check.
        skipped = ('terminate', 'kill', 'suspend', 'resume',
                   'nice', 'send_signal', 'wait', 'children',
                   'as_dict')

        p = psutil.Process(self.pid)
        failures = []
        for name in dir(psutil.Process):
            if name.startswith('_') or name in skipped:
                continue
            try:
                # Compare the handle count after a first and a second
                # call: growth between the two indicates a leak.
                call(p, name)
                num1 = p.num_handles()
                call(p, name)
                num2 = p.num_handles()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
            if num2 > num1:
                fail = \
                    "failure while processing Process.%s method " \
                    "(before=%s, after=%s)" % (name, num1, num2)
                failures.append(fail)
        if failures:
            self.fail('\n' + '\n'.join(failures))
test_windows.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_name_always_available(self):
        # name() must never raise AccessDenied on Windows (see
        # https://github.com/giampaolo/psutil/issues/627), so only
        # NoSuchProcess is tolerated here.
        for proc in psutil.process_iter():
            try:
                proc.name()
            except psutil.NoSuchProcess:
                continue
test_windows.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_compare_name_exe(self):
        # For every process that can be inspected, the base name of
        # exe() must equal name().
        for proc in psutil.process_iter():
            try:
                exe_name = os.path.basename(proc.exe())
                proc_name = proc.name()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
            self.assertEqual(exe_name, proc_name)
test_windows.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_name(self):
        # name() must return the same value even when proc_exe raises
        # AccessDenied, and proc_exe must actually have been called.
        expected = psutil.Process(self.pid).name()
        denied = psutil.AccessDenied(os.getpid())
        with mock.patch("psutil._psplatform.cext.proc_exe",
                        side_effect=denied) as patched:
            self.assertEqual(psutil.Process(self.pid).name(), expected)
            assert patched.called
test_windows.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_win_service_get(self):
        name = next(psutil.win_service_iter()).name()

        # An unknown service name raises NoSuchProcess carrying that name.
        missing = name + '???'
        with self.assertRaises(psutil.NoSuchProcess) as cm:
            psutil.win_service_get(missing)
        self.assertEqual(cm.exception.name, missing)

        service = psutil.win_service_get(name)

        # (error code raised by the C extension, psutil exception expected)
        cext = psutil._psplatform.cext
        error_cases = (
            (cext.ERROR_SERVICE_DOES_NOT_EXIST, psutil.NoSuchProcess),
            (cext.ERROR_ACCESS_DENIED, psutil.AccessDenied),
        )
        for code, expected in error_cases:
            exc = WindowsError(code, "")
            with mock.patch(
                    "psutil._psplatform.cext.winservice_query_status",
                    side_effect=exc):
                self.assertRaises(expected, service.status)
            with mock.patch(
                    "psutil._psplatform.cext.winservice_query_config",
                    side_effect=exc):
                self.assertRaises(expected, service.username)

        # __str__ and __repr__ both mention the name and display name.
        for render in (str, repr):
            self.assertIn(service.name(), render(service))
            self.assertIn(service.display_name(), render(service))


问题


面经


文章

微信
公众号

扫码关注公众号