def test_wrapper_termination():
    """Check that terminating the wrapper process leaves no process behind.

    Runs ``f_wrapper_termination`` in a child process, handing it a shared
    unsigned-int value, terminates that child after two seconds and waits
    up to five seconds for it to exit.  If the shared value then holds a
    non-zero pid that still exists, that process is sent SIGKILL until it
    is gone and the test fails.

    NOTE(review): presumably ``f_wrapper_termination`` stores the pid of the
    loop process it starts in ``shared_pid`` -- confirm against its
    definition.

    Raises:
        AssertionError: if the recorded pid was still alive after the
            wrapper process had been terminated.
    """
    progression.log.setLevel(logging.DEBUG)
    shared_pid = progression.UnsignedIntValue()
    wrapper = mp.Process(target=f_wrapper_termination, args=(shared_pid,))
    wrapper.start()
    time.sleep(2)
    wrapper.terminate()
    wrapper.join(5)

    pid = shared_pid.value
    # Nothing to verify when no pid was recorded (value still 0) or the
    # recorded process is already gone.
    if pid == 0 or not psutil.pid_exists(pid):
        return

    # Clean up the stray process so it does not outlive the test run.
    leftover = psutil.Process(pid)
    while leftover.is_running():
        print("pid {} is still running, sigkill".format(pid))
        try:
            leftover.send_signal(signal.SIGKILL)
        except psutil.NoSuchProcess:
            # The process exited between the liveness check and the signal;
            # it was still alive after termination, so the test must fail
            # with the assertion below rather than with this exception.
            break
        time.sleep(0.1)

    print("pid {} has stopped now".format(pid))
    # Explicit raise instead of ``assert False`` so the failure is not
    # stripped when Python runs with -O.
    raise AssertionError("the loop process was still running!")
# Comment list
# Table of contents