def test_send(self):
    ''' Test sending signals.

    For every signal under test a sleeping worker process is started,
    its PID is written to a PID file, and send() is asked to deliver
    the signal through that file. The worker's return code must then
    equal the signal number. Afterwards the PID file is rewritten with
    a PID that psutil reports as not existing, and send() is expected
    to raise OSError.
    '''
    python_path = os.path.abspath(sys.executable)
    # Pass the command as an argument list rather than one pre-quoted
    # string: a plain string without shell=True is only interpreted as
    # a command line on Windows, while a list works on every platform.
    worker_cmd = [python_path, '-c', 'import time; time.sleep(60)']

    with tempfile.TemporaryDirectory() as dirpath:
        pidfile = os.path.join(dirpath, 'pid.pid')

        for sig in [2, signal.SIGTERM, SIGABRT]:
            with self.subTest(sig):
                worker = subprocess.Popen(worker_cmd)
                try:
                    with open(pidfile, 'w') as f:
                        f.write(str(worker.pid) + '\n')

                    send(pidfile, sig)
                    worker.wait()
                    # NOTE(review): this expects the exit status to be
                    # the positive signal number; POSIX Popen reports
                    # -sig for a signal death -- confirm the intended
                    # platform for this test.
                    self.assertEqual(worker.returncode, int(sig))
                finally:
                    # If anything above failed before the worker exited,
                    # do not leave it sleeping in the background for a
                    # minute; kill it and reap the process handle.
                    if worker.poll() is None:
                        worker.kill()
                        worker.wait()

                # Get a false PID so we can test against it as well.
                # Note the mild race condition here: the chosen PID
                # could be assigned to a new process before send() runs.
                bad_pid = os.getpid()
                while psutil.pid_exists(bad_pid):
                    bad_pid = random.randint(1000, 99999)

                with open(pidfile, 'w') as f:
                    f.write(str(bad_pid) + '\n')

                with self.assertRaises(OSError):
                    send(pidfile, sig)
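# (web-page residue, not part of the test) Comment list
# (web-page residue, not part of the test) Table of contents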