Python signal.SIGUSR2 常量的实例源码

log_test.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def test_log(self):
        """Exercise every ``Log`` severity method and both USR signals.

        There is little to assert here: the test exists for regression and
        coverage purposes and passes as long as nothing raises.
        """

        log = Log(verbose=False, prefix='test')

        log.debug("Invisible debug.")

        # Log from inside an active handler so that log.exception() has a
        # current exception to report.
        try:
            raise ValueError("Test exception")
        except ValueError:
            log.exception("Test exception with message")

        # Four rounds of messages; after each round this process sends
        # itself SIGUSR1 (odd rounds) or SIGUSR2 (even rounds).
        # NOTE(review): presumably Log installs handlers for both signals
        # that alter its verbosity -- confirm against the Log class.
        for num in range(1, 5):
            log.debug("Debug")
            log.info("Info")
            log.warning("Warning")
            log.error("Error")
            log.critical("Critical")
            os.kill(os.getpid(),
                    signal.SIGUSR1 if (num % 2) != 0 else signal.SIGUSR2)
redirector.py 文件源码 项目:vault-redirector-twisted 作者: manheim 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def handle_logging_signal(self, signum, frame):
        """
        Toggle logging in response to SIGUSR1 (enable) or SIGUSR2 (disable).

        A warning naming the opposite signal and this process' PID is logged
        before ``self.log_enabled`` is updated. Any other signal number is
        silently ignored.

        :param signum: signal number sent to process
        :type signum: int
        :param frame: current stack frame when signal was caught (unused)
        """
        if signum not in (signal.SIGUSR1, signal.SIGUSR2):
            # don't know how we got here, but ignore it
            return

        enable = (signum == signal.SIGUSR1)
        if enable:
            logger.warning('Logging enabled via signal; send SIGUSR2 to PID '
                           '%d to disable logging', getpid())
        else:
            logger.warning('Logging disabled via signal; send SIGUSR1 to PID '
                           '%d to enable logging', getpid())
        self.log_enabled = enable
test_threadsignals.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_signals(self):
        """Check that SIGUSR1 and SIGUSR2 were each handled once, here.

        A signalling thread is spawned and ``signalled_all`` is acquired a
        second time to wait for it. The blackboard must then show exactly
        one delivery per signal, recorded under the identifier of the
        thread running this test.
        NOTE(review): presumably the spawned thread releases
        ``signalled_all`` after sending both signals -- confirm.
        """
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        try:
            # the signals that we asked the kernel to send
            # will come back, but we don't know when.
            # (it might even be after the thread exits
            # and might be out of order.)  If we haven't seen
            # the signals yet, send yet another signal and
            # wait for it return.
            if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
               or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
                try:
                    signal.alarm(1)
                    signal.pause()
                finally:
                    # Cancel the alarm even if pause() was interrupted by
                    # an exception, so it cannot fire later on.
                    signal.alarm(0)

            self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
            self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                               thread.get_ident())
            self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
            self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                               thread.get_ident())
        finally:
            # Release the lock on failure too, so a failed assertion does
            # not leave it held for whatever runs next.
            signalled_all.release()
test_threadsignals.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_signals(self):
        """Check that SIGUSR1 and SIGUSR2 were each handled once, here.

        A signalling thread is spawned and ``signalled_all`` is acquired a
        second time to wait for it. The blackboard must then show exactly
        one delivery per signal, recorded under the identifier of the
        thread running this test.
        NOTE(review): presumably the spawned thread releases
        ``signalled_all`` after sending both signals -- confirm.
        """
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        try:
            # the signals that we asked the kernel to send
            # will come back, but we don't know when.
            # (it might even be after the thread exits
            # and might be out of order.)  If we haven't seen
            # the signals yet, send yet another signal and
            # wait for it return.
            if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
               or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
                try:
                    signal.alarm(1)
                    signal.pause()
                finally:
                    # Cancel the alarm even if pause() was interrupted by
                    # an exception, so it cannot fire later on.
                    signal.alarm(0)

            self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
            self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                               thread.get_ident())
            self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
            self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                               thread.get_ident())
        finally:
            # Release the lock on failure too, so a failed assertion does
            # not leave it held for whatever runs next.
            signalled_all.release()
app.py 文件源码 项目:apart-gtk 作者: alexheretic 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main():
    """Create the main window, hook up signals and styling, run GTK.

    SIGINT, SIGTERM and SIGUSR2 all trigger the window's ``on_delete`` so
    that a keyboard interrupt or nodemon can end the program cleanly. The
    stylesheet ``apart.css`` is loaded from the directory of this file.
    """
    win = Window()

    def _close_window(_signum, _frame):
        return win.on_delete()

    # allow keyboard interrupt / nodemon to end program cleanly
    for signum in (signal.SIGINT, signal.SIGTERM, signal.SIGUSR2):
        signal.signal(signum, _close_window)

    css = Gtk.CssProvider()
    here = os.path.dirname(os.path.realpath(__file__))
    css.load_from_path(here + "/apart.css")
    screen = Gdk.Screen.get_default()
    Gtk.StyleContext.add_provider_for_screen(
        screen, css, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION)

    win.show_all()
    Gtk.main()
pin.py 文件源码 项目:choronzon 作者: CENSUS 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def handler(self):
        '''
            The signal/event alarm handler, executed when the alarm
            goes off.

            Does nothing if the target process has already exited. On
            Linux the process is sent SIGUSR2; on Windows the named event
            ``self.event_name`` is opened, signalled and closed again.
            An OSError raised while doing so is printed and otherwise
            ignored (best effort).
        '''
        if self.process.poll() is not None:
            return
        try:
            system = platform.system()
            if system == 'Linux':
                self.process.send_signal(signal.SIGUSR2)
            elif system == 'Windows':
                kernel32 = ctypes.windll.kernel32
                event_object = kernel32.OpenEventA(
                                    EVENT_ALL_ACCESS, False, self.event_name)
                # OpenEventA yields a null handle on failure; there is
                # nothing to signal or close in that case.
                if event_object:
                    try:
                        kernel32.SetEvent(event_object)
                    finally:
                        # Each call opens a fresh handle, so close it to
                        # avoid leaking one per alarm.
                        kernel32.CloseHandle(event_object)
        except OSError as ex:
            # Two spaces on purpose: this reproduces the output of the
            # former ``print '[!] ERROR: ', ex`` statement.
            print('[!] ERROR:  %s' % (ex,))
test_threadsignals.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_signals(self):
        """Check that SIGUSR1 and SIGUSR2 were each handled once, here.

        A signalling thread is spawned and ``signalled_all`` is acquired a
        second time to wait for it. The blackboard must then show exactly
        one delivery per signal, recorded under the identifier of the
        thread running this test.
        NOTE(review): presumably the spawned thread releases
        ``signalled_all`` after sending both signals -- confirm.
        """
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        try:
            # the signals that we asked the kernel to send
            # will come back, but we don't know when.
            # (it might even be after the thread exits
            # and might be out of order.)  If we haven't seen
            # the signals yet, send yet another signal and
            # wait for it return.
            if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
               or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
                try:
                    signal.alarm(1)
                    signal.pause()
                finally:
                    # Cancel the alarm even if pause() was interrupted by
                    # an exception, so it cannot fire later on.
                    signal.alarm(0)

            self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
            self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                               thread.get_ident())
            self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
            self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                               thread.get_ident())
        finally:
            # Release the lock on failure too, so a failed assertion does
            # not leave it held for whatever runs next.
            signalled_all.release()
test_threadsignals.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def test_signals(self):
        """Check that SIGUSR1 and SIGUSR2 were each handled once, here.

        A signalling thread is spawned and ``signalled_all`` is acquired a
        second time to wait for it. The blackboard must then show exactly
        one delivery per signal, recorded under the identifier of the
        thread running this test.
        NOTE(review): presumably the spawned thread releases
        ``signalled_all`` after sending both signals -- confirm.
        """
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        try:
            # the signals that we asked the kernel to send
            # will come back, but we don't know when.
            # (it might even be after the thread exits
            # and might be out of order.)  If we haven't seen
            # the signals yet, send yet another signal and
            # wait for it return.
            if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
               or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
                try:
                    signal.alarm(1)
                    signal.pause()
                finally:
                    # Cancel the alarm even if pause() was interrupted by
                    # an exception, so it cannot fire later on.
                    signal.alarm(0)

            self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
            self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                               thread.get_ident())
            self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
            self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                               thread.get_ident())
        finally:
            # Release the lock on failure too, so a failed assertion does
            # not leave it held for whatever runs next.
            signalled_all.release()
proc.py 文件源码 项目:Chromium_DepotTools 作者: p07r0457 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def setup_limit(self):
        """Install the configured wall-clock, CPU-time and memory limits.

        Must run in the thread named 'MainThread' (asserted). Limits are
        only installed while ``self._limit_set`` is not positive, but the
        counter is incremented on every call. For each limit whose
        attribute is not None, the signal handler / rlimit being replaced
        is saved on ``self`` first.
        """
        assert currentThread().getName() == 'MainThread'
        # Executed on every call, including calls that install nothing.
        os.setpgrp()
        if self._limit_set <= 0:
            if self.max_time is not None:
                # Keep whatever signal() returns (presumably the previous
                # SIGUSR2 handler, as with signal.signal) for later restore.
                self._old_usr2_hdlr = signal(SIGUSR2, self._hangle_sig_timeout)
                # Remaining budget: max_time minus the time accumulated in
                # _elapse_time, floored at 1.
                # NOTE(review): Timer is presumably threading.Timer, i.e. it
                # calls self._time_out after that many seconds -- confirm.
                self._timer = Timer(max(1, int(self.max_time) - self._elapse_time),
                                    self._time_out)
                self._start_time = int(time())
                self._timer.start()
            if self.max_cpu_time is not None:
                self._old_max_cpu_time = getrlimit(RLIMIT_CPU)
                # Only element 0 is replaced; element 1 of the saved limit
                # is reused unchanged (presumably the hard limit -- confirm
                # getrlimit is resource.getrlimit).
                cpu_limit = (int(self.max_cpu_time), self._old_max_cpu_time[1])
                self._old_sigxcpu_hdlr = signal(SIGXCPU, self._handle_sigxcpu)
                setrlimit(RLIMIT_CPU, cpu_limit)
            if self.max_memory is not None:
                # NOTE(review): MemorySentinel is defined elsewhere; the
                # meaning of its arguments (1, max_memory) is not visible
                # here -- confirm.
                self._msentinel = MemorySentinel(1, int(self.max_memory) )
                self._old_max_memory = getrlimit(RLIMIT_AS)
                self._old_usr1_hdlr = signal(SIGUSR1, self._hangle_sig_memory)
                # Same scheme as the CPU limit: new element 0, old element 1.
                as_limit = (int(self.max_memory), self._old_max_memory[1])
                setrlimit(RLIMIT_AS, as_limit)
                self._msentinel.start()
        self._limit_set += 1
proc.py 文件源码 项目:Chromium_DepotTools 作者: p07r0457 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def clean_limit(self):
        """Restore the handlers and rlimits that were in place before.

        Restoration only happens while ``self._limit_set`` is positive;
        the counter is decremented on every call regardless. Each limit is
        undone only if its attribute (``max_time``, ``max_cpu_time``,
        ``max_memory``) is not None.
        """
        if self._limit_set > 0:
            if self.max_time is not None:
                self._timer.cancel()
                # Accumulate the whole seconds spent since _start_time.
                self._elapse_time += int(time())-self._start_time
                self._timer = None
                signal(SIGUSR2, self._old_usr2_hdlr)
            if self.max_cpu_time is not None:
                # Put back the saved limit first, then the saved handler.
                setrlimit(RLIMIT_CPU, self._old_max_cpu_time)
                signal(SIGXCPU, self._old_sigxcpu_hdlr)
            if self.max_memory is not None:
                self._msentinel.stop()
                self._msentinel = None
                setrlimit(RLIMIT_AS, self._old_max_memory)
                signal(SIGUSR1, self._old_usr1_hdlr)
        # Not guarded by the check above, so the counter can go negative.
        self._limit_set -= 1
proc.py 文件源码 项目:node-gn 作者: Shouqun 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def setup_limit(self):
        """Install the configured wall-clock, CPU-time and memory limits.

        Must run in the thread named 'MainThread' (asserted). Limits are
        only installed while ``self._limit_set`` is not positive, but the
        counter is incremented on every call. For each limit whose
        attribute is not None, the signal handler / rlimit being replaced
        is saved on ``self`` first.
        """
        assert currentThread().getName() == 'MainThread'
        # Executed on every call, including calls that install nothing.
        os.setpgrp()
        if self._limit_set <= 0:
            if self.max_time is not None:
                # Keep whatever signal() returns (presumably the previous
                # SIGUSR2 handler, as with signal.signal) for later restore.
                self._old_usr2_hdlr = signal(SIGUSR2, self._hangle_sig_timeout)
                # Remaining budget: max_time minus the time accumulated in
                # _elapse_time, floored at 1.
                # NOTE(review): Timer is presumably threading.Timer, i.e. it
                # calls self._time_out after that many seconds -- confirm.
                self._timer = Timer(max(1, int(self.max_time) - self._elapse_time),
                                    self._time_out)
                self._start_time = int(time())
                self._timer.start()
            if self.max_cpu_time is not None:
                self._old_max_cpu_time = getrlimit(RLIMIT_CPU)
                # Only element 0 is replaced; element 1 of the saved limit
                # is reused unchanged (presumably the hard limit -- confirm
                # getrlimit is resource.getrlimit).
                cpu_limit = (int(self.max_cpu_time), self._old_max_cpu_time[1])
                self._old_sigxcpu_hdlr = signal(SIGXCPU, self._handle_sigxcpu)
                setrlimit(RLIMIT_CPU, cpu_limit)
            if self.max_memory is not None:
                # NOTE(review): MemorySentinel is defined elsewhere; the
                # meaning of its arguments (1, max_memory) is not visible
                # here -- confirm.
                self._msentinel = MemorySentinel(1, int(self.max_memory) )
                self._old_max_memory = getrlimit(RLIMIT_AS)
                self._old_usr1_hdlr = signal(SIGUSR1, self._hangle_sig_memory)
                # Same scheme as the CPU limit: new element 0, old element 1.
                as_limit = (int(self.max_memory), self._old_max_memory[1])
                setrlimit(RLIMIT_AS, as_limit)
                self._msentinel.start()
        self._limit_set += 1
proc.py 文件源码 项目:node-gn 作者: Shouqun 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def clean_limit(self):
        """Restore the handlers and rlimits that were in place before.

        Restoration only happens while ``self._limit_set`` is positive;
        the counter is decremented on every call regardless. Each limit is
        undone only if its attribute (``max_time``, ``max_cpu_time``,
        ``max_memory``) is not None.
        """
        if self._limit_set > 0:
            if self.max_time is not None:
                self._timer.cancel()
                # Accumulate the whole seconds spent since _start_time.
                self._elapse_time += int(time())-self._start_time
                self._timer = None
                signal(SIGUSR2, self._old_usr2_hdlr)
            if self.max_cpu_time is not None:
                # Put back the saved limit first, then the saved handler.
                setrlimit(RLIMIT_CPU, self._old_max_cpu_time)
                signal(SIGXCPU, self._old_sigxcpu_hdlr)
            if self.max_memory is not None:
                self._msentinel.stop()
                self._msentinel = None
                setrlimit(RLIMIT_AS, self._old_max_memory)
                signal(SIGUSR1, self._old_usr1_hdlr)
        # Not guarded by the check above, so the counter can go negative.
        self._limit_set -= 1
proc.py 文件源码 项目:depot_tools 作者: webrtc-uwp 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def setup_limit(self):
        """Install the configured wall-clock, CPU-time and memory limits.

        Must run in the thread named 'MainThread' (asserted). Limits are
        only installed while ``self._limit_set`` is not positive, but the
        counter is incremented on every call. For each limit whose
        attribute is not None, the signal handler / rlimit being replaced
        is saved on ``self`` first.
        """
        assert currentThread().getName() == 'MainThread'
        # Executed on every call, including calls that install nothing.
        os.setpgrp()
        if self._limit_set <= 0:
            if self.max_time is not None:
                # Keep whatever signal() returns (presumably the previous
                # SIGUSR2 handler, as with signal.signal) for later restore.
                self._old_usr2_hdlr = signal(SIGUSR2, self._hangle_sig_timeout)
                # Remaining budget: max_time minus the time accumulated in
                # _elapse_time, floored at 1.
                # NOTE(review): Timer is presumably threading.Timer, i.e. it
                # calls self._time_out after that many seconds -- confirm.
                self._timer = Timer(max(1, int(self.max_time) - self._elapse_time),
                                    self._time_out)
                self._start_time = int(time())
                self._timer.start()
            if self.max_cpu_time is not None:
                self._old_max_cpu_time = getrlimit(RLIMIT_CPU)
                # Only element 0 is replaced; element 1 of the saved limit
                # is reused unchanged (presumably the hard limit -- confirm
                # getrlimit is resource.getrlimit).
                cpu_limit = (int(self.max_cpu_time), self._old_max_cpu_time[1])
                self._old_sigxcpu_hdlr = signal(SIGXCPU, self._handle_sigxcpu)
                setrlimit(RLIMIT_CPU, cpu_limit)
            if self.max_memory is not None:
                # NOTE(review): MemorySentinel is defined elsewhere; the
                # meaning of its arguments (1, max_memory) is not visible
                # here -- confirm.
                self._msentinel = MemorySentinel(1, int(self.max_memory) )
                self._old_max_memory = getrlimit(RLIMIT_AS)
                self._old_usr1_hdlr = signal(SIGUSR1, self._hangle_sig_memory)
                # Same scheme as the CPU limit: new element 0, old element 1.
                as_limit = (int(self.max_memory), self._old_max_memory[1])
                setrlimit(RLIMIT_AS, as_limit)
                self._msentinel.start()
        self._limit_set += 1
proc.py 文件源码 项目:depot_tools 作者: webrtc-uwp 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def clean_limit(self):
        """Restore the handlers and rlimits that were in place before.

        Restoration only happens while ``self._limit_set`` is positive;
        the counter is decremented on every call regardless. Each limit is
        undone only if its attribute (``max_time``, ``max_cpu_time``,
        ``max_memory``) is not None.
        """
        if self._limit_set > 0:
            if self.max_time is not None:
                self._timer.cancel()
                # Accumulate the whole seconds spent since _start_time.
                self._elapse_time += int(time())-self._start_time
                self._timer = None
                signal(SIGUSR2, self._old_usr2_hdlr)
            if self.max_cpu_time is not None:
                # Put back the saved limit first, then the saved handler.
                setrlimit(RLIMIT_CPU, self._old_max_cpu_time)
                signal(SIGXCPU, self._old_sigxcpu_hdlr)
            if self.max_memory is not None:
                self._msentinel.stop()
                self._msentinel = None
                setrlimit(RLIMIT_AS, self._old_max_memory)
                signal(SIGUSR1, self._old_usr1_hdlr)
        # Not guarded by the check above, so the counter can go negative.
        self._limit_set -= 1
proc.py 文件源码 项目:wuye.vim 作者: zhaoyingnan911 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def setup_limit(self):
        """Install the configured wall-clock, CPU-time and memory limits.

        Must run in the thread named 'MainThread' (asserted). Limits are
        only installed while ``self._limit_set`` is not positive, but the
        counter is incremented on every call. For each limit whose
        attribute is not None, the signal handler / rlimit being replaced
        is saved on ``self`` first.
        """
        assert currentThread().getName() == 'MainThread'
        # Executed on every call, including calls that install nothing.
        os.setpgrp()
        if self._limit_set <= 0:
            if self.max_time is not None:
                # Keep whatever signal() returns (presumably the previous
                # SIGUSR2 handler, as with signal.signal) for later restore.
                self._old_usr2_hdlr = signal(SIGUSR2, self._hangle_sig_timeout)
                # Remaining budget: max_time minus the time accumulated in
                # _elapse_time, floored at 1.
                # NOTE(review): Timer is presumably threading.Timer, i.e. it
                # calls self._time_out after that many seconds -- confirm.
                self._timer = Timer(max(1, int(self.max_time) - self._elapse_time),
                                    self._time_out)
                self._start_time = int(time())
                self._timer.start()
            if self.max_cpu_time is not None:
                self._old_max_cpu_time = getrlimit(RLIMIT_CPU)
                # Only element 0 is replaced; element 1 of the saved limit
                # is reused unchanged (presumably the hard limit -- confirm
                # getrlimit is resource.getrlimit).
                cpu_limit = (int(self.max_cpu_time), self._old_max_cpu_time[1])
                self._old_sigxcpu_hdlr = signal(SIGXCPU, self._handle_sigxcpu)
                setrlimit(RLIMIT_CPU, cpu_limit)
            if self.max_memory is not None:
                # NOTE(review): MemorySentinel is defined elsewhere; the
                # meaning of its arguments (1, max_memory) is not visible
                # here -- confirm.
                self._msentinel = MemorySentinel(1, int(self.max_memory) )
                self._old_max_memory = getrlimit(RLIMIT_AS)
                self._old_usr1_hdlr = signal(SIGUSR1, self._hangle_sig_memory)
                # Same scheme as the CPU limit: new element 0, old element 1.
                as_limit = (int(self.max_memory), self._old_max_memory[1])
                setrlimit(RLIMIT_AS, as_limit)
                self._msentinel.start()
        self._limit_set += 1
proc.py 文件源码 项目:wuye.vim 作者: zhaoyingnan911 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def clean_limit(self):
        """Restore the handlers and rlimits that were in place before.

        Restoration only happens while ``self._limit_set`` is positive;
        the counter is decremented on every call regardless. Each limit is
        undone only if its attribute (``max_time``, ``max_cpu_time``,
        ``max_memory``) is not None.
        """
        if self._limit_set > 0:
            if self.max_time is not None:
                self._timer.cancel()
                # Accumulate the whole seconds spent since _start_time.
                self._elapse_time += int(time())-self._start_time
                self._timer = None
                signal(SIGUSR2, self._old_usr2_hdlr)
            if self.max_cpu_time is not None:
                # Put back the saved limit first, then the saved handler.
                setrlimit(RLIMIT_CPU, self._old_max_cpu_time)
                signal(SIGXCPU, self._old_sigxcpu_hdlr)
            if self.max_memory is not None:
                self._msentinel.stop()
                self._msentinel = None
                setrlimit(RLIMIT_AS, self._old_max_memory)
                signal(SIGUSR1, self._old_usr1_hdlr)
        # Not guarded by the check above, so the counter can go negative.
        self._limit_set -= 1
parse_replication_stream_internal_test.py 文件源码 项目:mysql_streamer 作者: Yelp 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_register_signal_handler(
        self,
        patch_config,
        patch_db_connections,
        patch_restarter,
        patch_signal,
        patch_running,
        patch_producer,
        patch_exit,
    ):
        """Running a batch registers the SIGINT/SIGTERM/SIGUSR2 handlers.

        SIGINT and SIGTERM must map to the stream's shutdown handler and
        SIGUSR2 to its profiler handler.
        """
        patch_running.return_value = False
        stream = self._init_and_run_batch()
        on_shutdown = stream._handle_shutdown_signal
        expected_registrations = [
            mock.call(signal.SIGINT, on_shutdown),
            mock.call(signal.SIGTERM, on_shutdown),
            mock.call(signal.SIGUSR2, stream._handle_profiler_signal),
        ]
        # ZKLock also calls patch_signal, so we have to work around it:
        # test for the list being contained in the recorded calls rather
        # than comparing the full call list.
        assert expected_registrations in patch_signal.call_args_list
agent.py 文件源码 项目:stackimpact-python 作者: stackimpact 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def run_in_main_thread(self, func):
        """Record ``func`` and send SIGUSR2 to this very process.

        Returns False, doing nothing, when ``self.main_thread_func`` is
        already truthy; otherwise stores ``func`` there, sends the signal
        and returns True.
        NOTE(review): presumably a SIGUSR2 handler installed elsewhere
        picks up ``main_thread_func`` and runs it -- confirm.
        """
        if not self.main_thread_func:
            self.main_thread_func = func
            own_pid = os.getpid()
            os.kill(own_pid, signal.SIGUSR2)
            return True

        return False
startup.py 文件源码 项目:abusehelper 作者: Exploit-install 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _listen_configs(self, error_event):
        """Generator that applies incoming configs until told to shut down.

        While not closing, it waits for a new config set whenever the
        current handler keys equal the last applied set, then applies it.
        An ``idiokit.Signal`` switches to closing mode: SIGUSR1 cleans up
        with SIGTERM, SIGUSR2 with SIGKILL, and the first other signal
        with SIGTERM; after that the remaining handlers are waited for.
        ``self._check()`` always runs on exit.
        """
        closing = False
        term_count = 0
        configs = frozenset()

        try:
            while True:
                try:
                    while not closing:
                        # Only wait for new input once the handler keys
                        # equal the current config set.
                        if frozenset(self._handlers) == configs:
                            configs = yield idiokit.next()
                            configs = frozenset(iter_startups(config.flatten(configs)))
                        yield self._apply(configs, error_event)
                    # Closing: wait on whatever handlers are still around.
                    yield self._wait(self._handlers.values())
                except idiokit.Signal as sig:
                    closing = True

                    # Each `continue` below re-enters the outer loop, which
                    # now skips the inner loop and goes straight to _wait().
                    if sig.signum == signal.SIGUSR1:
                        self._clean(signal.SIGTERM)
                        continue

                    if sig.signum == signal.SIGUSR2:
                        self._clean(signal.SIGKILL)
                        continue

                    # Any other signal: clean up once; a repeat falls
                    # through to the `break` below.
                    if term_count == 0:
                        self._clean(signal.SIGTERM)
                        term_count += 1
                        continue
                break
        finally:
            self._check()
commands.py 文件源码 项目:abusehelper 作者: Exploit-install 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def run_for_instance(self, options, instance):
        """Yield the result of stopping ``instance`` with a USR signal.

        SIGUSR2 is used when ``options.kill`` is truthy, SIGUSR1 otherwise.
        """
        stop_signal = signal.SIGUSR2 if options.kill else signal.SIGUSR1
        yield instance.stop(stop_signal)
process.py 文件源码 项目:python-application 作者: AGProjects 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def _setup_signal_handlers(self):
        """Ignore terminal I/O and USR signals for daemon mode.

        Each signal is passed to ``self.signals.ignore`` only if the
        ``signal`` module defines it on this platform.
        """
        ignorable = (
            'SIGTTOU', 'SIGTTIN', 'SIGTSTP',  # terminal I/O signals
            'SIGUSR1', 'SIGUSR2',             # USR signals
        )
        registry = self.signals
        for signame in ignorable:
            signum = getattr(signal, signame, None)
            if signum is None:
                continue
            registry.ignore(signum)
redirector.py 文件源码 项目:vault-redirector-twisted 作者: manheim 项目源码 文件源码 阅读 53 收藏 0 点赞 0 评论 0
def setup_signal_handlers(self):
        """
        Register ``self.handle_logging_signal`` for SIGUSR1, then SIGUSR2,
        so logging can be enabled/disabled via signals.

        Note that this doesn't work on Windows.
        """
        for signum in (signal.SIGUSR1, signal.SIGUSR2):
            signal.signal(signum, self.handle_logging_signal)
test_redirector.py 文件源码 项目:vault-redirector-twisted 作者: manheim 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_setup_signal_handlers(self):
        """setup_signal_handlers() registers USR1 then USR2, nothing else."""
        target = '%s.signal.signal' % pbm
        with patch(target) as mock_signal:
            self.cls.setup_signal_handlers()
        handler = self.cls.handle_logging_signal
        expected_calls = [
            call(signum, handler)
            for signum in (signal.SIGUSR1, signal.SIGUSR2)
        ]
        assert mock_signal.mock_calls == expected_calls
test_redirector.py 文件源码 项目:vault-redirector-twisted 作者: manheim 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_handle_logging_signal_USR1(self):
        """SIGUSR1 logs one warning and turns ``log_enabled`` on."""
        self.cls.log_enabled = False
        with patch('%s.logger' % pbm) as mock_logger, \
                patch('%s.getpid' % pbm) as mock_getpid:
            mock_getpid.return_value = 12345
            self.cls.handle_logging_signal(signal.SIGUSR1, None)
        expected_message = ('Logging enabled via signal; send SIGUSR2 to '
                            'PID %d to disable logging')
        expected_calls = [call.warning(expected_message, 12345)]
        assert mock_logger.mock_calls == expected_calls
        assert self.cls.log_enabled is True
test_redirector.py 文件源码 项目:vault-redirector-twisted 作者: manheim 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_handle_logging_signal_USR2(self):
        """SIGUSR2 logs one warning and turns ``log_enabled`` off."""
        self.cls.log_enabled = True
        with patch('%s.logger' % pbm) as mock_logger, \
                patch('%s.getpid' % pbm) as mock_getpid:
            mock_getpid.return_value = 12345
            self.cls.handle_logging_signal(signal.SIGUSR2, None)
        expected_message = ('Logging disabled via signal; send SIGUSR1 to '
                            'PID %d to enable logging')
        expected_calls = [call.warning(expected_message, 12345)]
        assert mock_logger.mock_calls == expected_calls
        assert self.cls.log_enabled is False
test_threadsignals.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def registerSignals(for_usr1, for_usr2, for_alrm):
    """Install handlers for SIGUSR1, SIGUSR2 and SIGALRM, in that order.

    Returns a 3-tuple of whatever ``signal.signal`` returned for each,
    in the same order, so it can be passed back in to undo the change.
    """
    return (
        signal.signal(signal.SIGUSR1, for_usr1),
        signal.signal(signal.SIGUSR2, for_usr2),
        signal.signal(signal.SIGALRM, for_alrm),
    )


# The signal handler. Just note that the signal occurred and
# from whom.
test_threadsignals.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def send_signals():
    """Send SIGUSR1 then SIGUSR2 to ``process_pid``, then release the lock.

    Releasing ``signalled_all`` tells whoever is blocked on it that both
    signals have been sent.
    """
    for signum in (signal.SIGUSR1, signal.SIGUSR2):
        os.kill(process_pid, signum)
    signalled_all.release()
test_threadsignals.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_signals(self):
        """Check that SIGUSR1 and SIGUSR2 were each handled once, here.

        The blackboard must show exactly one delivery per signal, recorded
        under the identifier of the thread running this test.
        """
        # Test signal handling semantics of threads.
        # We spawn a thread, have the thread send two signals, and
        # wait for it to finish. Check that we got both signals
        # and that they were run by the main thread.
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        try:
            # the signals that we asked the kernel to send
            # will come back, but we don't know when.
            # (it might even be after the thread exits
            # and might be out of order.)  If we haven't seen
            # the signals yet, send yet another signal and
            # wait for it return.
            if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
               or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
                try:
                    signal.alarm(1)
                    signal.pause()
                finally:
                    # Cancel the alarm even if pause() was interrupted by
                    # an exception, so it cannot fire later on.
                    signal.alarm(0)

            self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
            self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                               thread.get_ident())
            self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
            self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                               thread.get_ident())
        finally:
            # Release the lock on failure too, so a failed assertion does
            # not leave it held for whatever runs next.
            signalled_all.release()
test_threadsignals.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_main():
    """Run the ThreadSignals tests with the recording handlers installed.

    The global blackboard is reset to zeroed counters for SIGUSR1, SIGUSR2
    and SIGALRM, ``handle_signals`` is registered for all three, and the
    previous handlers are restored afterwards even if the run fails.
    """
    global signal_blackboard

    watched = (signal.SIGUSR1, signal.SIGUSR2, signal.SIGALRM)
    signal_blackboard = dict(
        (signum, {'tripped': 0, 'tripped_by': 0}) for signum in watched)

    previous = registerSignals(handle_signals, handle_signals, handle_signals)
    try:
        run_unittest(ThreadSignals)
    finally:
        registerSignals(*previous)
test_threadsignals.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def registerSignals(for_usr1, for_usr2, for_alrm):
    """Swap in handlers for SIGUSR1, SIGUSR2 and SIGALRM (in that order).

    The values returned by ``signal.signal`` are collected and handed back
    as a tuple in the same order.
    """
    replaced = []
    replaced.append(signal.signal(signal.SIGUSR1, for_usr1))
    replaced.append(signal.signal(signal.SIGUSR2, for_usr2))
    replaced.append(signal.signal(signal.SIGALRM, for_alrm))
    return tuple(replaced)


# The signal handler. Just note that the signal occurred and
# from whom.


问题


面经


文章

微信
公众号

扫码关注公众号