def murder_workers(self):
    """\
    Signal every worker whose heartbeat is older than ``self.timeout``.

    Nothing happens when ``self.timeout`` is falsy.  For each worker the
    time since ``worker.tmp.last_update()`` is compared with the timeout;
    workers still inside the window, or whose heartbeat cannot be read
    (``OSError``/``ValueError``), are left alone.  A timed-out worker is
    flagged as aborted, reported at critical level and sent SIGABRT the
    first time; if it is already flagged it is sent SIGKILL.
    """
    if not self.timeout:
        return

    # Iterate over a snapshot so the loop is unaffected if WORKERS changes
    # while workers are being signalled.
    for pid, worker in list(self.WORKERS.items()):
        try:
            idle_for = time.time() - worker.tmp.last_update()
            if idle_for <= self.timeout:
                continue
        except (OSError, ValueError):
            continue

        if worker.aborted:
            # Already asked to abort on an earlier pass: escalate.
            self.kill_worker(pid, signal.SIGKILL)
        else:
            self.log.critical("WORKER TIMEOUT (pid:%s)", pid)
            worker.aborted = True
            self.kill_worker(pid, signal.SIGABRT)
# Example source code using Python's SIGABRT (python类SIGABRT的实例源码)
def murder_workers(self):
    """\
    Kill workers that have not updated their heartbeat in time.

    Returns immediately when ``self.timeout`` is falsy.  A worker counts as
    timed out when ``time.time() - worker.tmp.last_update()`` exceeds
    ``self.timeout``; a heartbeat that raises ``OSError`` or ``ValueError``
    causes the worker to be skipped.  Timed-out workers get SIGABRT (with a
    critical log entry and ``worker.aborted`` set) on the first detection
    and SIGKILL once ``worker.aborted`` is already set.
    """
    if not self.timeout:
        return

    snapshot = list(self.WORKERS.items())
    for pid, worker in snapshot:
        try:
            within_timeout = (
                time.time() - worker.tmp.last_update() <= self.timeout
            )
        except (OSError, ValueError):
            continue
        if within_timeout:
            continue

        if not worker.aborted:
            self.log.critical("WORKER TIMEOUT (pid:%s)", pid)
            worker.aborted = True
            chosen_signal = signal.SIGABRT
        else:
            chosen_signal = signal.SIGKILL
        self.kill_worker(pid, chosen_signal)
def init_signals(self):
    """Register this object's signal handlers on ``self.loop``.

    Each handler is registered with ``add_signal_handler`` and will be
    called with the signal number and ``None`` as arguments.  Afterwards
    SIGTERM and SIGUSR1 are configured not to interrupt system calls.
    """
    # Set up signals through the event loop API.
    registrations = (
        (signal.SIGQUIT, self.handle_quit),
        (signal.SIGTERM, self.handle_exit),
        (signal.SIGINT, self.handle_quit),
        (signal.SIGWINCH, self.handle_winch),
        (signal.SIGUSR1, self.handle_usr1),
        (signal.SIGABRT, self.handle_abort),
    )
    for signum, callback in registrations:
        self.loop.add_signal_handler(signum, callback, signum, None)

    # Don't let SIGTERM and SIGUSR1 disturb active requests
    # by interrupting system calls
    for signum in (signal.SIGTERM, signal.SIGUSR1):
        signal.siginterrupt(signum, False)
def display(self):
    """Report, via ``error``, that the program received SIGABRT."""
    message = "Program received signal SIGABRT, Aborted."
    error(message)
def _analyze(self):
if self.signum == SIGSEGV:
self.memoryFault()
elif self.signum == SIGFPE:
self.mathError()
elif self.signum == SIGCHLD:
self.childExit()
elif self.signum == SIGABRT:
self.error = Abort()
return self.error
def init_signals(self):
    """Register this object's signal handlers on the current event loop.

    The loop is obtained from ``asyncio.get_event_loop()``.  Each handler
    is registered with ``add_signal_handler`` and will be called with the
    signal number and ``None`` as arguments.  Afterwards SIGTERM and
    SIGUSR1 are configured not to interrupt system calls.
    """
    # Set up signals through the event loop API.
    loop = asyncio.get_event_loop()
    registrations = (
        (signal.SIGQUIT, self.handle_quit),
        (signal.SIGTERM, self.handle_exit),
        (signal.SIGINT, self.handle_quit),
        (signal.SIGWINCH, self.handle_winch),
        (signal.SIGUSR1, self.handle_usr1),
        (signal.SIGABRT, self.handle_abort),
    )
    for signum, callback in registrations:
        loop.add_signal_handler(signum, callback, signum, None)

    # Don't let SIGTERM and SIGUSR1 disturb active requests
    # by interrupting system calls
    for signum in (signal.SIGTERM, signal.SIGUSR1):
        signal.siginterrupt(signum, False)
def setUp(self):
    """Clear the "signal fired" flags and install the SIGABRT/SIGINT handlers."""
    self.sigint_fired = self.sigabrt_fired = False

    handlers = (
        (signal.SIGABRT, self.sigabrt_handler),
        (signal.SIGINT, self.sigint_handler),
    )
    for signum, handler in handlers:
        signal.signal(signum, handler)
def test_no_int_case_local_killer(self):
    # Scenario: a LocalKiller watches a child running
    # ``TestProcess.no_int_sleep``.  The child is expected to survive the
    # 1.0s soft timeout (SIGINT) and to be gone, with exit code 10, shortly
    # after the 2.0s hard timeout (SIGABRT).
    # NOTE(review): presumably ``no_int_sleep`` ignores SIGINT and exits
    # with 10 on SIGABRT -- confirm against TestProcess.
    test_process_obj = TestProcess()
    testing_process = multiprocessing.Process(
        target=test_process_obj.no_int_sleep,
        kwargs={
            'interval': 30,
        },
    )
    testing_process.daemon = True
    testing_process.start()
    self.assertTrue(testing_process.is_alive())

    local_killer = devices.killer.LocalKiller(
        pid=testing_process.pid,
        soft_timeout=1.0,
        soft_timeout_signal=signal.SIGINT,
        hard_timeout=2.0,
        hard_timeout_signal=signal.SIGABRT,
        critical_timeout=5.0,
        critical_timeout_signal=signal.SIGTERM,
        memory_limit=4 * 1024 * 1024 * 1024,
        memory_limit_signal=signal.SIGINT,
    )
    local_killer.start()
    try:
        self.assertTrue(testing_process.is_alive())
        # Just past the soft timeout: still alive.
        time.sleep(1.2)
        self.assertTrue(testing_process.is_alive())
        # Just past the hard timeout: terminated.
        time.sleep(1.2)
        self.assertFalse(testing_process.is_alive())
        self.assertEqual(testing_process.exitcode, 10)
    finally:
        # Stop the killer even when an assertion above fails, so it is
        # not left running after the test.
        local_killer.stop()
def test_sleep_case_remote_killer(self):
    # Scenario: a RemoteKiller watches a child running
    # ``TestProcess.sleep``.  The child is expected to be gone, with exit
    # code 20, shortly after the 1.0s soft timeout (SIGINT).
    # NOTE(review): presumably ``sleep`` exits with 20 on SIGINT --
    # confirm against TestProcess.
    test_process_obj = TestProcess()
    testing_process = multiprocessing.Process(
        target=test_process_obj.sleep,
        kwargs={
            'interval': 30,
        },
    )
    testing_process.daemon = True
    testing_process.start()
    self.assertTrue(testing_process.is_alive())

    remote_killer = devices.killer.RemoteKiller(
        pid=testing_process.pid,
        soft_timeout=1.0,
        soft_timeout_signal=signal.SIGINT,
        hard_timeout=2.0,
        hard_timeout_signal=signal.SIGABRT,
        critical_timeout=5.0,
        critical_timeout_signal=signal.SIGTERM,
        memory_limit=4 * 1024 * 1024 * 1024,
        memory_limit_signal=signal.SIGINT,
    )
    remote_killer.start()
    try:
        self.assertTrue(testing_process.is_alive())
        # Just past the soft timeout: terminated.
        time.sleep(1.2)
        self.assertFalse(testing_process.is_alive())
        self.assertEqual(testing_process.exitcode, 20)
    finally:
        # Stop the killer even when an assertion above fails, so it is
        # not left running after the test.
        remote_killer.stop()
def test_no_int_case_remote_killer(self):
    # Scenario: a RemoteKiller watches a child running
    # ``TestProcess.no_int_sleep``.  The child is expected to survive the
    # 1.0s soft timeout (SIGINT) and to be gone, with exit code 10, shortly
    # after the 2.0s hard timeout (SIGABRT).
    # NOTE(review): presumably ``no_int_sleep`` ignores SIGINT and exits
    # with 10 on SIGABRT -- confirm against TestProcess.
    test_process_obj = TestProcess()
    testing_process = multiprocessing.Process(
        target=test_process_obj.no_int_sleep,
        kwargs={
            'interval': 30,
        },
    )
    testing_process.daemon = True
    testing_process.start()
    self.assertTrue(testing_process.is_alive())

    remote_killer = devices.killer.RemoteKiller(
        pid=testing_process.pid,
        soft_timeout=1.0,
        soft_timeout_signal=signal.SIGINT,
        hard_timeout=2.0,
        hard_timeout_signal=signal.SIGABRT,
        critical_timeout=5.0,
        critical_timeout_signal=signal.SIGTERM,
        memory_limit=4 * 1024 * 1024 * 1024,
        memory_limit_signal=signal.SIGINT,
    )
    remote_killer.start()
    try:
        self.assertTrue(testing_process.is_alive())
        # Just past the soft timeout: still alive.
        time.sleep(1.2)
        self.assertTrue(testing_process.is_alive())
        # Just past the hard timeout: terminated.
        time.sleep(1.2)
        self.assertFalse(testing_process.is_alive())
        self.assertEqual(testing_process.exitcode, 10)
    finally:
        # Stop the killer even when an assertion above fails, so it is
        # not left running after the test.
        remote_killer.stop()
def test_lost_case_remote_killer(self):
    # Scenario: a RemoteKiller watches a child running
    # ``TestProcess.lost``.  The child is expected to survive both the
    # 1.0s soft timeout (SIGINT) and the 2.0s hard timeout (SIGABRT), and
    # to be gone shortly after the 3.0s critical timeout (SIGTERM).  The
    # expected exit code -15 is how multiprocessing reports termination
    # by signal 15 (SIGTERM).
    test_process_obj = TestProcess()
    testing_process = multiprocessing.Process(
        target=test_process_obj.lost,
        kwargs={
            'interval': 30,
        },
    )
    testing_process.daemon = True
    testing_process.start()
    self.assertTrue(testing_process.is_alive())

    remote_killer = devices.killer.RemoteKiller(
        pid=testing_process.pid,
        soft_timeout=1.0,
        soft_timeout_signal=signal.SIGINT,
        hard_timeout=2.0,
        hard_timeout_signal=signal.SIGABRT,
        critical_timeout=3.0,
        critical_timeout_signal=signal.SIGTERM,
        memory_limit=4 * 1024 * 1024 * 1024,
        memory_limit_signal=signal.SIGINT,
    )
    remote_killer.start()
    try:
        self.assertTrue(testing_process.is_alive())
        # Just past the soft timeout: still alive.
        time.sleep(1.2)
        self.assertTrue(testing_process.is_alive())
        # Just past the hard timeout: still alive.
        time.sleep(1.2)
        self.assertTrue(testing_process.is_alive())
        # Just past the critical timeout: terminated.
        time.sleep(1.2)
        self.assertFalse(testing_process.is_alive())
        self.assertEqual(testing_process.exitcode, -15)
    finally:
        # Stop the killer even when an assertion above fails, so it is
        # not left running after the test.
        remote_killer.stop()
def init(self):
    '''
    Install this object's handlers for SIGABRT and SIGINT.
    '''
    handlers = (
        (signal.SIGABRT, self.sigabrt_handler),
        (signal.SIGINT, self.sigint_handler),
    )
    for signum, handler in handlers:
        signal.signal(signum, handler)
def end_working(self):
    """Hand over unfinished tasks, restore default signals, stop the killer.

    When ``self.tasks_to_finish`` is non-empty it is submitted to
    ``self.worker_task_queue.apply_async_many`` with priority ``'NORMAL'``.
    The SIGABRT and SIGINT dispositions are then reset to ``SIG_DFL`` and
    ``self.killer.stop()`` is called.
    """
    pending = self.tasks_to_finish
    if pending:
        self.worker_task_queue.apply_async_many(
            tasks=pending,
            priority='NORMAL',
        )

    for signum in (signal.SIGABRT, signal.SIGINT):
        signal.signal(signum, signal.SIG_DFL)

    self.killer.stop()
def test_handle_logging_signal_other(self):
    # With logging enabled, passing SIGABRT to handle_logging_signal is
    # expected to log nothing and to leave ``log_enabled`` set.
    self.cls.log_enabled = True
    with patch('%s.logger' % pbm) as mock_logger, \
            patch('%s.getpid' % pbm) as mock_getpid:
        mock_getpid.return_value = 12345
        self.cls.handle_logging_signal(signal.SIGABRT, None)
    assert mock_logger.mock_calls == []
    assert self.cls.log_enabled is True
def test_issue9324(self):
    # Updated for issue #10003, adding SIGBREAK
    handler = lambda x, y: None
    checked = set()
    for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
                signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
                signal.SIGTERM):
        # Set and then reset a handler for signals that work on windows.
        # signal.getsignal() returns None when the current handler was not
        # installed from Python; handing that None back to signal.signal()
        # would raise TypeError, so such signals are skipped.
        if signal.getsignal(sig) is not None:
            signal.signal(sig, signal.signal(sig, handler))
            checked.add(sig)
    # Make sure the loop above exercised at least one signal.
    self.assertTrue(checked)

    # Signal numbers that cannot be used must be rejected.
    with self.assertRaises(ValueError):
        signal.signal(-1, handler)

    with self.assertRaises(ValueError):
        signal.signal(7, handler)
def test_run_abort(self):
    # returncode handles signal termination: a child that calls os.abort()
    # must report the negated SIGABRT number as its return code.
    command = [sys.executable, "-c", 'import os; os.abort()']
    with _SuppressCoreFiles():
        child = subprocess.Popen(command)
        child.wait()
    self.assertEqual(-child.returncode, signal.SIGABRT)
def setup():
    """Install ``cleanup`` as the handler for the signals listed below."""
    handled_signals = (
        signal.SIGABRT,
        signal.SIGILL,
        signal.SIGINT,
        signal.SIGSEGV,
        signal.SIGTERM,
    )
    for signum in handled_signals:
        signal.signal(signum, cleanup)
def _test_bug_551(self, query):
    """Run *query* in a loop in a child interpreter that gets SIGABRT.

    The child script connects with ``dsn``, creates table ``test551`` and
    executes *query* (with ``"Hello, world!"`` as its parameter) forever,
    while a daemon thread sends SIGABRT to the process after 0.5 seconds.
    The script's SIGABRT handler calls ``sys.exit(1)``.

    The test passes when the child exits with a non-zero status and its
    stderr is empty once bracketed ``[...]`` fragments are removed.
    """
    # The script is real Python source, so its indentation is significant.
    script = ("""\
import os
import sys
import time
import signal
import threading
import psycopg2
def handle_sigabort(sig, frame):
    sys.exit(1)
def killer():
    time.sleep(0.5)
    os.kill(os.getpid(), signal.SIGABRT)
signal.signal(signal.SIGABRT, handle_sigabort)
conn = psycopg2.connect(%(dsn)r)
cur = conn.cursor()
cur.execute("create table test551 (id serial, num varchar(50))")
t = threading.Thread(target=killer)
t.daemon = True
t.start()
while True:
    cur.execute(%(query)r, ("Hello, world!",))
""" % {'dsn': dsn, 'query': query})

    proc = sp.Popen([sys.executable, '-c', script_to_py3(script)],
                    stdout=sp.PIPE, stderr=sp.PIPE)
    _out, err = proc.communicate()
    self.assertNotEqual(proc.returncode, 0)
    # Strip [NNN refs] from output
    err = re.sub(br'\[[^\]]+\]', b'', err).strip()
    self.assertTrue(not err, err)
def test_issue9324(self):
    # Updated for issue #10003, adding SIGBREAK
    handler = lambda x, y: None
    checked = set()
    for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
                signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
                signal.SIGTERM):
        # Set and then reset a handler for signals that work on windows.
        # signal.getsignal() returns None when the current handler was not
        # installed from Python; handing that None back to signal.signal()
        # would raise TypeError, so such signals are skipped.
        if signal.getsignal(sig) is not None:
            signal.signal(sig, signal.signal(sig, handler))
            checked.add(sig)
    # Make sure the loop above exercised at least one signal.
    self.assertTrue(checked)

    # Signal numbers that cannot be used must be rejected.
    with self.assertRaises(ValueError):
        signal.signal(-1, handler)

    with self.assertRaises(ValueError):
        signal.signal(7, handler)
def test_run_abort(self):
    # returncode handles signal termination: a child that calls os.abort()
    # must report the negated SIGABRT number as its return code.
    abort_command = [sys.executable, "-c", "import os; os.abort()"]
    with _SuppressCoreFiles():
        child = subprocess.Popen(abort_command)
        child.wait()
    self.assertEqual(-child.returncode, signal.SIGABRT)