python类SIGABRT的实例源码

arbiter.py 文件源码 项目:Lixiang_zhaoxin 作者: hejaxian 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def murder_workers(self):
        """\
        Kill unused/idle workers
        """
        if not self.timeout:
            return
        workers = list(self.WORKERS.items())
        for (pid, worker) in workers:
            try:
                if time.time() - worker.tmp.last_update() <= self.timeout:
                    continue
            except (OSError, ValueError):
                continue

            if not worker.aborted:
                self.log.critical("WORKER TIMEOUT (pid:%s)", pid)
                worker.aborted = True
                self.kill_worker(pid, signal.SIGABRT)
            else:
                self.kill_worker(pid, signal.SIGKILL)
arbiter.py 文件源码 项目:Alfred 作者: jkachhadia 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def murder_workers(self):
        """\
        Kill unused/idle workers
        """
        if not self.timeout:
            return
        workers = list(self.WORKERS.items())
        for (pid, worker) in workers:
            try:
                if time.time() - worker.tmp.last_update() <= self.timeout:
                    continue
            except (OSError, ValueError):
                continue

            if not worker.aborted:
                self.log.critical("WORKER TIMEOUT (pid:%s)", pid)
                worker.aborted = True
                self.kill_worker(pid, signal.SIGABRT)
            else:
                self.kill_worker(pid, signal.SIGKILL)
sanic_worker.py 文件源码 项目:guvnor 作者: jeamland 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def init_signals(self):
        # Set up signals through the event loop API.

        self.loop.add_signal_handler(signal.SIGQUIT, self.handle_quit,
                                     signal.SIGQUIT, None)

        self.loop.add_signal_handler(signal.SIGTERM, self.handle_exit,
                                     signal.SIGTERM, None)

        self.loop.add_signal_handler(signal.SIGINT, self.handle_quit,
                                     signal.SIGINT, None)

        self.loop.add_signal_handler(signal.SIGWINCH, self.handle_winch,
                                     signal.SIGWINCH, None)

        self.loop.add_signal_handler(signal.SIGUSR1, self.handle_usr1,
                                     signal.SIGUSR1, None)

        self.loop.add_signal_handler(signal.SIGABRT, self.handle_abort,
                                     signal.SIGABRT, None)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        signal.siginterrupt(signal.SIGTERM, False)
        signal.siginterrupt(signal.SIGUSR1, False)
ptrace_signal.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def display(self):
        error("Program received signal SIGABRT, Aborted.")
ptrace_signal.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _analyze(self):
        if self.signum == SIGSEGV:
            self.memoryFault()
        elif self.signum == SIGFPE:
            self.mathError()
        elif self.signum == SIGCHLD:
            self.childExit()
        elif self.signum == SIGABRT:
            self.error = Abort()
        return self.error
worker.py 文件源码 项目:uvicorn 作者: encode 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def init_signals(self):
        # Set up signals through the event loop API.
        loop = asyncio.get_event_loop()

        loop.add_signal_handler(signal.SIGQUIT, self.handle_quit,
                                signal.SIGQUIT, None)

        loop.add_signal_handler(signal.SIGTERM, self.handle_exit,
                                signal.SIGTERM, None)

        loop.add_signal_handler(signal.SIGINT, self.handle_quit,
                                signal.SIGINT, None)

        loop.add_signal_handler(signal.SIGWINCH, self.handle_winch,
                                signal.SIGWINCH, None)

        loop.add_signal_handler(signal.SIGUSR1, self.handle_usr1,
                                signal.SIGUSR1, None)

        loop.add_signal_handler(signal.SIGABRT, self.handle_abort,
                                signal.SIGABRT, None)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        signal.siginterrupt(signal.SIGTERM, False)
        signal.siginterrupt(signal.SIGUSR1, False)
test_devices.py 文件源码 项目:tasker 作者: wavenator 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setUp(self):
        self.sigint_fired = False
        self.sigabrt_fired = False

        signal.signal(signal.SIGABRT, self.sigabrt_handler)
        signal.signal(signal.SIGINT, self.sigint_handler)
test_devices.py 文件源码 项目:tasker 作者: wavenator 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_no_int_case_local_killer(self):
        test_process_obj = TestProcess()
        testing_process = multiprocessing.Process(
            target=test_process_obj.no_int_sleep,
            kwargs={
                'interval': 30,
            },
        )
        testing_process.daemon = True
        testing_process.start()

        self.assertTrue(testing_process.is_alive())

        local_killer = devices.killer.LocalKiller(
            pid=testing_process.pid,
            soft_timeout=1.0,
            soft_timeout_signal=signal.SIGINT,
            hard_timeout=2.0,
            hard_timeout_signal=signal.SIGABRT,
            critical_timeout=5.0,
            critical_timeout_signal=signal.SIGTERM,
            memory_limit=4 * 1024 * 1024 * 1024,
            memory_limit_signal=signal.SIGINT,
        )

        local_killer.start()
        self.assertTrue(testing_process.is_alive())
        time.sleep(1.2)
        self.assertTrue(testing_process.is_alive())
        time.sleep(1.2)
        self.assertFalse(testing_process.is_alive())
        self.assertEqual(testing_process.exitcode, 10)
        local_killer.stop()
test_devices.py 文件源码 项目:tasker 作者: wavenator 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_sleep_case_remote_killer(self):
        test_process_obj = TestProcess()
        testing_process = multiprocessing.Process(
            target=test_process_obj.sleep,
            kwargs={
                'interval': 30,
            },
        )
        testing_process.daemon = True
        testing_process.start()

        self.assertTrue(testing_process.is_alive())

        remote_killer = devices.killer.RemoteKiller(
            pid=testing_process.pid,
            soft_timeout=1.0,
            soft_timeout_signal=signal.SIGINT,
            hard_timeout=2.0,
            hard_timeout_signal=signal.SIGABRT,
            critical_timeout=5.0,
            critical_timeout_signal=signal.SIGTERM,
            memory_limit=4 * 1024 * 1024 * 1024,
            memory_limit_signal=signal.SIGINT,
        )

        remote_killer.start()
        self.assertTrue(testing_process.is_alive())
        time.sleep(1.2)
        self.assertFalse(testing_process.is_alive())
        self.assertEqual(testing_process.exitcode, 20)
        remote_killer.stop()
test_devices.py 文件源码 项目:tasker 作者: wavenator 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_no_int_case_remote_killer(self):
        test_process_obj = TestProcess()
        testing_process = multiprocessing.Process(
            target=test_process_obj.no_int_sleep,
            kwargs={
                'interval': 30,
            },
        )
        testing_process.daemon = True
        testing_process.start()

        self.assertTrue(testing_process.is_alive())

        remote_killer = devices.killer.RemoteKiller(
            pid=testing_process.pid,
            soft_timeout=1.0,
            soft_timeout_signal=signal.SIGINT,
            hard_timeout=2.0,
            hard_timeout_signal=signal.SIGABRT,
            critical_timeout=5.0,
            critical_timeout_signal=signal.SIGTERM,
            memory_limit=4 * 1024 * 1024 * 1024,
            memory_limit_signal=signal.SIGINT,
        )

        remote_killer.start()
        self.assertTrue(testing_process.is_alive())
        time.sleep(1.2)
        self.assertTrue(testing_process.is_alive())
        time.sleep(1.2)
        self.assertFalse(testing_process.is_alive())
        self.assertEqual(testing_process.exitcode, 10)
        remote_killer.stop()
test_devices.py 文件源码 项目:tasker 作者: wavenator 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_lost_case_remote_killer(self):
        test_process_obj = TestProcess()
        testing_process = multiprocessing.Process(
            target=test_process_obj.lost,
            kwargs={
                'interval': 30,
            },
        )
        testing_process.daemon = True
        testing_process.start()

        self.assertTrue(testing_process.is_alive())

        remote_killer = devices.killer.RemoteKiller(
            pid=testing_process.pid,
            soft_timeout=1.0,
            soft_timeout_signal=signal.SIGINT,
            hard_timeout=2.0,
            hard_timeout_signal=signal.SIGABRT,
            critical_timeout=3.0,
            critical_timeout_signal=signal.SIGTERM,
            memory_limit=4 * 1024 * 1024 * 1024,
            memory_limit_signal=signal.SIGINT,
        )

        remote_killer.start()
        self.assertTrue(testing_process.is_alive())
        time.sleep(1.2)
        self.assertTrue(testing_process.is_alive())
        time.sleep(1.2)
        self.assertTrue(testing_process.is_alive())
        time.sleep(1.2)
        self.assertFalse(testing_process.is_alive())
        self.assertEqual(testing_process.exitcode, -15)
        remote_killer.stop()
test_devices.py 文件源码 项目:tasker 作者: wavenator 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def init(self):
        '''
        '''
        signal.signal(signal.SIGABRT, self.sigabrt_handler)
        signal.signal(signal.SIGINT, self.sigint_handler)
serial.py 文件源码 项目:tasker 作者: wavenator 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def end_working(
        self,
    ):
        if self.tasks_to_finish:
            self.worker_task_queue.apply_async_many(
                tasks=self.tasks_to_finish,
                priority='NORMAL',
            )

        signal.signal(signal.SIGABRT, signal.SIG_DFL)
        signal.signal(signal.SIGINT, signal.SIG_DFL)

        self.killer.stop()
test_redirector.py 文件源码 项目:vault-redirector-twisted 作者: manheim 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_handle_logging_signal_other(self):
        self.cls.log_enabled = True
        with patch('%s.logger' % pbm) as mock_logger:
            with patch('%s.getpid' % pbm) as mock_getpid:
                mock_getpid.return_value = 12345
                self.cls.handle_logging_signal(signal.SIGABRT, None)
        assert mock_logger.mock_calls == []
        assert self.cls.log_enabled is True
test_signal.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_issue9324(self):
        # Updated for issue #10003, adding SIGBREAK
        handler = lambda x, y: None
        for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
                    signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
                    signal.SIGTERM):
            # Set and then reset a handler for signals that work on windows
            signal.signal(sig, signal.signal(sig, handler))

        with self.assertRaises(ValueError):
            signal.signal(-1, handler)

        with self.assertRaises(ValueError):
            signal.signal(7, handler)
test_subprocess.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_run_abort(self):
        # returncode handles signal termination
        with _SuppressCoreFiles():
            p = subprocess.Popen([sys.executable, "-c",
                                  'import os; os.abort()'])
            p.wait()
        self.assertEqual(-p.returncode, signal.SIGABRT)
voicetrigger.py 文件源码 项目:AlexaBeagleBone2 作者: merdahl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setup():
    for sig in (signal.SIGABRT, signal.SIGILL, signal.SIGINT, signal.SIGSEGV, signal.SIGTERM):
        signal.signal(sig, cleanup)
test_connection.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _test_bug_551(self, query):
        script = ("""\
import os
import sys
import time
import signal
import threading

import psycopg2

def handle_sigabort(sig, frame):
    sys.exit(1)

def killer():
    time.sleep(0.5)
    os.kill(os.getpid(), signal.SIGABRT)

signal.signal(signal.SIGABRT, handle_sigabort)

conn = psycopg2.connect(%(dsn)r)

cur = conn.cursor()

cur.execute("create table test551 (id serial, num varchar(50))")

t = threading.Thread(target=killer)
t.daemon = True
t.start()

while True:
    cur.execute(%(query)r, ("Hello, world!",))
""" % {'dsn': dsn, 'query': query})

        proc = sp.Popen([sys.executable, '-c', script_to_py3(script)],
            stdout=sp.PIPE, stderr=sp.PIPE)
        (out, err) = proc.communicate()
        self.assertNotEqual(proc.returncode, 0)
        # Strip [NNN refs] from output
        err = re.sub(br'\[[^\]]+\]', b'', err).strip()
        self.assertTrue(not err, err)
test_signal.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_issue9324(self):
        # Updated for issue #10003, adding SIGBREAK
        handler = lambda x, y: None
        for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
                    signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
                    signal.SIGTERM):
            # Set and then reset a handler for signals that work on windows
            signal.signal(sig, signal.signal(sig, handler))

        with self.assertRaises(ValueError):
            signal.signal(-1, handler)

        with self.assertRaises(ValueError):
            signal.signal(7, handler)
test_subprocess.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_run_abort(self):
        # returncode handles signal termination
        with _SuppressCoreFiles():
            p = subprocess.Popen([sys.executable, "-c",
                                  "import os; os.abort()"])
            p.wait()
        self.assertEqual(-p.returncode, signal.SIGABRT)


问题


面经


文章

微信
公众号

扫码关注公众号