python类pid_exists()的实例源码

ztv.py 文件源码 项目:CAAPR 作者: Stargrazer82301 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run(self):
        time.sleep(10)  # wait after launch before beginning to check for PID
        while psutil.pid_exists(self.masterPID):
            time.sleep(2)
        sys.stderr.write("\n\n----\nlooks like python session that owned this instance of the " +
                         "ZTV gui is gone, so disposing of the window\n----\n")
        wx.CallAfter(pub.sendMessage, 'kill-ztv', msg=None)
ztv.py 文件源码 项目:CAAPR 作者: Stargrazer82301 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def run(self):
        time.sleep(10)  # wait after launch before beginning to check for PID
        while psutil.pid_exists(self.masterPID):
            time.sleep(2)
        sys.stderr.write("\n\n----\nlooks like python session that owned this instance of the " +
                         "ZTV gui is gone, so disposing of the window\n----\n")
        wx.CallAfter(pub.sendMessage, 'kill-ztv', msg=None)
system.py 文件源码 项目:ahenk 作者: Pardus-LiderAhenk 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def is_running():
            try:
                if System.Ahenk.get_pid_number() is not None:
                    return psutil.pid_exists(int(System.Ahenk.get_pid_number()))
                else:
                    return False
            except Exception as e:
                return False
system.py 文件源码 项目:ahenk 作者: Pardus-LiderAhenk 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def is_running(pid):
            return psutil.pid_exists(pid)
parallel_loop.py 文件源码 项目:picire 作者: renatahodovan 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _cleanup_slots(self):
        for i, pid in enumerate(self._slots):
            if pid != 0:
                if not psutil.pid_exists(pid):
                    self._slots[i] = 0

    # Target is expected to return True if loop shall continue and False if it
    # should break.
daemon.py 文件源码 项目:smarthome 作者: smarthomeNG 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def check_sh_is_running(pidfile):
    """
    This method deamonizes the sh.py process and redirects standard file descriptors.

    :param pidfile: Name of the pidfile to check
    :type pidfile: str

    :return: True: if SmartHomeNG is running, False: if SmartHome is not running
    :rtype: bool
    """

    pid = read_pidfile(pidfile)
    return psutil.pid_exists(pid) if pid > 0 else False
test_signals_windows.py 文件源码 项目:py_daemoniker 作者: Muterra 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_send(self):
        ''' Test sending signals.
        '''
        python_path = sys.executable
        python_path = os.path.abspath(python_path)
        worker_cmd = ('"' + python_path + '" -c ' +
                      '"import time; time.sleep(60)"')

        with tempfile.TemporaryDirectory() as dirpath:
            pidfile = dirpath + '/pid.pid'

            for sig in [2, signal.SIGTERM, SIGABRT]:
                with self.subTest(sig):
                    worker = subprocess.Popen(
                        worker_cmd
                    )
                    worker_pid = worker.pid

                    with open(pidfile, 'w') as f:
                        f.write(str(worker_pid) + '\n')

                    send(pidfile, sig)
                    worker.wait()
                    self.assertEqual(worker.returncode, int(sig))

                    # Get a false PID so we can test against it as well
                    # Note the mild race condition here
                    bad_pid = os.getpid()
                    while psutil.pid_exists(bad_pid):
                        bad_pid = random.randint(1000, 99999)

                    with open(pidfile, 'w') as f:
                        f.write(str(bad_pid) + '\n')

                    with self.assertRaises(OSError):
                        send(pidfile, sig)
test_memory_leaks.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_pid_exists(self):
        self.execute(psutil.pid_exists, os.getpid())

    # --- disk
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_kill(self):
        sproc = get_test_subprocess()
        test_pid = sproc.pid
        p = psutil.Process(test_pid)
        p.kill()
        sig = p.wait()
        self.assertFalse(psutil.pid_exists(test_pid))
        if POSIX:
            self.assertEqual(sig, -signal.SIGKILL)
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_terminate(self):
        sproc = get_test_subprocess()
        test_pid = sproc.pid
        p = psutil.Process(test_pid)
        p.terminate()
        sig = p.wait()
        self.assertFalse(psutil.pid_exists(test_pid))
        if POSIX:
            self.assertEqual(sig, -signal.SIGTERM)
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_send_signal(self):
        sig = signal.SIGKILL if POSIX else signal.SIGTERM
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.send_signal(sig)
        exit_sig = p.wait()
        self.assertFalse(psutil.pid_exists(p.pid))
        if POSIX:
            self.assertEqual(exit_sig, -sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.ESRCH, "")):
                with self.assertRaises(psutil.NoSuchProcess):
                    p.send_signal(sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.EPERM, "")):
                with self.assertRaises(psutil.AccessDenied):
                    psutil.Process().send_signal(sig)
            # Sending a signal to process with PID 0 is not allowed as
            # it would affect every process in the process group of
            # the calling process (os.getpid()) instead of PID 0").
            if 0 in psutil.pids():
                p = psutil.Process(0)
                self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
test_system.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_pid_exists(self):
        sproc = get_test_subprocess()
        self.assertTrue(psutil.pid_exists(sproc.pid))
        p = psutil.Process(sproc.pid)
        p.kill()
        p.wait()
        self.assertFalse(psutil.pid_exists(sproc.pid))
        self.assertFalse(psutil.pid_exists(-1))
        self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())
        # pid 0
        psutil.pid_exists(0) == 0 in psutil.pids()
test_system.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_pid_exists_2(self):
        reap_children()
        pids = psutil.pids()
        for pid in pids:
            try:
                assert psutil.pid_exists(pid)
            except AssertionError:
                # in case the process disappeared in meantime fail only
                # if it is no longer in psutil.pids()
                time.sleep(.1)
                if pid in psutil.pids():
                    self.fail(pid)
        pids = range(max(pids) + 5000, max(pids) + 6000)
        for pid in pids:
            self.assertFalse(psutil.pid_exists(pid), msg=pid)
test_memory_leaks.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_pid_exists(self):
        self.execute(psutil.pid_exists, os.getpid())

    # --- disk
test_process.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_kill(self):
        sproc = get_test_subprocess()
        test_pid = sproc.pid
        p = psutil.Process(test_pid)
        p.kill()
        sig = p.wait()
        self.assertFalse(psutil.pid_exists(test_pid))
        if POSIX:
            self.assertEqual(sig, -signal.SIGKILL)
test_process.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_terminate(self):
        sproc = get_test_subprocess()
        test_pid = sproc.pid
        p = psutil.Process(test_pid)
        p.terminate()
        sig = p.wait()
        self.assertFalse(psutil.pid_exists(test_pid))
        if POSIX:
            self.assertEqual(sig, -signal.SIGTERM)
test_process.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_send_signal(self):
        sig = signal.SIGKILL if POSIX else signal.SIGTERM
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.send_signal(sig)
        exit_sig = p.wait()
        self.assertFalse(psutil.pid_exists(p.pid))
        if POSIX:
            self.assertEqual(exit_sig, -sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.ESRCH, "")):
                with self.assertRaises(psutil.NoSuchProcess):
                    p.send_signal(sig)
            #
            sproc = get_test_subprocess()
            p = psutil.Process(sproc.pid)
            p.send_signal(sig)
            with mock.patch('psutil.os.kill',
                            side_effect=OSError(errno.EPERM, "")):
                with self.assertRaises(psutil.AccessDenied):
                    psutil.Process().send_signal(sig)
            # Sending a signal to process with PID 0 is not allowed as
            # it would affect every process in the process group of
            # the calling process (os.getpid()) instead of PID 0").
            if 0 in psutil.pids():
                p = psutil.Process(0)
                self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
test_system.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_pid_exists(self):
        sproc = get_test_subprocess()
        self.assertTrue(psutil.pid_exists(sproc.pid))
        p = psutil.Process(sproc.pid)
        p.kill()
        p.wait()
        self.assertFalse(psutil.pid_exists(sproc.pid))
        self.assertFalse(psutil.pid_exists(-1))
        self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())
        # pid 0
        psutil.pid_exists(0) == 0 in psutil.pids()
test_system.py 文件源码 项目:FancyWord 作者: EastonLee 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_pid_exists_2(self):
        reap_children()
        pids = psutil.pids()
        for pid in pids:
            try:
                assert psutil.pid_exists(pid)
            except AssertionError:
                # in case the process disappeared in meantime fail only
                # if it is no longer in psutil.pids()
                time.sleep(.1)
                if pid in psutil.pids():
                    self.fail(pid)
        pids = range(max(pids) + 5000, max(pids) + 6000)
        for pid in pids:
            self.assertFalse(psutil.pid_exists(pid), msg=pid)
_shell_process.py 文件源码 项目:pydatalab 作者: googledatalab 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _wait_and_kill(pid_to_wait, pids_to_kill):
  """ Wait for a process to finish if it exists, and then kill a list of processes.

  Args:
    pid_to_wait: the process to wait for.
    pids_to_kill: a list of processes to kill after the process of pid_to_wait finishes.
  """
  if psutil.pid_exists(pid_to_wait):
    psutil.Process(pid=pid_to_wait).wait()

  for pid_to_kill in pids_to_kill:
    if psutil.pid_exists(pid_to_kill):
      p = psutil.Process(pid=pid_to_kill)
      p.kill()


问题


面经


文章

微信
公众号

扫码关注公众号