def test_kill_process_tree(self):
"""Spin up a process that can't be killed by SIGTERM and make sure it gets killed anyway."""
child_process_killed = multiprocessing.Value('i', 0)
process_done = multiprocessing.Semaphore(0)
child_pid = multiprocessing.Value('i', 0)
setup_done = multiprocessing.Semaphore(0)
args = [child_process_killed, child_pid, process_done, setup_done]
child = multiprocessing.Process(target=TestHelpers._parent_of_ignores_sigterm, args=args)
try:
child.start()
self.assertTrue(process_done.acquire(timeout=5.0))
self.assertEqual(1, child_process_killed.value)
finally:
try:
os.kill(child_pid.value, signal.SIGKILL) # terminate doesnt work here
except OSError:
pass
评论列表
文章目录