def run(self):
    """Poll for the master PID and request window disposal once it is gone.

    Sleeps 10 seconds, then checks ``psutil.pid_exists(self.masterPID)``
    every 2 seconds. When the PID no longer exists, a notice is written to
    stderr and ``pub.sendMessage('kill-ztv', msg=None)`` is scheduled via
    ``wx.CallAfter``.
    """
    # wait after launch before beginning to check for PID
    time.sleep(10)
    while True:
        if not psutil.pid_exists(self.masterPID):
            break
        time.sleep(2)
    notice = ("\n\n----\nlooks like python session that owned this instance of the "
              "ZTV gui is gone, so disposing of the window\n----\n")
    sys.stderr.write(notice)
    wx.CallAfter(pub.sendMessage, 'kill-ztv', msg=None)
python类pid_exists()的实例源码
def run(self):
    """Wait until ``self.masterPID`` disappears, then ask the GUI to close.

    After an initial 10 second delay the PID is polled with
    ``psutil.pid_exists`` every 2 seconds. Once it is gone, a message is
    written to stderr and ``pub.sendMessage('kill-ztv', msg=None)`` is
    queued through ``wx.CallAfter``.
    """
    startup_delay = 10  # wait after launch before beginning to check for PID
    poll_interval = 2
    time.sleep(startup_delay)
    while psutil.pid_exists(self.masterPID):
        time.sleep(poll_interval)
    sys.stderr.write(
        "\n\n----\nlooks like python session that owned this instance of the "
        "ZTV gui is gone, so disposing of the window\n----\n"
    )
    wx.CallAfter(pub.sendMessage, 'kill-ztv', msg=None)
def is_running():
    """Report whether the PID returned by ``System.Ahenk.get_pid_number()`` exists.

    The PID is fetched exactly once so the ``None`` check and the lookup
    operate on the same value.

    :return: ``True`` if a PID is available and ``psutil.pid_exists`` confirms
        it; ``False`` if no PID is available or if any step raises.
    """
    try:
        pid = System.Ahenk.get_pid_number()
        if pid is None:
            return False
        return psutil.pid_exists(int(pid))
    except Exception:
        # Deliberate best-effort: any failure while obtaining or converting
        # the PID is reported as "not running" instead of propagating.
        return False
def is_running(pid):
    """Return whether psutil reports a process with the given PID.

    :param pid: process ID to look up
    :return: the result of ``psutil.pid_exists(pid)``
    """
    exists = psutil.pid_exists(pid)
    return exists
def _cleanup_slots(self):
    """Reset to 0 every entry of ``self._slots`` whose PID no longer exists.

    Entries that are already 0 are skipped without a lookup (0 presumably
    marks an unused slot -- confirm against the code that fills the slots).
    """
    for index, slot_pid in enumerate(self._slots):
        if slot_pid == 0:
            continue
        if psutil.pid_exists(slot_pid):
            continue
        self._slots[index] = 0
# Target is expected to return True if loop shall continue and False if it
# should break.
def check_sh_is_running(pidfile):
    """
    Check whether the process whose PID is recorded in a pidfile exists.

    The PID is obtained with ``read_pidfile``; a value that is not greater
    than 0 is treated as "not running" without querying psutil.

    :param pidfile: Name of the pidfile to check
    :type pidfile: str

    :return: True: if SmartHomeNG is running, False: if SmartHome is not running
    :rtype: bool
    """
    stored_pid = read_pidfile(pidfile)
    if stored_pid > 0:
        return psutil.pid_exists(stored_pid)
    return False
def test_send(self):
    ''' Test sending signals.

    For each signal under test, a worker Python process is started and its
    PID is written to a pidfile; ``send(pidfile, sig)`` is then expected to
    end the worker with a return code equal to the signal number. The
    pidfile is then rewritten with a PID that psutil reports as absent, and
    ``send`` is expected to raise ``OSError``.

    NOTE(review): the nesting of this test was reconstructed from the
    statement order; confirm that the bad-PID check is meant to run inside
    each subTest.
    '''
    python_path = sys.executable
    python_path = os.path.abspath(python_path)
    # Command line for a worker that sleeps for 60 seconds.
    worker_cmd = ('"' + python_path + '" -c ' +
                  '"import time; time.sleep(60)"')
    with tempfile.TemporaryDirectory() as dirpath:
        pidfile = dirpath + '/pid.pid'
        for sig in [2, signal.SIGTERM, SIGABRT]:
            with self.subTest(sig):
                # NOTE(review): the command is a single string passed without
                # shell=True, and the return code is compared to +sig; this
                # looks Windows-specific -- confirm before running elsewhere.
                worker = subprocess.Popen(
                    worker_cmd
                )
                worker_pid = worker.pid
                with open(pidfile, 'w') as f:
                    f.write(str(worker_pid) + '\n')
                send(pidfile, sig)
                worker.wait()
                self.assertEqual(worker.returncode, int(sig))
                # Get a false PID so we can test against it as well
                # Note the mild race condition here
                bad_pid = os.getpid()
                # Start from a PID known to exist so the loop body runs at
                # least once, then draw random PIDs until one is unused.
                while psutil.pid_exists(bad_pid):
                    bad_pid = random.randint(1000, 99999)
                with open(pidfile, 'w') as f:
                    f.write(str(bad_pid) + '\n')
                with self.assertRaises(OSError):
                    send(pidfile, sig)
def test_pid_exists(self):
    """Pass ``psutil.pid_exists`` and this process's PID to ``self.execute``."""
    own_pid = os.getpid()
    self.execute(psutil.pid_exists, own_pid)
# --- disk
def test_kill(self):
    """Kill a test subprocess and check that its PID is gone afterwards.

    On POSIX the value returned by ``wait()`` must equal ``-signal.SIGKILL``.
    """
    child = get_test_subprocess()
    child_pid = child.pid
    proc = psutil.Process(child_pid)
    proc.kill()
    exit_status = proc.wait()
    self.assertFalse(psutil.pid_exists(child_pid))
    if not POSIX:
        return
    self.assertEqual(exit_status, -signal.SIGKILL)
def test_terminate(self):
    """Terminate a test subprocess and check that its PID is gone afterwards.

    On POSIX the value returned by ``wait()`` must equal ``-signal.SIGTERM``.
    """
    child = get_test_subprocess()
    child_pid = child.pid
    proc = psutil.Process(child_pid)
    proc.terminate()
    exit_status = proc.wait()
    self.assertFalse(psutil.pid_exists(child_pid))
    if not POSIX:
        return
    self.assertEqual(exit_status, -signal.SIGTERM)
def test_send_signal(self):
    """Exercise ``Process.send_signal`` on a live subprocess and on error paths.

    NOTE(review): the block structure below was reconstructed from the
    statement order; confirm that the mock-based cases and the PID 0 case
    are meant to run only under ``if POSIX:``.
    """
    # SIGKILL on POSIX, SIGTERM otherwise.
    sig = signal.SIGKILL if POSIX else signal.SIGTERM
    sproc = get_test_subprocess()
    p = psutil.Process(sproc.pid)
    p.send_signal(sig)
    exit_sig = p.wait()
    self.assertFalse(psutil.pid_exists(p.pid))
    if POSIX:
        self.assertEqual(exit_sig, -sig)
        # With os.kill patched to fail with ESRCH, send_signal must raise
        # NoSuchProcess.
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.send_signal(sig)
        with mock.patch('psutil.os.kill',
                        side_effect=OSError(errno.ESRCH, "")):
            with self.assertRaises(psutil.NoSuchProcess):
                p.send_signal(sig)
        # With os.kill patched to fail with EPERM, send_signal must raise
        # AccessDenied.
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.send_signal(sig)
        with mock.patch('psutil.os.kill',
                        side_effect=OSError(errno.EPERM, "")):
            with self.assertRaises(psutil.AccessDenied):
                psutil.Process().send_signal(sig)
        # Sending a signal to process with PID 0 is not allowed as
        # it would affect every process in the process group of
        # the calling process (os.getpid()) instead of PID 0").
        if 0 in psutil.pids():
            p = psutil.Process(0)
            self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
def test_pid_exists(self):
    """Check ``psutil.pid_exists`` for a live, a killed, an invalid and PID 0.

    A test subprocess must be reported as existing while alive and as
    missing after it has been killed and waited for. PID -1 must never
    exist, and the answer for PID 0 must agree with ``psutil.pids()``.
    """
    sproc = get_test_subprocess()
    self.assertTrue(psutil.pid_exists(sproc.pid))
    p = psutil.Process(sproc.pid)
    p.kill()
    p.wait()
    self.assertFalse(psutil.pid_exists(sproc.pid))
    self.assertFalse(psutil.pid_exists(-1))
    # pid 0: asserted explicitly. A bare ``a == 0 in b`` expression would be
    # a chained comparison whose result is discarded, verifying nothing.
    self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())
def test_pid_exists_2(self):
    """Cross-check ``psutil.pid_exists`` against ``psutil.pids()``.

    Every PID listed by ``psutil.pids()`` must be reported as existing,
    tolerating processes that exit during the test. A range of 1000 PIDs
    starting 5000 above the highest listed PID must be reported as absent.
    """
    reap_children()
    pids = psutil.pids()
    for pid in pids:
        # Explicit ``if`` rather than ``assert``/``except AssertionError``:
        # assert statements are removed under ``python -O``, which would
        # silently disable this check.
        if not psutil.pid_exists(pid):
            # The process may have disappeared in the meantime; fail only
            # if it is still listed by psutil.pids() after a short pause.
            time.sleep(.1)
            if pid in psutil.pids():
                self.fail(pid)
    highest = max(pids)
    for pid in range(highest + 5000, highest + 6000):
        self.assertFalse(psutil.pid_exists(pid), msg=pid)
def test_pid_exists(self):
    """Hand ``psutil.pid_exists`` and the current PID to ``self.execute``."""
    current_pid = os.getpid()
    self.execute(psutil.pid_exists, current_pid)
# --- disk
def test_kill(self):
    """Check that ``Process.kill`` removes the subprocess.

    After kill + wait the PID must no longer exist; on POSIX the wait
    result must be ``-signal.SIGKILL``.
    """
    spawned = get_test_subprocess()
    target_pid = spawned.pid
    handle = psutil.Process(target_pid)
    handle.kill()
    wait_result = handle.wait()
    still_there = psutil.pid_exists(target_pid)
    self.assertFalse(still_there)
    if POSIX:
        expected = -signal.SIGKILL
        self.assertEqual(wait_result, expected)
def test_terminate(self):
    """Check that ``Process.terminate`` removes the subprocess.

    After terminate + wait the PID must no longer exist; on POSIX the wait
    result must be ``-signal.SIGTERM``.
    """
    spawned = get_test_subprocess()
    target_pid = spawned.pid
    handle = psutil.Process(target_pid)
    handle.terminate()
    wait_result = handle.wait()
    still_there = psutil.pid_exists(target_pid)
    self.assertFalse(still_there)
    if POSIX:
        expected = -signal.SIGTERM
        self.assertEqual(wait_result, expected)
def test_send_signal(self):
    """Test ``Process.send_signal`` for the normal case and for OS errors.

    NOTE(review): indentation was reconstructed from the statement order;
    verify that everything after the first POSIX assertion is intended to
    stay inside the ``if POSIX:`` branch.
    """
    # Signal under test: SIGKILL on POSIX, SIGTERM otherwise.
    sig = signal.SIGKILL if POSIX else signal.SIGTERM
    sproc = get_test_subprocess()
    p = psutil.Process(sproc.pid)
    p.send_signal(sig)
    exit_sig = p.wait()
    self.assertFalse(psutil.pid_exists(p.pid))
    if POSIX:
        self.assertEqual(exit_sig, -sig)
        # ESRCH from the patched os.kill is expected to surface as
        # psutil.NoSuchProcess.
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.send_signal(sig)
        with mock.patch('psutil.os.kill',
                        side_effect=OSError(errno.ESRCH, "")):
            with self.assertRaises(psutil.NoSuchProcess):
                p.send_signal(sig)
        # EPERM from the patched os.kill is expected to surface as
        # psutil.AccessDenied.
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        p.send_signal(sig)
        with mock.patch('psutil.os.kill',
                        side_effect=OSError(errno.EPERM, "")):
            with self.assertRaises(psutil.AccessDenied):
                psutil.Process().send_signal(sig)
        # Sending a signal to process with PID 0 is not allowed as
        # it would affect every process in the process group of
        # the calling process (os.getpid()) instead of PID 0").
        if 0 in psutil.pids():
            p = psutil.Process(0)
            self.assertRaises(ValueError, p.send_signal, signal.SIGTERM)
def test_pid_exists(self):
    """Verify ``psutil.pid_exists`` before and after a subprocess is killed.

    Also checks that PID -1 is reported as missing and that the result for
    PID 0 matches its presence in ``psutil.pids()``.
    """
    sproc = get_test_subprocess()
    self.assertTrue(psutil.pid_exists(sproc.pid))
    p = psutil.Process(sproc.pid)
    p.kill()
    p.wait()
    self.assertFalse(psutil.pid_exists(sproc.pid))
    self.assertFalse(psutil.pid_exists(-1))
    # pid 0: checked through an assertion only. An unasserted
    # ``x == 0 in y`` statement is a chained comparison with a discarded
    # result, so it tests nothing.
    self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())
def test_pid_exists_2(self):
    """Compare ``psutil.pid_exists`` with the PID list from ``psutil.pids()``.

    Listed PIDs must exist (allowing for processes that exit mid-test), and
    1000 PIDs starting 5000 above the highest listed one must not exist.
    """
    reap_children()
    pids = psutil.pids()
    for pid in pids:
        if psutil.pid_exists(pid):
            continue
        # A plain conditional is used instead of ``assert`` so the check
        # still runs under ``python -O``. The process may simply have
        # exited since the snapshot; fail only if it is still listed.
        time.sleep(.1)
        if pid in psutil.pids():
            self.fail(pid)
    upper = max(pids)
    for pid in range(upper + 5000, upper + 6000):
        self.assertFalse(psutil.pid_exists(pid), msg=pid)
def _wait_and_kill(pid_to_wait, pids_to_kill):
    """ Wait for a process to finish if it exists, and then kill a list of processes.

    A process that exits between the existence check and the psutil call is
    treated the same as one that never existed, so one vanished PID does not
    prevent the remaining PIDs from being killed.

    Args:
      pid_to_wait: the process to wait for.
      pids_to_kill: a list of processes to kill after the process of pid_to_wait finishes.
    """
    if psutil.pid_exists(pid_to_wait):
        try:
            psutil.Process(pid=pid_to_wait).wait()
        except psutil.NoSuchProcess:
            # Exited after the check above; there is nothing left to wait for.
            pass

    for pid_to_kill in pids_to_kill:
        if not psutil.pid_exists(pid_to_kill):
            continue
        try:
            psutil.Process(pid=pid_to_kill).kill()
        except psutil.NoSuchProcess:
            # Already gone; move on to the next PID.
            continue