python类AccessDenied()的实例源码

__init__.py 文件源码 项目:Chasar 作者: camilochs 项目源码 文件源码 阅读 85 收藏 0 点赞 0 评论 0
def pids_active(pids_computer):
    """
    Collect basic statistics for every PID that can still be inspected.

    :param pids_computer: iterable of process IDs to look up.
    :return: dict mapping each process name to a dict with the keys
        ``pid``, ``status``, ``percent_cpu_used`` and
        ``percent_memory_used``. PIDs whose process is gone, is a zombie
        or cannot be accessed are left out. Entries are keyed by name, so
        processes sharing a name overwrite each other (last one wins).
    """
    pid_valid = {}
    for pid in pids_computer:
        try:
            process = psutil.Process(pid)
            # Read the name inside the ``try`` too: the process may exit
            # or deny access between any two psutil calls.
            name = process.name()
            data = {"pid": process.pid,
                    "status": process.status(),
                    "percent_cpu_used": process.cpu_percent(interval=0.0),
                    "percent_memory_used": process.memory_percent()}
        except (psutil.ZombieProcess, psutil.AccessDenied,
                psutil.NoSuchProcess):
            continue

        pid_valid[name] = data
    return pid_valid
console.py 文件源码 项目:ransomcare 作者: Happyholic1203 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def on_ask_user_allow_or_deny(self, evt):
        """
        Report a detected ransom event and ask the user whether to block it.

        Fires ``EventUserAllowProcess`` when the answer contains an "n"
        (case-insensitive) and ``EventUserDenyProcess`` otherwise, so an
        empty answer blocks the process.

        :param evt: event object; its ``process``, ``pid`` and ``path``
            attributes are read here.
        """
        # Default to None so the report below can still be printed when the
        # process details cannot be read; without this the except branch
        # left ``exe``/``cmdline`` unbound and the report raised NameError.
        exe = cmdline = None
        try:
            exe = evt.process.exe()
            cmdline = evt.process.cmdline()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            logger.warn('Ransomware process is caught, but the process does '
                        'not exist (PID: %d)' % evt.pid)

        logger.critical('\033[91m')
        logger.critical('*** [Crypto ransom detected] ***')
        logger.critical('[PID]: %d' % evt.process.pid)
        logger.critical('[EXE]: %r' % exe)
        logger.critical('[Command]: %r' % cmdline)
        logger.critical('[File]: %s' % evt.path)
        logger.critical('********************************\033[0m')
        flush_stdin()
        yes_no = raw_input('> Block it? (Y/n) ')

        # Any answer containing "n" means "do not block".
        allow = 'n' in yes_no.lower()
        if allow:
            event.EventUserAllowProcess(evt.process).fire()
        else:
            event.EventUserDenyProcess(evt.process).fire()
proc.py 文件源码 项目:otest 作者: rohe 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def find_test_instances(prog):
    """
    Find running Python processes that were started from *prog*.

    A process matches when its name is ``python``, ``python3``, ``Python``
    or ``Python3``, its command line has more than five items and the
    second item ends with *prog*.

    :param prog: suffix the second command-line item must end with.
    :return: dict mapping each matching pid to a dict with the keys
        ``iss``, ``tag`` and ``port`` (the arguments following ``-i``,
        ``-t`` and ``-p``) and ``since`` (the creation time formatted as
        ``%Y-%m-%d %H:%M:%S``). Processes that vanish, deny access or
        lack one of the three flags are skipped.
    """
    pid = {}
    for proc in psutil.process_iter():
        # Processes can exit or refuse access at any point during the
        # scan, so every psutil call is guarded.
        try:
            if proc.name() not in ('python', 'python3', 'Python', 'Python3'):
                continue
            cmd = proc.cmdline()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue

        if len(cmd) <= 5 or not cmd[1].endswith(prog):
            continue

        try:
            iss = cmd[cmd.index('-i') + 1]
            tag = cmd[cmd.index('-t') + 1]
            port = cmd[cmd.index('-p') + 1]
            created = proc.create_time()
        except (ValueError, IndexError,
                psutil.NoSuchProcess, psutil.AccessDenied):
            # Missing flag, flag without a value, or the process is gone:
            # skip this one instead of aborting the whole scan.
            continue

        since = datetime.datetime.fromtimestamp(created).strftime(
            "%Y-%m-%d %H:%M:%S")
        pid[proc.pid] = {'iss': iss, 'tag': tag, 'port': port,
                         'since': since}
    return pid
linux.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _maybe_get_running_openvpn():
    """
    Looks for previously running openvpn instances.

    :returns: the first process having a command-line argument that
        contains "LEAPOPENVPN", or None when there is no such process.
    :rtype: psutil Process
    """
    for p in psutil.process_iter():
        try:
            # This needs more work, see #3268, but for the moment
            # we need to be able to filter out arguments in the form
            # --openvpn-foo, since otherwise we are shooting ourselves
            # in the feet.
            cmdline = p.cmdline()
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # Unreadable, or exited while we were iterating: not a
            # candidate, keep looking.
            continue

        if any("LEAPOPENVPN" in s for s in cmdline):
            return p
    return None
__init__.py 文件源码 项目:Chasar 作者: camilochs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def pids_active(pids_computer):
    """
    Collect basic statistics for every PID that can still be inspected.

    :param pids_computer: iterable of process IDs to look up.
    :return: dict mapping each process name to a dict with the keys
        ``pid``, ``status``, ``percent_cpu_used`` and
        ``percent_memory_used``. PIDs whose process is gone, is a zombie
        or cannot be accessed are left out. Entries are keyed by name, so
        processes sharing a name overwrite each other (last one wins).
    """
    pid_valid = {}
    for pid in pids_computer:
        try:
            process = psutil.Process(pid)
            # Read the name inside the ``try`` too: the process may exit
            # or deny access between any two psutil calls.
            name = process.name()
            data = {"pid": process.pid,
                    "status": process.status(),
                    "percent_cpu_used": process.cpu_percent(interval=0.0),
                    "percent_memory_used": process.memory_percent()}
        except (psutil.ZombieProcess, psutil.AccessDenied,
                psutil.NoSuchProcess):
            continue

        pid_valid[name] = data
    return pid_valid
handlers.py 文件源码 项目:ransomcare 作者: Happyholic1203 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def on_crypto_ransom(self, evt):
        """
        React to a crypto-ransom event by suspending the offending process.

        Events for processes already in ``self.suspended`` are ignored.
        A process whose command line is in ``self.whitelist`` is only
        logged; any other process is suspended, remembered in
        ``self.suspended`` and an ``EventAskUserAllowOrDeny`` is fired.

        :param evt: event object; its ``pid`` and ``path`` attributes are
            read here.
        """
        logger.debug('Whitelist: %s' % json.dumps(self.whitelist, indent=4))

        # A suspended process may have been killed in the meantime, so
        # reading its executable must not be allowed to abort the handler.
        suspended_info = []
        for held in self.suspended:
            try:
                held_exe = held.exe()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                held_exe = None
            suspended_info.append({'pid': held.pid, 'exe': held_exe})
        logger.debug('Suspended: %s' % json.dumps(suspended_info, indent=4))

        if any(suspended.pid == evt.pid for suspended in self.suspended):
            return  # ignore captured ransom events

        try:
            p = psutil.Process(evt.pid)
            cmdline = p.cmdline()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            logger.warn('Suspicious process %d exited before being caught'
                        % evt.pid)
            return

        if cmdline not in self.whitelist:
            try:
                p.suspend()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The process can still vanish (or refuse the signal)
                # between the lookup above and the suspend call.
                logger.warn('Suspicious process %d could not be suspended'
                            % evt.pid)
                return
            self.suspended.append(p)
            event.EventAskUserAllowOrDeny(p, evt.path).fire()
        else:
            logger.info('Allowed white-listed process: %d' % evt.pid)
sniffers.py 文件源码 项目:ransomcare 作者: Happyholic1203 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_absolute_path(event_raw):
    '''
    Resolve the path carried by a raw event to an absolute real path.

    Keeps a cache of processes' cwds (``pid_cwd``), in case that their
    events might come after they're terminated.

    :param event_raw: mapping read through ``.get('pid')`` and
        ``.get('path')``.
    :return: the resolved path, or None when the event has no path or the
        process' working directory is neither readable nor cached.
    '''
    pid = event_raw.get('pid')
    path = event_raw.get('path')
    if path is None:
        # Nothing to resolve; joining None onto a cwd below would raise.
        return None
    if path and path[0] == '/':
        return os.path.realpath(path)

    # NOTE(review): a missing pid is passed to psutil.Process() as None;
    # confirm that callers always supply one.
    cwd = None
    logger.debug('%r' % pid_cwd)
    try:
        process = psutil.Process(pid)
        cwd = process.cwd()
        pid_cwd[pid] = cwd  # cache every pid's cwd
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        # Fall back to the cwd remembered from an earlier event.
        cwd = pid_cwd.get(pid)
        if not cwd:
            return None

    return os.path.realpath(os.path.join(cwd, path))
steam_helper.py 文件源码 项目:PyCloudGameSaving 作者: sd12582000 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def find_current_steam_game_pid():
    """
    Find the process id of the game currently being played.

    The id is taken from the argument following ``-pid`` on the command
    line of the first readable ``GameOverlayUI.exe`` process.

    :return: the game's pid as an int, or -1 when no such process is
        found or its ``-pid`` argument is missing or not a number.
    """
    for proc in psutil.process_iter():
        try:
            if proc.name() != 'GameOverlayUI.exe':
                continue
            cmds = proc.cmdline()
        except psutil.AccessDenied:
            print("Permission error or access denied on process")
            continue
        except psutil.NoSuchProcess:
            # Exited while we were iterating; nothing to inspect.
            continue

        # Only the first overlay process is examined.
        for index, arg in enumerate(cmds):
            if arg == '-pid':
                try:
                    return int(cmds[index + 1])
                except (IndexError, ValueError):
                    # "-pid" was the last argument or not followed by a
                    # number: report "not found" instead of crashing.
                    return -1
        return -1
    return -1
test_misc.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_ad_on_process_creation(self):
        # Process() must stay constructible when create_time() raises
        # AccessDenied or ZombieProcess; any other error must propagate.
        for tolerated in (psutil.AccessDenied, psutil.ZombieProcess(1)):
            with mock.patch.object(psutil.Process, 'create_time',
                                   side_effect=tolerated) as patched:
                psutil.Process()
                assert patched.called

        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=ValueError) as patched:
            with self.assertRaises(ValueError):
                psutil.Process()
            assert patched.called
test_windows.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_special_pid(self):
        # PID 4 is expected to be named 'System'.
        system_proc = psutil.Process(4)
        self.assertEqual(system_proc.name(), 'System')
        # str() goes through __str__, which touches the common Process
        # properties; we only care that nothing blows up.
        str(system_proc)
        system_proc.username()
        self.assertTrue(system_proc.create_time() >= 0.0)
        try:
            rss, vms = system_proc.memory_info()[:2]
        except psutil.AccessDenied:
            # expected on Windows Vista and Windows 7
            node = platform.uname()[1]
            if node not in ('vista', 'win-7', 'win7'):
                raise
            return
        self.assertTrue(rss > 0)
test_process.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_num_threads(self):
        # The exact thread count is platform dependent (OSX and Windows
        # do not start at 1 thread per process like Linux does), so only
        # the increase caused by one extra thread is checked.
        proc = psutil.Process()
        if not OPENBSD:
            before = proc.num_threads()
        else:
            try:
                before = proc.num_threads()
            except psutil.AccessDenied:
                raise unittest.SkipTest("on OpenBSD this requires root access")

        worker = ThreadTask()
        worker.start()
        try:
            after = proc.num_threads()
            self.assertEqual(after, before + 1)
            worker.stop()
        finally:
            # Only stop the worker here if the try body did not get to it.
            if worker._running:
                worker.stop()
test_process.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_children_duplicates(self):
        # Count direct children per parent pid across all processes.
        children_per_parent = collections.defaultdict(int)
        for proc in psutil.process_iter():
            try:
                children_per_parent[proc.ppid()] += 1
            except psutil.Error:
                continue

        # Take the parent with the most children and make sure its
        # recursive children list has no duplicates.
        ranked = sorted(children_per_parent.items(),
                        key=lambda item: item[1])
        busiest_pid = ranked[-1][0]
        parent = psutil.Process(busiest_pid)
        try:
            descendants = parent.children(recursive=True)
        except psutil.AccessDenied:  # windows
            return
        self.assertEqual(len(descendants), len(set(descendants)))
__init__.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def skip_on_access_denied(only_if=None):
    """Decorator turning psutil.AccessDenied into a skipped test.

    When ``only_if`` is given and falsy the exception is re-raised
    instead; with ``only_if`` left as None or truthy the test is skipped.
    """
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                outcome = fun(*args, **kwargs)
            except psutil.AccessDenied:
                # An explicit falsy ``only_if`` disables the skipping.
                if only_if is not None and not only_if:
                    raise
                raise unittest.SkipTest(
                    "%r was skipped because it raised AccessDenied"
                    % fun.__name__)
            return outcome
        return wrapper
    return decorator
BrewPiProcess.py 文件源码 项目:fermentrack 作者: thorrak 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def update(self):
        """
        Update the list of BrewPi processes by receiving them from the system with psutil.

        A process matches when its name contains 'python' and one of its
        command-line arguments contains 'brewpi.py'. Each match is passed
        to ``self.parseProcess`` and truthy results are collected.

        Returns: list of BrewPiProcess objects (also stored in ``self.list``)
        """
        bpList = []
        matching = []

        # some OS's (OS X) do not allow processes to read info from other processes.
        # Guard each process on its own: a single unreadable, zombie or
        # vanished process must not discard the matches found so far.
        for p in psutil.process_iter():
            try:
                if 'python' in p.name() and any('brewpi.py' in s for s in p.cmdline()):
                    matching.append(p)
            except (psutil.AccessDenied, psutil.ZombieProcess,
                    psutil.NoSuchProcess):
                continue

        for p in matching:
            bp = self.parseProcess(p)
            if bp:
                bpList.append(bp)
        self.list = bpList
        return self.list
_cpu_utils.py 文件源码 项目:perf 作者: vstinner 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def set_highest_priority():
    """Try to switch the current process to psutil's realtime priority.

    Returns True when the priority was changed. Returns None when psutil
    cannot be imported, when the Process object has no ``nice`` attribute
    or when psutil raises AccessDenied.
    """
    try:
        import psutil
    except ImportError:
        return None

    current = psutil.Process()
    if hasattr(current, 'nice'):
        # Want to set realtime on Windows.
        # Fail hard for anything else right now, so it is obvious what to
        # fix when adding other OS support.
        try:
            current.nice(psutil.REALTIME_PRIORITY_CLASS)
        except psutil.AccessDenied:
            return None
        return True
    return None
amazon_control.py 文件源码 项目:scikit-discovery 作者: MITHaystack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def closeDispyScheduler():
    '''
    Close the Dispy Scheduler.

    When the module-level ``popen`` handle is set, that process is
    terminated, waited for and the handle is reset to None. Otherwise
    every process having a command-line argument that matches
    ``dispyscheduler.py`` is sent SIGTERM once.
    '''

    global popen
    if popen is not None:
        popen.terminate()
        popen.wait()
        popen = None
        return

    # Processes may exit or refuse access at any time during the scan.
    unreachable = (PermissionError, psutil.AccessDenied, psutil.NoSuchProcess)
    for proc in psutil.process_iter():
        try:
            cmdline = proc.cmdline()
        except unreachable:
            continue

        # Escape the dot so only the literal file name matches, and signal
        # each process once even if several arguments match.
        if any(re.search(r'dispyscheduler\.py', arg) for arg in cmdline):
            try:
                proc.send_signal(psutil.signal.SIGTERM)
            except unreachable:
                continue
bb_device_status_check.py 文件源码 项目:buildroot 作者: flutter 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def KillAllAdb():
  """Signals every adb process with SIGTERM, SIGQUIT, then SIGKILL.

  Processes still listed after the three rounds are logged as errors.
  """
  # NOTE(review): `name` and `cmdline` are read as plain attributes here,
  # not called as methods -- confirm this matches the psutil version in use.
  ignorable = (psutil.NoSuchProcess, psutil.AccessDenied)

  def GetAllAdb():
    for candidate in psutil.process_iter():
      try:
        is_adb = 'adb' in candidate.name
      except ignorable:
        continue
      if is_adb:
        yield candidate

  for sig in (signal.SIGTERM, signal.SIGQUIT, signal.SIGKILL):
    for adb_proc in GetAllAdb():
      try:
        logging.info('kill %d %d (%s [%s])', sig, adb_proc.pid,
                     adb_proc.name, ' '.join(adb_proc.cmdline))
        adb_proc.send_signal(sig)
      except ignorable:
        pass

  for survivor in GetAllAdb():
    try:
      logging.error('Unable to kill %d (%s [%s])', survivor.pid,
                    survivor.name, ' '.join(survivor.cmdline))
    except ignorable:
      pass
test_misc.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_ad_on_process_creation(self):
        # Process() must stay constructible when create_time() raises
        # AccessDenied or ZombieProcess; any other error must propagate.
        for tolerated in (psutil.AccessDenied, psutil.ZombieProcess(1)):
            with mock.patch.object(psutil.Process, 'create_time',
                                   side_effect=tolerated) as patched:
                psutil.Process()
                assert patched.called

        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=ValueError) as patched:
            with self.assertRaises(ValueError):
                psutil.Process()
            assert patched.called
test_windows.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_special_pid(self):
        # PID 4 is expected to be named 'System'.
        system_proc = psutil.Process(4)
        self.assertEqual(system_proc.name(), 'System')
        # str() goes through __str__, which touches the common Process
        # properties; we only care that nothing blows up.
        str(system_proc)
        system_proc.username()
        self.assertTrue(system_proc.create_time() >= 0.0)
        try:
            rss, vms = system_proc.memory_info()[:2]
        except psutil.AccessDenied:
            # expected on Windows Vista and Windows 7
            node = platform.uname()[1]
            if node not in ('vista', 'win-7', 'win7'):
                raise
            return
        self.assertTrue(rss > 0)
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_num_threads(self):
        # The exact thread count is platform dependent (OSX and Windows
        # do not start at 1 thread per process like Linux does), so only
        # the increase caused by one extra thread is checked.
        proc = psutil.Process()
        if not OPENBSD:
            before = proc.num_threads()
        else:
            try:
                before = proc.num_threads()
            except psutil.AccessDenied:
                raise unittest.SkipTest("on OpenBSD this requires root access")

        worker = ThreadTask()
        worker.start()
        try:
            after = proc.num_threads()
            self.assertEqual(after, before + 1)
        finally:
            worker.stop()
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_threads(self):
        # Starting one extra thread must add exactly one entry to
        # Process.threads().
        proc = psutil.Process()
        if not OPENBSD:
            before = proc.threads()
        else:
            try:
                before = proc.threads()
            except psutil.AccessDenied:
                raise unittest.SkipTest("on OpenBSD this requires root access")

        worker = ThreadTask()
        worker.start()
        try:
            after = proc.threads()
            self.assertEqual(len(after), len(before) + 1)
            # on Linux, first thread id is supposed to be this process
            if LINUX:
                self.assertEqual(after[0].id, os.getpid())
            first = after[0]
            # the entry must be readable both by field name and by index
            self.assertEqual(first.id, first[0])
            self.assertEqual(first.user_time, first[1])
            self.assertEqual(first.system_time, first[2])
        finally:
            worker.stop()
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_children_duplicates(self):
        # Count direct children per parent pid across all processes.
        children_per_parent = collections.defaultdict(int)
        for proc in psutil.process_iter():
            try:
                children_per_parent[proc.ppid()] += 1
            except psutil.Error:
                continue

        # Take the parent with the most children and make sure its
        # recursive children list has no duplicates.
        ranked = sorted(children_per_parent.items(),
                        key=lambda item: item[1])
        busiest_pid = ranked[-1][0]
        parent = psutil.Process(busiest_pid)
        try:
            descendants = parent.children(recursive=True)
        except psutil.AccessDenied:  # windows
            return
        self.assertEqual(len(descendants), len(set(descendants)))
test_system.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_process_iter(self):
        def listed_pids():
            # Fresh snapshot of the pids process_iter() currently yields.
            return [proc.pid for proc in psutil.process_iter()]

        self.assertIn(os.getpid(), listed_pids())
        child = get_test_subprocess()
        self.assertIn(child.pid, listed_pids())
        victim = psutil.Process(child.pid)
        victim.kill()
        victim.wait()
        self.assertNotIn(child.pid, listed_pids())

        # With Process raising NoSuchProcess the iterator yields nothing;
        # with AccessDenied the exception reaches the caller.
        vanished = psutil.NoSuchProcess(os.getpid())
        with mock.patch('psutil.Process', side_effect=vanished):
            self.assertEqual(list(psutil.process_iter()), [])
        denied = psutil.AccessDenied(os.getpid())
        with mock.patch('psutil.Process', side_effect=denied):
            with self.assertRaises(psutil.AccessDenied):
                list(psutil.process_iter())
__init__.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def skip_on_access_denied(only_if=None):
    """Decorator turning psutil.AccessDenied into a skipped test.

    When ``only_if`` is given and falsy the exception is re-raised
    instead; with ``only_if`` left as None or truthy the test is skipped.
    """
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                outcome = fun(*args, **kwargs)
            except psutil.AccessDenied:
                # An explicit falsy ``only_if`` disables the skipping.
                if only_if is not None and not only_if:
                    raise
                raise unittest.SkipTest(
                    "%r was skipped because it raised AccessDenied"
                    % fun.__name__)
            return outcome
        return wrapper
    return decorator
test_misc.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_ad_on_process_creation(self):
        # Process() must stay constructible when create_time() raises
        # AccessDenied or ZombieProcess; any other error must propagate.
        for tolerated in (psutil.AccessDenied, psutil.ZombieProcess(1)):
            with mock.patch.object(psutil.Process, 'create_time',
                                   side_effect=tolerated) as patched:
                psutil.Process()
                assert patched.called

        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=ValueError) as patched:
            with self.assertRaises(ValueError):
                psutil.Process()
            assert patched.called
test_windows.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_special_pid(self):
        # PID 4 is expected to be named 'System'.
        system_proc = psutil.Process(4)
        self.assertEqual(system_proc.name(), 'System')
        # str() goes through __str__, which touches the common Process
        # properties; we only care that nothing blows up.
        str(system_proc)
        system_proc.username()
        self.assertTrue(system_proc.create_time() >= 0.0)
        try:
            rss, vms = system_proc.memory_info()[:2]
        except psutil.AccessDenied:
            # expected on Windows Vista and Windows 7
            node = platform.uname()[1]
            if node not in ('vista', 'win-7', 'win7'):
                raise
            return
        self.assertTrue(rss > 0)
test_process.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_num_threads(self):
        # The exact thread count is platform dependent (OSX and Windows
        # do not start at 1 thread per process like Linux does), so only
        # the increase caused by one extra thread is checked.
        proc = psutil.Process()
        if not OPENBSD:
            before = proc.num_threads()
        else:
            try:
                before = proc.num_threads()
            except psutil.AccessDenied:
                raise unittest.SkipTest("on OpenBSD this requires root access")

        worker = ThreadTask()
        worker.start()
        try:
            after = proc.num_threads()
            self.assertEqual(after, before + 1)
        finally:
            worker.stop()
test_process.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_threads(self):
        # Starting one extra thread must add exactly one entry to
        # Process.threads().
        proc = psutil.Process()
        if not OPENBSD:
            before = proc.threads()
        else:
            try:
                before = proc.threads()
            except psutil.AccessDenied:
                raise unittest.SkipTest("on OpenBSD this requires root access")

        worker = ThreadTask()
        worker.start()
        try:
            after = proc.threads()
            self.assertEqual(len(after), len(before) + 1)
            # on Linux, first thread id is supposed to be this process
            if LINUX:
                self.assertEqual(after[0].id, os.getpid())
            first = after[0]
            # the entry must be readable both by field name and by index
            self.assertEqual(first.id, first[0])
            self.assertEqual(first.user_time, first[1])
            self.assertEqual(first.system_time, first[2])
        finally:
            worker.stop()
test_process.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_children_duplicates(self):
        # Count direct children per parent pid across all processes.
        children_per_parent = collections.defaultdict(int)
        for proc in psutil.process_iter():
            try:
                children_per_parent[proc.ppid()] += 1
            except psutil.Error:
                continue

        # Take the parent with the most children and make sure its
        # recursive children list has no duplicates.
        ranked = sorted(children_per_parent.items(),
                        key=lambda item: item[1])
        busiest_pid = ranked[-1][0]
        parent = psutil.Process(busiest_pid)
        try:
            descendants = parent.children(recursive=True)
        except psutil.AccessDenied:  # windows
            return
        self.assertEqual(len(descendants), len(set(descendants)))
test_system.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_process_iter(self):
        def listed_pids():
            # Fresh snapshot of the pids process_iter() currently yields.
            return [proc.pid for proc in psutil.process_iter()]

        self.assertIn(os.getpid(), listed_pids())
        child = get_test_subprocess()
        self.assertIn(child.pid, listed_pids())
        victim = psutil.Process(child.pid)
        victim.kill()
        victim.wait()
        self.assertNotIn(child.pid, listed_pids())

        # With Process raising NoSuchProcess the iterator yields nothing;
        # with AccessDenied the exception reaches the caller.
        vanished = psutil.NoSuchProcess(os.getpid())
        with mock.patch('psutil.Process', side_effect=vanished):
            self.assertEqual(list(psutil.process_iter()), [])
        denied = psutil.AccessDenied(os.getpid())
        with mock.patch('psutil.Process', side_effect=denied):
            with self.assertRaises(psutil.AccessDenied):
                list(psutil.process_iter())


问题


面经


文章

微信
公众号

扫码关注公众号