python类SIGALRM的实例源码

collectors.py 文件源码 项目:nstock 作者: ybenitezf 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, child, timelimit, greedy=False, use_alarm=True):
        """
        Store the wrapped collector and the time-limit configuration.

        :param child: the collector to wrap.
        :param timelimit: the maximum amount of time (in seconds) to
            allow for searching; per the original contract a ``TimeLimit``
            exception is raised when the search runs longer.
        :param greedy: if ``True``, the collector will finish adding the most
            recent hit before raising the ``TimeLimit`` exception.
        :param use_alarm: if ``True`` (the default), signal.SIGALRM is used
            whenever the ``signal`` module exposes it (on UNIX).
        """
        self.child = child
        self.timelimit = timelimit
        self.greedy = greedy

        # The alarm is enabled only when it was requested *and* this
        # platform's signal module actually has SIGALRM.
        alarm_available = False
        if use_alarm:
            import signal
            alarm_available = hasattr(signal, "SIGALRM")
        self.use_alarm = alarm_available

        self.timer = None
        self.timedout = False
test_selectors.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_select_interrupt(self):
        # A no-op SIGALRM handler fires 1s into a select() call that has a
        # 2s timeout; select() must report nothing ready and be back in
        # under 2.5s.
        selector = self.SELECTOR()
        self.addCleanup(selector.close)

        read_sock, _write_sock = self.make_socketpair()

        # Swap in a do-nothing handler and register cleanups that put the
        # previous handler back and cancel any alarm still pending.
        previous_handler = signal.signal(signal.SIGALRM, lambda *args: None)
        self.addCleanup(signal.signal, signal.SIGALRM, previous_handler)
        self.addCleanup(signal.alarm, 0)

        signal.alarm(1)

        selector.register(read_sock, selectors.EVENT_READ)
        started = time()
        ready = selector.select(2)
        self.assertFalse(ready)
        self.assertLess(time() - started, 2.5)
_utils.py 文件源码 项目:cotyledon 作者: sileht 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, master=False):
        """Create the signal wakeup pipe (POSIX only) and install handlers.

        :param master: accepted for the caller's benefit; this method does
            not read it.
        """
        on_posix = os.name == 'posix'

        # Setup signal fd, this allows signal to behave correctly
        if on_posix:
            read_end, write_end = os.pipe()
            self.signal_pipe_r, self.signal_pipe_w = read_end, write_end
            self._set_nonblock(self.signal_pipe_r)
            self._set_nonblock(self.signal_pipe_w)
            signal.set_wakeup_fd(self.signal_pipe_w)

        self._signals_received = collections.deque()

        signal.signal(signal.SIGINT, signal.SIG_DFL)
        if on_posix:
            signal.signal(signal.SIGCHLD, signal.SIG_DFL)
            caught_names = ('SIGTERM', 'SIGALRM', 'SIGHUP')
        else:
            # currently a noop on window...
            # FIXME(sileht): should allow to catch signal CTRL_BREAK_EVENT,
            # but we to create the child process with CREATE_NEW_PROCESS_GROUP
            # to make this work, so current this is a noop for later fix
            caught_names = ('SIGTERM', 'SIGBREAK')

        # Names are resolved one at a time, so a signal missing on this
        # platform only fails once the earlier ones have been installed.
        for signame in caught_names:
            signal.signal(getattr(signal, signame), self._signal_catcher)
collectors.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, child, timelimit, greedy=False, use_alarm=True):
        """
        Store the wrapped collector and the time-limit configuration.

        :param child: the collector to wrap.
        :param timelimit: the maximum amount of time (in seconds) to
            allow for searching; per the original contract a ``TimeLimit``
            exception is raised when the search runs longer.
        :param greedy: if ``True``, the collector will finish adding the most
            recent hit before raising the ``TimeLimit`` exception.
        :param use_alarm: if ``True`` (the default), signal.SIGALRM is used
            whenever the ``signal`` module exposes it (on UNIX).
        """
        self.child = child
        self.timelimit = timelimit
        self.greedy = greedy

        # The alarm is enabled only when it was requested *and* this
        # platform's signal module actually has SIGALRM.
        alarm_available = False
        if use_alarm:
            import signal
            alarm_available = hasattr(signal, "SIGALRM")
        self.use_alarm = alarm_available

        self.timer = None
        self.timedout = False
test_subprocess.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_communicate_eintr(self):
        # Issue #12493: communicate() should handle EINTR
        def noop_handler(signum, frame):
            pass
        previous_handler = signal.signal(signal.SIGALRM, noop_handler)
        self.addCleanup(signal.signal, signal.SIGALRM, previous_handler)

        # the child process sleeps for 2 seconds
        sleeper_cmd = [sys.executable, "-c", 'import time; time.sleep(2)']
        for stream_name in ('stdout', 'stderr'):
            popen_kwargs = {stream_name: subprocess.PIPE}
            with subprocess.Popen(sleeper_cmd, **popen_kwargs) as child:
                # SIGALRM is due after 1s, i.e. while communicate() is
                # still waiting on the 2s child.
                signal.alarm(1)
                child.communicate()


# context manager
collectors.py 文件源码 项目:WhooshSearch 作者: rokartnaz 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, child, timelimit, greedy=False, use_alarm=True):
        """
        Store the wrapped collector and the time-limit configuration.

        :param child: the collector to wrap.
        :param timelimit: the maximum amount of time (in seconds) to
            allow for searching; per the original contract a ``TimeLimit``
            exception is raised when the search runs longer.
        :param greedy: if ``True``, the collector will finish adding the most
            recent hit before raising the ``TimeLimit`` exception.
        :param use_alarm: if ``True`` (the default), signal.SIGALRM is used
            whenever the ``signal`` module exposes it (on UNIX).
        """
        self.child = child
        self.timelimit = timelimit
        self.greedy = greedy

        # The alarm is enabled only when it was requested *and* this
        # platform's signal module actually has SIGALRM.
        alarm_available = False
        if use_alarm:
            import signal
            alarm_available = hasattr(signal, "SIGALRM")
        self.use_alarm = alarm_available

        self.timer = None
        self.timedout = False
test_threadsignals.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_lock_acquire_interruption(self):
        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
        # in a deadlock.
        # XXX this test can fail when the legacy (non-semaphore) implementation
        # of locks is used in thread_pthread.h, see issue #11223.
        previous_handler = signal.signal(signal.SIGALRM, self.alarm_interrupt)
        try:
            held_lock = thread.allocate_lock()
            held_lock.acquire()
            signal.alarm(1)
            started = time.time()
            self.assertRaises(KeyboardInterrupt, held_lock.acquire, timeout=5)
            elapsed = time.time() - started
            # Seeing KeyboardInterrupt alone proves little: the handler could
            # also run right after acquire() gave up on its own 5s timeout,
            # which would still satisfy assertRaises.  The elapsed-time bound
            # shows the acquire itself was cut short by the signal.
            self.assertLess(elapsed, 3.0)
        finally:
            signal.signal(signal.SIGALRM, previous_handler)
test_threadsignals.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_rlock_acquire_interruption(self):
        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
        # in a deadlock.
        # XXX this test can fail when the legacy (non-semaphore) implementation
        # of locks is used in thread_pthread.h, see issue #11223.
        previous_handler = signal.signal(signal.SIGALRM, self.alarm_interrupt)
        try:
            contended = thread.RLock()
            # A reentrant lock has to be taken by a different thread first.
            def hold_lock():
                contended.acquire()
            thread.start_new_thread(hold_lock, ())
            # Poll until a non-blocking acquire no longer succeeds.
            while contended.acquire(blocking=False):
                contended.release()
                time.sleep(0.01)
            signal.alarm(1)
            started = time.time()
            self.assertRaises(KeyboardInterrupt, contended.acquire, timeout=5)
            elapsed = time.time() - started
            # The time bound shows the acquire was interrupted by the signal
            # rather than running into its own 5s timeout.
            self.assertLess(elapsed, 3.0)
        finally:
            signal.signal(signal.SIGALRM, previous_handler)
test_signal.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_wakeup_fd_early(self):
        # Hands check_wakeup a code snippet (as a string) plus signal.SIGALRM.
        # The snippet arms a 1s alarm, calls time.sleep(10) and then select()
        # on `read` with a 10s timeout; it raises if either call lasts 5s or
        # longer.
        # NOTE(review): `read` and `signal` are not defined inside the
        # snippet -- presumably check_wakeup supplies them; confirm there.
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            signal.alarm(1)
            before_time = time.time()
            # We attempt to get a signal during the sleep,
            # before select is called
            time.sleep(TIMEOUT_FULL)
            mid_time = time.time()
            dt = mid_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.time()
            dt = after_time - mid_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)
test_signal.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_wakeup_fd_during(self):
        # Hands check_wakeup a code snippet (as a string) plus signal.SIGALRM.
        # The snippet arms a 1s alarm and calls select() on `read` with a 10s
        # timeout; it raises unless select() fails with select.error, and
        # also raises if the call lasted 5s or longer.
        # NOTE(review): `read` and `signal` are not defined inside the
        # snippet -- presumably check_wakeup supplies them; confirm there.
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            signal.alarm(1)
            before_time = time.time()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except select.error:
                pass
            else:
                raise Exception("select.error not raised")
            after_time = time.time()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)
test_signal.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_sigwaitinfo_interrupted(self):
        # Hands wait_helper signal.SIGUSR1 and a code snippet (as a string).
        # The snippet installs a SIGALRM handler, arms a 1s alarm and calls
        # signal.sigwaitinfo([signal.SIGUSR1]); it raises unless that call
        # fails with an OSError whose errno is EINTR.
        # NOTE(review): alarm_handler assigns hndl_called without `nonlocal`,
        # so it only binds a handler-local name.  The outer hndl_called stays
        # True and the "SIGALRM handler not called" branch cannot trigger.
        self.wait_helper(signal.SIGUSR1, '''
        def test(signum):
            import errno

            hndl_called = True
            def alarm_handler(signum, frame):
                hndl_called = False

            signal.signal(signal.SIGALRM, alarm_handler)
            signal.alarm(1)
            try:
                signal.sigwaitinfo([signal.SIGUSR1])
            except OSError as e:
                if e.errno == errno.EINTR:
                    if not hndl_called:
                        raise Exception("SIGALRM handler not called")
                else:
                    raise Exception("Expected EINTR to be raised by sigwaitinfo")
            else:
                raise Exception("Expected EINTR to be raised by sigwaitinfo")
        ''')
test_io.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check what happens when a write is re-entered from a signal handler.

        A SIGALRM handler that writes ``data`` to the same file object and
        then raises ZeroDivisionError is installed, a 1s alarm is armed, and
        ``data`` is written in a loop until ZeroDivisionError or RuntimeError
        surfaces.

        :param data: the payload handed to every ``write()`` call.
        :param fdopen_kwargs: forwarded to ``self.io.open`` when wrapping the
            write end of the pipe.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while True:
                    for _ in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Disarm first: if the loop died before SIGALRM arrived, a stale
            # alarm would otherwise fire later and run on_alarm against the
            # already-closed file.
            signal.alarm(0)
            wio.close()
            os.close(r)
test_io.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        :param decode: callable applied to the bytes/str returned by
            ``read(6)`` before comparing with ``"foobar"``.
        :param fdopen_kwargs: forwarded to ``self.io.open``; ``closefd`` is
            forced to ``False`` so the pipe ends are closed explicitly here.
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Pre-bind so the cleanup below works even when open() itself fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Disarm any alarm still pending so the handler cannot write to
            # a pipe that is about to be closed.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)
integration_tests.py 文件源码 项目:django-channels-router 作者: Monadical-SAS 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def connect_socket(url, timeout=TIMEOUT, **kwargs):
    """
    Set up a websocket and return the socket connection object.

    A SIGALRM handler that calls ``timeout_handler`` is installed and an
    alarm is armed for ``timeout`` seconds around the connection attempt.
    Extra keyword arguments are forwarded to ``create_connection``.  On any
    ``Exception`` a hint is printed and the exception is re-raised.
    """

    signal.signal(
        signal.SIGALRM,
        lambda s, f: timeout_handler(s, f, f'connecting ({timeout}s)')
    )
    signal.alarm(timeout)
    try:
        try:
            return create_connection(url, **kwargs)
        finally:
            # Disarm on every exit path (the original repeated this call in
            # each branch and carried a second, unreachable except clause).
            signal.alarm(0)
    except Exception:
        print(f'[X] Failed to connect, is runserver running on {url}?')
        raise
integration_tests.py 文件源码 项目:django-channels-router 作者: Monadical-SAS 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def send_json(socket, data: dict, timeout=TIMEOUT):
    """
    Send ``data`` serialised as JSON over ``socket`` and return whatever
    ``socket.send`` returns.

    A SIGALRM handler that calls ``timeout_handler`` is installed and an
    alarm is armed for ``timeout`` seconds while sending; exceptions raised
    during the send propagate to the caller.
    """

    signal.signal(
        signal.SIGALRM,
        lambda s, f: timeout_handler(s, f, f'sending ({timeout}s)')
    )
    signal.alarm(timeout)
    try:
        return socket.send(json.dumps(data))
    finally:
        # One place to disarm, reached on success and on every kind of
        # failure (including KeyboardInterrupt), so no alarm is left pending.
        signal.alarm(0)
integration_tests.py 文件源码 项目:django-channels-router 作者: Monadical-SAS 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def recv_json(socket, timeout=TIMEOUT):
    """
    Block for up to ``timeout`` seconds waiting for one message and return
    it decoded from JSON; return None if ``TimeOutException`` is raised
    before a message arrives.  Other exceptions propagate.
    """

    # Cancel any alarm left over from earlier before swapping the handler.
    signal.alarm(0)
    signal.signal(
        signal.SIGALRM,
        lambda s, f: timeout_handler(s, f, f'receiving ({timeout}s)')
    )
    signal.alarm(timeout)
    try:
        return json.loads(socket.recv())
    except TimeOutException:
        return None
    finally:
        # One place to disarm, reached on every exit path.
        signal.alarm(0)
integration_tests.py 文件源码 项目:django-channels-router 作者: Monadical-SAS 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def recv_all_json(socket, timeout=TIMEOUT):
    """
    Receive JSON messages from ``socket`` and return them as a list.

    Every ``recv()`` gets its own ``timeout``-second alarm.  Collection
    stops at the first ``TimeOutException`` or at the first message that
    decodes to a falsy value (that message is not included).
    """
    results = []
    try:
        last_result = True
        while last_result:

            signal.alarm(0)
            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(timeout)
            try:
                last_result = json.loads(socket.recv())
            except TimeOutException:
                last_result = None
            finally:
                # Disarm even when recv()/json.loads() fails with some other
                # error, so a stale alarm cannot fire in unrelated code.
                signal.alarm(0)

            if last_result:
                results.append(last_result)

        return results
    except TimeOutException:
        # The alarm fired outside the inner try (e.g. while disarming).
        return results
test_threadsignals.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_lock_acquire_interruption(self):
        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
        # in a deadlock.
        # XXX this test can fail when the legacy (non-semaphore) implementation
        # of locks is used in thread_pthread.h, see issue #11223.
        previous_handler = signal.signal(signal.SIGALRM, self.alarm_interrupt)
        try:
            held_lock = thread.allocate_lock()
            held_lock.acquire()
            signal.alarm(1)
            started = time.time()
            self.assertRaises(KeyboardInterrupt, held_lock.acquire, timeout=5)
            elapsed = time.time() - started
            # Seeing KeyboardInterrupt alone proves little: the handler could
            # also run right after acquire() gave up on its own 5s timeout,
            # which would still satisfy assertRaises.  The elapsed-time bound
            # shows the acquire itself was cut short by the signal.
            self.assertLess(elapsed, 3.0)
        finally:
            signal.signal(signal.SIGALRM, previous_handler)
test_threadsignals.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_rlock_acquire_interruption(self):
        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
        # in a deadlock.
        # XXX this test can fail when the legacy (non-semaphore) implementation
        # of locks is used in thread_pthread.h, see issue #11223.
        previous_handler = signal.signal(signal.SIGALRM, self.alarm_interrupt)
        try:
            contended = thread.RLock()
            # A reentrant lock has to be taken by a different thread first.
            def hold_lock():
                contended.acquire()
            thread.start_new_thread(hold_lock, ())
            # Poll until a non-blocking acquire no longer succeeds.
            while contended.acquire(blocking=False):
                contended.release()
                time.sleep(0.01)
            signal.alarm(1)
            started = time.time()
            self.assertRaises(KeyboardInterrupt, contended.acquire, timeout=5)
            elapsed = time.time() - started
            # The time bound shows the acquire was interrupted by the signal
            # rather than running into its own 5s timeout.
            self.assertLess(elapsed, 3.0)
        finally:
            signal.signal(signal.SIGALRM, previous_handler)
test_signal.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_wakeup_fd_early(self):
        # Hands check_wakeup a code snippet (as a string) plus signal.SIGALRM.
        # The snippet arms a 1s alarm, calls time.sleep(10) and then select()
        # on `read` with a 10s timeout; it raises if either call lasts 5s or
        # longer.
        # NOTE(review): `read` and `signal` are not defined inside the
        # snippet -- presumably check_wakeup supplies them; confirm there.
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            signal.alarm(1)
            before_time = time.time()
            # We attempt to get a signal during the sleep,
            # before select is called
            time.sleep(TIMEOUT_FULL)
            mid_time = time.time()
            dt = mid_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.time()
            dt = after_time - mid_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)
test_signal.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_wakeup_fd_during(self):
        # Hands check_wakeup a code snippet (as a string) plus signal.SIGALRM.
        # The snippet arms a 1s alarm and calls select() on `read` with a 10s
        # timeout; it raises unless select() fails with OSError, and also
        # raises if the call lasted 5s or longer.
        # NOTE(review): `read` and `signal` are not defined inside the
        # snippet -- presumably check_wakeup supplies them; confirm there.
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            signal.alarm(1)
            before_time = time.time()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except OSError:
                pass
            else:
                raise Exception("OSError not raised")
            after_time = time.time()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)
test_signal.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_sigwaitinfo_interrupted(self):
        # Hands wait_helper signal.SIGUSR1 and a code snippet (as a string).
        # The snippet installs a SIGALRM handler, arms a 1s alarm and calls
        # signal.sigwaitinfo([signal.SIGUSR1]); it raises unless that call
        # fails with an OSError whose errno is EINTR.
        # NOTE(review): alarm_handler assigns hndl_called without `nonlocal`,
        # so it only binds a handler-local name.  The outer hndl_called stays
        # True and the "SIGALRM handler not called" branch cannot trigger.
        self.wait_helper(signal.SIGUSR1, '''
        def test(signum):
            import errno

            hndl_called = True
            def alarm_handler(signum, frame):
                hndl_called = False

            signal.signal(signal.SIGALRM, alarm_handler)
            signal.alarm(1)
            try:
                signal.sigwaitinfo([signal.SIGUSR1])
            except OSError as e:
                if e.errno == errno.EINTR:
                    if not hndl_called:
                        raise Exception("SIGALRM handler not called")
                else:
                    raise Exception("Expected EINTR to be raised by sigwaitinfo")
            else:
                raise Exception("Expected EINTR to be raised by sigwaitinfo")
        ''')
test_io.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check what happens when a write is re-entered from a signal handler.

        A SIGALRM handler that writes ``data`` to the same file object and
        then raises ZeroDivisionError is installed, a 1s alarm is armed, and
        ``data`` is written in a loop until ZeroDivisionError or RuntimeError
        surfaces.

        :param data: the payload handed to every ``write()`` call.
        :param fdopen_kwargs: forwarded to ``self.io.open`` when wrapping the
            write end of the pipe.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while True:
                    for _ in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Disarm first: if the loop died before SIGALRM arrived, a stale
            # alarm would otherwise fire later and run on_alarm against the
            # already-closed file.
            signal.alarm(0)
            wio.close()
            os.close(r)
test_io.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        :param decode: callable applied to the bytes/str returned by
            ``read(6)`` before comparing with ``"foobar"``.
        :param fdopen_kwargs: forwarded to ``self.io.open``; ``closefd`` is
            forced to ``False`` so the pipe ends are closed explicitly here.
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Pre-bind so the cleanup below works even when open() itself fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Disarm any alarm still pending so the handler cannot write to
            # a pipe that is about to be closed.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)
test_selectors.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_select_interrupt(self):
        # A no-op SIGALRM handler fires 1s into a select() call that has a
        # 2s timeout; select() must report nothing ready and be back in
        # under 2.5s.
        selector = self.SELECTOR()
        self.addCleanup(selector.close)

        read_sock, _write_sock = self.make_socketpair()

        # Swap in a do-nothing handler and register cleanups that put the
        # previous handler back and cancel any alarm still pending.
        previous_handler = signal.signal(signal.SIGALRM, lambda *args: None)
        self.addCleanup(signal.signal, signal.SIGALRM, previous_handler)
        self.addCleanup(signal.alarm, 0)

        signal.alarm(1)

        selector.register(read_sock, selectors.EVENT_READ)
        started = time()
        ready = selector.select(2)
        self.assertFalse(ready)
        self.assertLess(time() - started, 2.5)
collectors.py 文件源码 项目:QualquerMerdaAPI 作者: tiagovizoto 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, child, timelimit, greedy=False, use_alarm=True):
        """
        Store the wrapped collector and the time-limit configuration.

        :param child: the collector to wrap.
        :param timelimit: the maximum amount of time (in seconds) to
            allow for searching; per the original contract a ``TimeLimit``
            exception is raised when the search runs longer.
        :param greedy: if ``True``, the collector will finish adding the most
            recent hit before raising the ``TimeLimit`` exception.
        :param use_alarm: if ``True`` (the default), signal.SIGALRM is used
            whenever the ``signal`` module exposes it (on UNIX).
        """
        self.child = child
        self.timelimit = timelimit
        self.greedy = greedy

        # The alarm is enabled only when it was requested *and* this
        # platform's signal module actually has SIGALRM.
        alarm_available = False
        if use_alarm:
            import signal
            alarm_available = hasattr(signal, "SIGALRM")
        self.use_alarm = alarm_available

        self.timer = None
        self.timedout = False
test_threadsignals.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_lock_acquire_interruption(self):
        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
        # in a deadlock.
        # XXX this test can fail when the legacy (non-semaphore) implementation
        # of locks is used in thread_pthread.h, see issue #11223.
        previous_handler = signal.signal(signal.SIGALRM, self.alarm_interrupt)
        try:
            held_lock = thread.allocate_lock()
            held_lock.acquire()
            signal.alarm(1)
            started = time.time()
            self.assertRaises(KeyboardInterrupt, held_lock.acquire, timeout=5)
            elapsed = time.time() - started
            # Seeing KeyboardInterrupt alone proves little: the handler could
            # also run right after acquire() gave up on its own 5s timeout,
            # which would still satisfy assertRaises.  The elapsed-time bound
            # shows the acquire itself was cut short by the signal.
            self.assertLess(elapsed, 3.0)
        finally:
            signal.signal(signal.SIGALRM, previous_handler)
test_threadsignals.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_rlock_acquire_interruption(self):
        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
        # in a deadlock.
        # XXX this test can fail when the legacy (non-semaphore) implementation
        # of locks is used in thread_pthread.h, see issue #11223.
        previous_handler = signal.signal(signal.SIGALRM, self.alarm_interrupt)
        try:
            contended = thread.RLock()
            # A reentrant lock has to be taken by a different thread first.
            def hold_lock():
                contended.acquire()
            thread.start_new_thread(hold_lock, ())
            # Poll until a non-blocking acquire no longer succeeds.
            while contended.acquire(blocking=False):
                contended.release()
                time.sleep(0.01)
            signal.alarm(1)
            started = time.time()
            self.assertRaises(KeyboardInterrupt, contended.acquire, timeout=5)
            elapsed = time.time() - started
            # The time bound shows the acquire was interrupted by the signal
            # rather than running into its own 5s timeout.
            self.assertLess(elapsed, 3.0)
        finally:
            signal.signal(signal.SIGALRM, previous_handler)
test_signal.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_wakeup_fd_early(self):
        # Hands check_wakeup a code snippet (as a string) plus signal.SIGALRM.
        # The snippet arms a 1s alarm, calls time.sleep(10) and then select()
        # on `read` with a 10s timeout; it raises if either call lasts 5s or
        # longer.
        # NOTE(review): `read` and `signal` are not defined inside the
        # snippet -- presumably check_wakeup supplies them; confirm there.
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            signal.alarm(1)
            before_time = time.time()
            # We attempt to get a signal during the sleep,
            # before select is called
            time.sleep(TIMEOUT_FULL)
            mid_time = time.time()
            dt = mid_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.time()
            dt = after_time - mid_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)
test_signal.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_wakeup_fd_during(self):
        # Hands check_wakeup a code snippet (as a string) plus signal.SIGALRM.
        # The snippet arms a 1s alarm and calls select() on `read` with a 10s
        # timeout; it raises unless select() fails with OSError, and also
        # raises if the call lasted 5s or longer.
        # NOTE(review): `read` and `signal` are not defined inside the
        # snippet -- presumably check_wakeup supplies them; confirm there.
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            signal.alarm(1)
            before_time = time.time()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except OSError:
                pass
            else:
                raise Exception("OSError not raised")
            after_time = time.time()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)


问题


面经


文章

微信
公众号

扫码关注公众号