python类pids()的实例源码

test_process.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_send_signal(self):
        sig = signal.SIGKILL if POSIX else signal.SIGTERM
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.send_signal(sig)
        exit_sig = p.wait()
        self.assertFalse(psutil.pid_exists(p.pid))
        if POSIX:
            self.assertEqual(exit_sig, sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.ESRCH, "")):
                with self.assertRaises(psutil.NoSuchProcess):
                    p.send_signal(sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.EPERM, "")):
                with self.assertRaises(psutil.AccessDenied):
                    psutil.Process().send_signal(sig)
            # Sending a signal to process with PID 0 is not allowed as
            # it would affect every process in the process group of
            # the calling process (os.getpid()) instead of PID 0").
            if 0 in psutil.pids():
                p = psutil.Process(0)
                self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
test_process.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_pid_0(self):
        # Process(0) is supposed to work on all platforms except Linux
        if 0 not in psutil.pids():
            self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
            return

        # test all methods
        p = psutil.Process(0)
        for name in psutil._as_dict_attrnames:
            if name == 'pid':
                continue
            meth = getattr(p, name)
            try:
                ret = meth()
            except psutil.AccessDenied:
                pass
            else:
                if name in ("uids", "gids"):
                    self.assertEqual(ret.real, 0)
                elif name == "username":
                    if POSIX:
                        self.assertEqual(p.username(), 'root')
                    elif WINDOWS:
                        self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM')
                elif name == "name":
                    assert name, name

        if hasattr(p, 'rlimit'):
            try:
                p.rlimit(psutil.RLIMIT_FSIZE)
            except psutil.AccessDenied:
                pass

        p.as_dict()

        if not OPENBSD:
            self.assertIn(0, psutil.pids())
            self.assertTrue(psutil.pid_exists(0))
test_system.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_pid_exists(self):
        sproc = get_test_subprocess()
        self.assertTrue(psutil.pid_exists(sproc.pid))
        p = psutil.Process(sproc.pid)
        p.kill()
        p.wait()
        self.assertFalse(psutil.pid_exists(sproc.pid))
        self.assertFalse(psutil.pid_exists(-1))
        self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())
        # pid 0
        psutil.pid_exists(0) == 0 in psutil.pids()
test_system.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_pid_exists_2(self):
        reap_children()
        pids = psutil.pids()
        for pid in pids:
            try:
                assert psutil.pid_exists(pid)
            except AssertionError:
                # in case the process disappeared in meantime fail only
                # if it is no longer in psutil.pids()
                time.sleep(.1)
                if pid in psutil.pids():
                    self.fail(pid)
        pids = range(max(pids) + 5000, max(pids) + 6000)
        for pid in pids:
            self.assertFalse(psutil.pid_exists(pid), msg=pid)
test_system.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_pids(self):
        plist = [x.pid for x in psutil.process_iter()]
        pidlist = psutil.pids()
        self.assertEqual(plist.sort(), pidlist.sort())
        # make sure every pid is unique
        self.assertEqual(len(pidlist), len(set(pidlist)))
test_posix.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_pids(self):
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        if SUNOS:
            cmd = ["ps", "-A", "-o", "pid"]
        else:
            cmd = ["ps", "ax", "-o", "pid"]
        p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        assert p.poll() == 0
        if PY3:
            output = str(output, sys.stdout.encoding)
        pids_ps = []
        for line in output.split('\n')[1:]:
            if line:
                pid = int(line.split()[0].strip())
                pids_ps.append(pid)
        # remove ps subprocess pid which is supposed to be dead in meantime
        pids_ps.remove(p.pid)
        pids_psutil = psutil.pids()
        pids_ps.sort()
        pids_psutil.sort()

        # on OSX ps doesn't show pid 0
        if OSX and 0 not in pids_ps:
            pids_ps.insert(0, 0)

        if pids_ps != pids_psutil:
            difference = [x for x in pids_psutil if x not in pids_ps] + \
                         [x for x in pids_ps if x not in pids_psutil]
            self.fail("difference: " + str(difference))

    # for some reason ifconfig -a does not report all interfaces
    # returned by psutil
system.py 文件源码 项目:ahenk 作者: Pardus-LiderAhenk 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def pids():
            return psutil.pids()
system.py 文件源码 项目:ahenk 作者: Pardus-LiderAhenk 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def find_pids_by_name(p_name):
            arr = []
            for pid in psutil.pids():
                if psutil.Process(id).name() == p_name:
                    arr.append(pid)
            return arr
system.py 文件源码 项目:ahenk 作者: Pardus-LiderAhenk 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def kill_by_pids(pids):
            for pid in pids:
                psutil.Process(pid).kill()
proc.py 文件源码 项目:pwndemo 作者: zh-explorer 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def ancestors(pid):
    """ancestors(pid) -> int list

    Arguments:
        pid (int): PID of the process.

    Returns:
        List of PIDs of whose parent process is `pid` or an ancestor of `pid`.
    """
    pids = []
    while pid != 0:
         pids.append(pid)
         pid = parent(pid)
    return pids
fishing.py 文件源码 项目:wow-fishipy 作者: kioltk 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def check_process():
    print 'Checking WoW is running'
    wow_process_names = ["World of Warcraft"]
    running = False
    for pid in psutil.pids():
        p = psutil.Process(pid)
        if any(p.name() in s for s in wow_process_names):
            print(p.name())
            running = True
    if not running and not dev:
        print 'WoW is not running'
        exit()
    print 'WoW is running'
        # raw_input('Pleas e put your fishing-rod on key 1, zoom-in to max, move camera on fishing-float and press any key')
jd.py 文件源码 项目:jd-crawler 作者: qiaofei32 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def process_count(self):
        p_count = 0
        pids = psutil.pids()
        for pid in pids:
            try:
                p = psutil.Process(pid)
                exe = p.exe()
                cmd = p.cmdline()
                cmd = " ".join(cmd)
                if "jd.py" in cmd:
                    # print pid, cmd[:30]
                    p_count += 1
            except:
                pass
        return p_count
config.py 文件源码 项目:gcMapExplorer 作者: rjdkmr 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def cleanScratch():
    """ Clean scratch directory.

    It checks whether any other gcMapExplorer process is running. In case,
    when only one process (i.e. current) is running, all files with "gcx"
    prefix will be deleted from default scratch directory.

    """
    config = getConfig()
    count = 0
    for pid in psutil.pids():
        p = None
        try:
            p = psutil.Process(pid)
        except psutil.NoSuchProcess:
            pass

        if p is not None:
            if 'gcMapExplorer' in p.name():
                count += 1

    # If only one gcMapExplorer is running, it is the current one
    if count == 1:
        for f in os.listdir(config['Dirs']['WorkingDirectory']):
            if not os.path.isfile(f):
                continue
            basename = os.path.basename(f)
            base = os.path.splitext(basename)[0]
            if "gcx" in base:
                try:
                    print(' Removing File: {0}'.format(f))
                    os.remove(f)
                except IOError:
                    pass

        print('     ... Finished Cleaning')
__init__.py 文件源码 项目:odin 作者: imito 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_system_status(memory_total=False, memory_total_actual=False,
                      memory_total_usage=False, memory_total_free=False,
                      all_pids=False, swap_memory=False, pid=False):
  """
  Parameters
  ----------
  threads: bool
      return dict {id: (user_time, system_time)}
  memory_maps: bool
      return dict {path: rss}

  Note
  ----
  All memory is returned in `MiB`
  To calculate memory_percent:
      get_system_status(memory_usage=True) / get_system_status(memory_total=True) * 100
  """
  import psutil
  # ====== general system query ====== #
  if memory_total:
    return psutil.virtual_memory().total / float(2 ** 20)
  if memory_total_actual:
    return psutil.virtual_memory().available / float(2 ** 20)
  if memory_total_usage:
    return psutil.virtual_memory().used / float(2 ** 20)
  if memory_total_free:
    return psutil.virtual_memory().free / float(2 ** 20)
  if swap_memory:
    tmp = psutil.swap_memory()
    tmp.total /= float(2 ** 20)
    tmp.used /= float(2 ** 20)
    tmp.free /= float(2 ** 20)
    tmp.sin /= float(2**20)
    tmp.sout /= float(2**20)
    return tmp
  if all_pids:
    return psutil.pids()
  if pid:
    return os.getpid()
px.py 文件源码 项目:px 作者: genotrance 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def quit():
    mypid = os.getpid()
    for pid in sorted(psutil.pids(), reverse=True):
        if pid == mypid:
            continue

        try:
            p = psutil.Process(pid)
            if p.exe() == sys.executable:
                p.send_signal(signal.CTRL_C_EVENT)
        except:
            pass
FTApiDaemon.py 文件源码 项目:futuquant 作者: FutunnOpen 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get_process_by_path(self, path):
        """????????"""
        lower_path = str(path).lower()
        for pid in psutil.pids():
            try:
                process = psutil.Process(pid)
                tmp = process.exe()
                if str(tmp).lower() == lower_path:
                    return process
            except:
                continue
        return None
test_memory_leaks.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_pids(self):
        self.execute(psutil.pids)

    # --- net
test_misc.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_wait_for_pid(self):
        wait_for_pid(os.getpid())
        nopid = max(psutil.pids()) + 99999
        with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
            self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid)
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_send_signal(self):
        sig = signal.SIGKILL if POSIX else signal.SIGTERM
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.send_signal(sig)
        exit_sig = p.wait()
        self.assertFalse(psutil.pid_exists(p.pid))
        if POSIX:
            self.assertEqual(exit_sig, -sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.ESRCH, "")):
                with self.assertRaises(psutil.NoSuchProcess):
                    p.send_signal(sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.EPERM, "")):
                with self.assertRaises(psutil.AccessDenied):
                    psutil.Process().send_signal(sig)
            # Sending a signal to process with PID 0 is not allowed as
            # it would affect every process in the process group of
            # the calling process (os.getpid()) instead of PID 0").
            if 0 in psutil.pids():
                p = psutil.Process(0)
                self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_pid_0(self):
        # Process(0) is supposed to work on all platforms except Linux
        if 0 not in psutil.pids():
            self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
            return

        # test all methods
        p = psutil.Process(0)
        for name in psutil._as_dict_attrnames:
            if name == 'pid':
                continue
            meth = getattr(p, name)
            try:
                ret = meth()
            except psutil.AccessDenied:
                pass
            else:
                if name in ("uids", "gids"):
                    self.assertEqual(ret.real, 0)
                elif name == "username":
                    if POSIX:
                        self.assertEqual(p.username(), 'root')
                    elif WINDOWS:
                        self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM')
                elif name == "name":
                    assert name, name

        if hasattr(p, 'rlimit'):
            try:
                p.rlimit(psutil.RLIMIT_FSIZE)
            except psutil.AccessDenied:
                pass

        p.as_dict()

        if not OPENBSD:
            self.assertIn(0, psutil.pids())
            self.assertTrue(psutil.pid_exists(0))


问题


面经


文章

微信
公众号

扫码关注公众号