python类SIGALRM的实例源码

test_signal.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_sigwaitinfo_interrupted(self):
        self.wait_helper(signal.SIGUSR1, '''
        def test(signum):
            import errno

            hndl_called = True
            def alarm_handler(signum, frame):
                hndl_called = False

            signal.signal(signal.SIGALRM, alarm_handler)
            signal.alarm(1)
            try:
                signal.sigwaitinfo([signal.SIGUSR1])
            except OSError as e:
                if e.errno == errno.EINTR:
                    if not hndl_called:
                        raise Exception("SIGALRM handler not called")
                else:
                    raise Exception("Expected EINTR to be raised by sigwaitinfo")
            else:
                raise Exception("Expected EINTR to be raised by sigwaitinfo")
        ''')
test_io.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def check_reentrant_write(self, data, **fdopen_kwargs):
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            wio.close()
            os.close(r)
test_io.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            rio.close()
            os.close(w)
            os.close(r)
test_selectors.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_select_interrupt(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None)
        self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
        self.addCleanup(signal.alarm, 0)

        signal.alarm(1)

        s.register(rd, selectors.EVENT_READ)
        t = time()
        self.assertFalse(s.select(2))
        self.assertLess(time() - t, 2.5)
collectors.py 文件源码 项目:Hawkeye 作者: tozhengxq 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, child, timelimit, greedy=False, use_alarm=True):
        """
        :param child: the collector to wrap.
        :param timelimit: the maximum amount of time (in seconds) to
            allow for searching. If the search takes longer than this, it will
            raise a ``TimeLimit`` exception.
        :param greedy: if ``True``, the collector will finish adding the most
            recent hit before raising the ``TimeLimit`` exception.
        :param use_alarm: if ``True`` (the default), the collector will try to
            use signal.SIGALRM (on UNIX).
        """
        self.child = child
        self.timelimit = timelimit
        self.greedy = greedy

        if use_alarm:
            import signal
            self.use_alarm = use_alarm and hasattr(signal, "SIGALRM")
        else:
            self.use_alarm = False

        self.timer = None
        self.timedout = False
__init__.py 文件源码 项目:deb-python-cotyledon 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, wait_interval=0.01):
        """Creates the ServiceManager object

        :param wait_interval: time between each new process spawn
        :type wait_interval: float

        """

        if self._process_runner_already_created:
            raise RuntimeError("Only one instance of ProcessRunner per "
                               "application is allowed")
        ServiceManager._process_runner_already_created = True

        self._wait_interval = wait_interval
        self._shutdown = threading.Event()

        self._running_services = collections.defaultdict(dict)
        self._services = []
        self._forktimes = []
        self._current_process = None

        # Try to create a session id if possible
        try:
            os.setsid()
        except OSError:
            pass

        self.readpipe, self.writepipe = os.pipe()

        signal.signal(signal.SIGTERM, self._clean_exit)
        signal.signal(signal.SIGINT, self._fast_exit)
        signal.signal(signal.SIGALRM, self._alarm_exit)
        signal.signal(signal.SIGHUP, self._reload_services)
__init__.py 文件源码 项目:deb-python-cotyledon 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _fast_exit(self, signo, frame,
                   reason='Caught SIGINT signal, instantaneous exiting'):
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        signal.signal(signal.SIGALRM, signal.SIG_IGN)
        LOG.info(reason)
        os.killpg(0, signal.SIGINT)
        os._exit(1)
collectors.py 文件源码 项目:nstock 作者: ybenitezf 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def prepare(self, top_searcher, q, context):
        self.child.prepare(top_searcher, q, context)

        self.timedout = False
        if self.use_alarm:
            import signal
            signal.signal(signal.SIGALRM, self._was_signaled)

        # Start a timer thread. If the timer fires, it will call this object's
        # _timestop() method
        self.timer = threading.Timer(self.timelimit, self._timestop)
        self.timer.start()
collectors.py 文件源码 项目:nstock 作者: ybenitezf 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _timestop(self):
        # Called when the timer expires
        self.timer = None
        # Set an attribute that will be noticed in the collect_matches() loop
        self.timedout = True

        if self.use_alarm:
            import signal
            os.kill(os.getpid(), signal.SIGALRM)
test_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_signal_handling_while_selecting(self):
        # Test with a signal actually arriving during a select() call.
        caught = 0

        def my_handler():
            nonlocal caught
            caught += 1
            self.loop.stop()

        self.loop.add_signal_handler(signal.SIGALRM, my_handler)

        signal.setitimer(signal.ITIMER_REAL, 0.01, 0)  # Send SIGALRM once.
        self.loop.run_forever()
        self.assertEqual(caught, 1)
test_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_signal_handling_args(self):
        some_args = (42,)
        caught = 0

        def my_handler(*args):
            nonlocal caught
            caught += 1
            self.assertEqual(args, some_args)

        self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args)

        signal.setitimer(signal.ITIMER_REAL, 0.1, 0)  # Send SIGALRM once.
        self.loop.call_later(0.5, self.loop.stop)
        self.loop.run_forever()
        self.assertEqual(caught, 1)
support.py 文件源码 项目:selectors2 作者: SethMichaelLarson 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run(self):
        time.sleep(self.timeout)
        if not self.canceled:
            os.kill(os.getpid(), signal.SIGALRM)
support.py 文件源码 项目:selectors2 作者: SethMichaelLarson 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _begin_alarm_thread(self, timeout):
        if not hasattr(signal, "SIGALRM"):
            self.skipTest("Platform doesn't have signal.SIGALRM")
        self.addCleanup(self._cancel_alarm_thread)
        self.alarm_thread = AlarmThread(timeout)
        self.alarm_thread.start()
support.py 文件源码 项目:selectors2 作者: SethMichaelLarson 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def set_alarm(self, duration, handler):
        sigalrm_handler = signal.signal(signal.SIGALRM, handler)
        self.addCleanup(signal.signal, signal.SIGALRM, sigalrm_handler)
        self._begin_alarm_thread(duration)
exceptions.py 文件源码 项目:AutoML-Challenge 作者: postech-mlg-exbrain 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def handler(signum, frame):
    # logs message with level debug on this logger
    if signum == signal.SIGXCPU:
        # when process reaches soft limit --> a SIGXCPU signal is sent
        # (it normally terminats the process)
        raise CpuTimeoutException
    elif signum == signal.SIGALRM:
        # SIGALRM is sent to process when the specified time limit to an alarm function elapses
        # (when real or clock time elapses)
        raise TimeoutException
    raise AnythingException
_utils.py 文件源码 项目:cotyledon 作者: sileht 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def _signal_catcher(self, sig, frame):
        # NOTE(sileht): This is useful only for python < 3.5
        # in python >= 3.5 we could read the signal number
        # from the wakeup_fd pipe
        if sig in (SIGALRM, signal.SIGTERM):
            self._signals_received.appendleft(sig)
        else:
            self._signals_received.append(sig)
test_interactiveshell.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_exit_code_signal(self):
        self.mktmp("import signal, time\n"
                   "signal.setitimer(signal.ITIMER_REAL, 0.1)\n"
                   "time.sleep(1)\n")
        self.system("%s %s" % (sys.executable, self.fname))
        self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGALRM)
collectors.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def prepare(self, top_searcher, q, context):
        self.child.prepare(top_searcher, q, context)

        self.timedout = False
        if self.use_alarm:
            import signal
            signal.signal(signal.SIGALRM, self._was_signaled)

        # Start a timer thread. If the timer fires, it will call this object's
        # _timestop() method
        self.timer = threading.Timer(self.timelimit, self._timestop)
        self.timer.start()
collectors.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _timestop(self):
        # Called when the timer expires
        self.timer = None
        # Set an attribute that will be noticed in the collect_matches() loop
        self.timedout = True

        if self.use_alarm:
            import signal
            os.kill(os.getpid(), signal.SIGALRM)
test_threadsignals.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def registerSignals(for_usr1, for_usr2, for_alrm):
    usr1 = signal.signal(signal.SIGUSR1, for_usr1)
    usr2 = signal.signal(signal.SIGUSR2, for_usr2)
    alrm = signal.signal(signal.SIGALRM, for_alrm)
    return usr1, usr2, alrm


# The signal handler. Just note that the signal occurred and
# from who.


问题


面经


文章

微信
公众号

扫码关注公众号