python类set_wakeup_fd()的实例源码

_utils.py 文件源码 项目:cotyledon 作者: sileht 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def __init__(self, master=False):
        # Setup signal fd, this allows signal to behave correctly
        if os.name == 'posix':
            self.signal_pipe_r, self.signal_pipe_w = os.pipe()
            self._set_nonblock(self.signal_pipe_r)
            self._set_nonblock(self.signal_pipe_w)
            signal.set_wakeup_fd(self.signal_pipe_w)

        self._signals_received = collections.deque()

        signal.signal(signal.SIGINT, signal.SIG_DFL)
        if os.name == 'posix':
            signal.signal(signal.SIGCHLD, signal.SIG_DFL)
            signal.signal(signal.SIGTERM, self._signal_catcher)
            signal.signal(signal.SIGALRM, self._signal_catcher)
            signal.signal(signal.SIGHUP, self._signal_catcher)
        else:
            # currently a noop on window...
            signal.signal(signal.SIGTERM, self._signal_catcher)
            # FIXME(sileht): should allow to catch signal CTRL_BREAK_EVENT,
            # but we to create the child process with CREATE_NEW_PROCESS_GROUP
            # to make this work, so current this is a noop for later fix
            signal.signal(signal.SIGBREAK, self._signal_catcher)
base.py 文件源码 项目:chihu 作者: yelongyu 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def init_signals(self):
        # reset signaling
        [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        signal.signal(signal.SIGABRT, self.handle_abort)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        if hasattr(signal, 'siginterrupt'):  # python >= 2.6
            signal.siginterrupt(signal.SIGTERM, False)
            signal.siginterrupt(signal.SIGUSR1, False)

        if hasattr(signal, 'set_wakeup_fd'):
            signal.set_wakeup_fd(self.PIPE[1])
base.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def init_signals(self):
        # reset signaling
        [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        signal.signal(signal.SIGABRT, self.handle_abort)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        if hasattr(signal, 'siginterrupt'):  # python >= 2.6
            signal.siginterrupt(signal.SIGTERM, False)
            signal.siginterrupt(signal.SIGUSR1, False)

        if hasattr(signal, 'set_wakeup_fd'):
            signal.set_wakeup_fd(self.PIPE[1])
base.py 文件源码 项目:tabmaster 作者: NicolasMinghetti 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def init_signals(self):
        # reset signaling
        [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        signal.signal(signal.SIGABRT, self.handle_abort)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        if hasattr(signal, 'siginterrupt'):  # python >= 2.6
            signal.siginterrupt(signal.SIGTERM, False)
            signal.siginterrupt(signal.SIGUSR1, False)

        if hasattr(signal, 'set_wakeup_fd'):
            signal.set_wakeup_fd(self.PIPE[1])
base.py 文件源码 项目:infiblog 作者: RajuKoushik 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def init_signals(self):
        # reset signaling
        [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        signal.signal(signal.SIGABRT, self.handle_abort)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        if hasattr(signal, 'siginterrupt'):  # python >= 2.6
            signal.siginterrupt(signal.SIGTERM, False)
            signal.siginterrupt(signal.SIGUSR1, False)

        if hasattr(signal, 'set_wakeup_fd'):
            signal.set_wakeup_fd(self.PIPE[1])
base.py 文件源码 项目:metrics 作者: Jeremy-Friedman 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def init_signals(self):
        # reset signaling
        [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        signal.signal(signal.SIGABRT, self.handle_abort)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        if hasattr(signal, 'siginterrupt'):  # python >= 2.6
            signal.siginterrupt(signal.SIGTERM, False)
            signal.siginterrupt(signal.SIGUSR1, False)

        if hasattr(signal, 'set_wakeup_fd'):
            signal.set_wakeup_fd(self.PIPE[1])
base.py 文件源码 项目:metrics 作者: Jeremy-Friedman 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def init_signals(self):
        # reset signaling
        [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        signal.signal(signal.SIGABRT, self.handle_abort)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        if hasattr(signal, 'siginterrupt'):  # python >= 2.6
            signal.siginterrupt(signal.SIGTERM, False)
            signal.siginterrupt(signal.SIGUSR1, False)

        if hasattr(signal, 'set_wakeup_fd'):
            signal.set_wakeup_fd(self.PIPE[1])
base.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def init_signals(self):
        # reset signaling
        [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        signal.signal(signal.SIGABRT, self.handle_abort)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        if hasattr(signal, 'siginterrupt'):  # python >= 2.6
            signal.siginterrupt(signal.SIGTERM, False)
            signal.siginterrupt(signal.SIGUSR1, False)

        if hasattr(signal, 'set_wakeup_fd'):
            signal.set_wakeup_fd(self.PIPE[1])
base.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def init_signals(self):
        # reset signaling
        [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        signal.signal(signal.SIGABRT, self.handle_abort)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        if hasattr(signal, 'siginterrupt'):  # python >= 2.6
            signal.siginterrupt(signal.SIGTERM, False)
            signal.siginterrupt(signal.SIGUSR1, False)

        if hasattr(signal, 'set_wakeup_fd'):
            signal.set_wakeup_fd(self.PIPE[1])
base.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def init_signals(self):
        # reset signaling
        [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        signal.signal(signal.SIGABRT, self.handle_abort)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        if hasattr(signal, 'siginterrupt'):  # python >= 2.6
            signal.siginterrupt(signal.SIGTERM, False)
            signal.siginterrupt(signal.SIGUSR1, False)

        if hasattr(signal, 'set_wakeup_fd'):
            signal.set_wakeup_fd(self.PIPE[1])
_signals.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def installHandler(fd):
    """
    Install a signal handler which will write a byte to C{fd} when
    I{SIGCHLD} is received.

    This is implemented by installing a SIGCHLD handler that does nothing,
    setting the I{SIGCHLD} handler as not allowed to interrupt system calls,
    and using L{signal.set_wakeup_fd} to do the actual writing.

    @param fd: The file descriptor to which to write when I{SIGCHLD} is
        received.
    @type fd: C{int}
    """
    if fd == -1:
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
    else:
        def noopSignalHandler(*args):
            pass
        signal.signal(signal.SIGCHLD, noopSignalHandler)
        signal.siginterrupt(signal.SIGCHLD, False)
    return signal.set_wakeup_fd(fd)
base.py 文件源码 项目:django-next-train 作者: bitpixdigital 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def init_signals(self):
        # reset signaling
        [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        signal.signal(signal.SIGABRT, self.handle_abort)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        if hasattr(signal, 'siginterrupt'):  # python >= 2.6
            signal.siginterrupt(signal.SIGTERM, False)
            signal.siginterrupt(signal.SIGUSR1, False)

        if hasattr(signal, 'set_wakeup_fd'):
            signal.set_wakeup_fd(self.PIPE[1])
base.py 文件源码 项目:Alfred 作者: jkachhadia 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def init_signals(self):
        # reset signaling
        [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        signal.signal(signal.SIGABRT, self.handle_abort)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        if hasattr(signal, 'siginterrupt'):  # python >= 2.6
            signal.siginterrupt(signal.SIGTERM, False)
            signal.siginterrupt(signal.SIGUSR1, False)

        if hasattr(signal, 'set_wakeup_fd'):
            signal.set_wakeup_fd(self.PIPE[1])
EventQueueEmptyEventHandler.py 文件源码 项目:py-enarksh 作者: SetBased 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def init(cls):
        """
        Creates a pipe for waking up a select call when a signal has been received.
        """
        cls.__wake_up_pipe = os.pipe()
        fcntl.fcntl(cls.__wake_up_pipe[0], fcntl.F_SETFL, os.O_NONBLOCK)

        signal.set_wakeup_fd(EventQueueEmptyEventHandler.__wake_up_pipe[1])

    # ------------------------------------------------------------------------------------------------------------------
unix_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True
_io_windows.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self):
        # https://msdn.microsoft.com/en-us/library/windows/desktop/aa363862(v=vs.85).aspx
        self._closed = True
        self._iocp = _check(
            kernel32.
            CreateIoCompletionPort(INVALID_HANDLE_VALUE, ffi.NULL, 0, 0)
        )
        self._closed = False
        self._iocp_queue = deque()
        self._iocp_thread = None
        self._overlapped_waiters = {}
        self._completion_key_queues = {}
        # Completion key 0 is reserved for regular IO events
        self._completion_key_counter = itertools.count(1)

        # {stdlib socket object: task}
        # except that wakeup socket is mapped to None
        self._socket_waiters = {"read": {}, "write": {}}
        self._main_thread_waker = WakeupSocketpair()
        wakeup_sock = self._main_thread_waker.wakeup_sock
        self._socket_waiters["read"][wakeup_sock] = None

        # This is necessary to allow control-C to interrupt select().
        # https://github.com/python-trio/trio/issues/42
        if threading.current_thread() == threading.main_thread():
            fileno = self._main_thread_waker.write_sock.fileno()
            self._old_signal_wakeup_fd = signal.set_wakeup_fd(fileno)
_io_windows.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def close(self):
        if not self._closed:
            self._closed = True
            _check(kernel32.CloseHandle(self._iocp))
            if self._iocp_thread is not None:
                self._iocp_thread.join()
            self._main_thread_waker.close()
            if threading.current_thread() == threading.main_thread():
                signal.set_wakeup_fd(self._old_signal_wakeup_fd)
_utils.py 文件源码 项目:cotyledon 作者: sileht 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _wait_forever(self):
        # Wait forever
        while True:
            # Check if signals have been received
            if os.name == "posix":
                self._empty_signal_pipe()
            self._run_signal_handlers()

            if os.name == "posix":
                # NOTE(sileht): we cannot use threading.Event().wait(),
                # threading.Thread().join(), or time.sleep() because signals
                # can be missed when received by non-main threads
                # (https://bugs.python.org/issue5315)
                # So we use select.select() alone, we will receive EINTR or
                # will read data from signal_r when signal is emitted and
                # cpython calls PyErr_CheckSignals() to run signals handlers
                # That looks perfect to ensure handlers are run and run in the
                # main thread
                try:
                    select.select([self.signal_pipe_r], [], [])
                except select.error as e:
                    if e.args[0] != errno.EINTR:
                        raise
            else:
                # NOTE(sileht): here we do only best effort
                # and wake the loop periodically, set_wakeup_fd
                # doesn't work on non posix platform so
                # 1 seconds have been picked with the advice of a dice.
                time.sleep(1)
                # NOTE(sileht): We emulate SIGCHLD, _service_manager
                # will just check often for dead child
                self._signals_received.append(SIGCHLD)
test_signal.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def check_wakeup(self, test_body):
        # use a subprocess to have only one thread and to not change signal
        # handling of the parent process
        code = """if 1:
        import fcntl
        import os
        import signal

        def handler(signum, frame):
            pass

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        flags = fcntl.fcntl(write, fcntl.F_GETFL, 0)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(write, fcntl.F_SETFL, flags)
        signal.set_wakeup_fd(write)

        test()

        os.close(read)
        os.close(write)
        """.format(test_body)

        assert_python_ok('-c', code)
test_signal.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_invalid_fd(self):
        fd = test_support.make_bad_fd()
        self.assertRaises(ValueError, signal.set_wakeup_fd, fd)
test_signal.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def setUp(self):
        import fcntl

        self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
        self.read, self.write = os.pipe()
        flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
        self.old_wakeup = signal.set_wakeup_fd(self.write)
test_signal.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def tearDown(self):
        signal.set_wakeup_fd(self.old_wakeup)
        os.close(self.read)
        os.close(self.write)
        signal.signal(signal.SIGALRM, self.alrm)
test_signal.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_invalid_fd(self):
        fd = test_support.make_bad_fd()
        self.assertRaises(ValueError, signal.set_wakeup_fd, fd)
test_signal.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def setUp(self):
        import fcntl

        self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
        self.read, self.write = os.pipe()
        flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
        self.old_wakeup = signal.set_wakeup_fd(self.write)
test_signal.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def tearDown(self):
        signal.set_wakeup_fd(self.old_wakeup)
        os.close(self.read)
        os.close(self.write)
        signal.signal(signal.SIGALRM, self.alrm)
test_signal.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_invalid_fd(self):
        fd = support.make_bad_fd()
        self.assertRaises(ValueError, signal.set_wakeup_fd, fd)
test_signal.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def check_wakeup(self, test_body, *signals, ordered=True):
        # use a subprocess to have only one thread
        code = """if 1:
        import fcntl
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        for fd in (read, write):
            flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
            flags = flags | os.O_NONBLOCK
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        """.format(signals, ordered, test_body)

        assert_python_ok('-c', code)
manager.py 文件源码 项目:pssh 作者: lilydjwg 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def handle_sigchld(self, number, frame):
        """Apparently we need a sigchld handler to make set_wakeup_fd work."""
        for task in self.running:
            if task.proc:
                task.proc.poll()
        # Apparently some UNIX systems automatically reset the SIGCHLD
        # handler to SIG_DFL.  Reset it just in case.
        self.set_sigchld_handler()
manager.py 文件源码 项目:pssh 作者: lilydjwg 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self):
        self.readmap = {}
        self.writemap = {}

        # Setup the wakeup file descriptor to avoid hanging on lost signals.
        wakeup_readfd, wakeup_writefd = os.pipe()
        fcntl.fcntl(wakeup_writefd, fcntl.F_SETFL, os.O_NONBLOCK)
        self.register_read(wakeup_readfd, self.wakeup_handler)
        signal.set_wakeup_fd(wakeup_writefd)
unix_events.py 文件源码 项目:golightan 作者: shirou 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True


问题


面经


文章

微信
公众号

扫码关注公众号