python类SIGUSR2的实例源码

test_threadsignals.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def send_signals():
    os.kill(process_pid, signal.SIGUSR1)
    os.kill(process_pid, signal.SIGUSR2)
    signalled_all.release()
test_threadsignals.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def test_main():
    global signal_blackboard

    signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 },
                          signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 },
                          signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } }

    oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
    try:
        run_unittest(ThreadSignals)
    finally:
        registerSignals(*oldsigs)
test_threadsignals.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def registerSignals(for_usr1, for_usr2, for_alrm):
    usr1 = signal.signal(signal.SIGUSR1, for_usr1)
    usr2 = signal.signal(signal.SIGUSR2, for_usr2)
    alrm = signal.signal(signal.SIGALRM, for_alrm)
    return usr1, usr2, alrm


# The signal handler. Just note that the signal occurred and
# from who.
test_threadsignals.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def send_signals():
    os.kill(process_pid, signal.SIGUSR1)
    os.kill(process_pid, signal.SIGUSR2)
    signalled_all.release()
test_threadsignals.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_main():
    global signal_blackboard

    signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 },
                          signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 },
                          signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } }

    oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
    try:
        run_unittest(ThreadSignals)
    finally:
        registerSignals(*oldsigs)
test_threadsignals.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def registerSignals(for_usr1, for_usr2, for_alrm):
    usr1 = signal.signal(signal.SIGUSR1, for_usr1)
    usr2 = signal.signal(signal.SIGUSR2, for_usr2)
    alrm = signal.signal(signal.SIGALRM, for_alrm)
    return usr1, usr2, alrm


# The signal handler. Just note that the signal occurred and
# from who.
test_threadsignals.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def send_signals():
    os.kill(process_pid, signal.SIGUSR1)
    os.kill(process_pid, signal.SIGUSR2)
    signalled_all.release()
test_threadsignals.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_signals(self):
        # Test signal handling semantics of threads.
        # We spawn a thread, have the thread send two signals, and
        # wait for it to finish. Check that we got both signals
        # and that they were run by the main thread.
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            signal.alarm(1)
            signal.pause()
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
test_threadsignals.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_main():
    global signal_blackboard

    signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 },
                          signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 },
                          signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } }

    oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
    try:
        run_unittest(ThreadSignals)
    finally:
        registerSignals(*oldsigs)
cli.py 文件源码 项目:cligraphy 作者: Netflix-Skunkworks 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _run_command_process(self, args):
        """Command (child) process entry point. args contains the function to execute and all arguments."""

        setup_logging(args._level)

        command = ' '.join(sys.argv[1:])
        setproctitle('oc/command/%s' % command)
        faulthandler.register(signal.SIGUSR2, all_threads=True, chain=False)  # pylint:disable=no-member
        self._setup_requests_audit_headers(command)

        ret = 1
        try:
            chain = [self._run]
            if args._profile:
                chain.append(profiling_wrapper)
            if args._pdb:
                chain.append(pdb_wrapper)
            ret = call_chain(chain, args)
        except SystemExit as exc:
            ret = exc.code
        except ParserError as pe:
            pe.report()
        except Exception:  # pylint:disable=broad-except
            logging.exception('Top level exception in command process')
        finally:
            sys.exit(ret)
test_threadsignals.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def registerSignals(for_usr1, for_usr2, for_alrm):
    usr1 = signal.signal(signal.SIGUSR1, for_usr1)
    usr2 = signal.signal(signal.SIGUSR2, for_usr2)
    alrm = signal.signal(signal.SIGALRM, for_alrm)
    return usr1, usr2, alrm


# The signal handler. Just note that the signal occurred and
# from who.
test_threadsignals.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def send_signals():
    os.kill(process_pid, signal.SIGUSR1)
    os.kill(process_pid, signal.SIGUSR2)
    signalled_all.release()
test_threadsignals.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_main():
    global signal_blackboard

    signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 },
                          signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 },
                          signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } }

    oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
    try:
        run_unittest(ThreadSignals)
    finally:
        registerSignals(*oldsigs)
test_threadsignals.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def registerSignals(for_usr1, for_usr2, for_alrm):
    usr1 = signal.signal(signal.SIGUSR1, for_usr1)
    usr2 = signal.signal(signal.SIGUSR2, for_usr2)
    alrm = signal.signal(signal.SIGALRM, for_alrm)
    return usr1, usr2, alrm


# The signal handler. Just note that the signal occurred and
# from who.
test_threadsignals.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def send_signals():
    os.kill(process_pid, signal.SIGUSR1)
    os.kill(process_pid, signal.SIGUSR2)
    signalled_all.release()
test_threadsignals.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_signals(self):
        # Test signal handling semantics of threads.
        # We spawn a thread, have the thread send two signals, and
        # wait for it to finish. Check that we got both signals
        # and that they were run by the main thread.
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            signal.alarm(1)
            signal.pause()
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
test_threadsignals.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_main():
    global signal_blackboard

    signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 },
                          signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 },
                          signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } }

    oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
    try:
        run_unittest(ThreadSignals)
    finally:
        registerSignals(*oldsigs)
test_threadsignals.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def registerSignals(for_usr1, for_usr2, for_alrm):
    usr1 = signal.signal(signal.SIGUSR1, for_usr1)
    usr2 = signal.signal(signal.SIGUSR2, for_usr2)
    alrm = signal.signal(signal.SIGALRM, for_alrm)
    return usr1, usr2, alrm


# The signal handler. Just note that the signal occurred and
# from who.
test_threadsignals.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def send_signals():
    os.kill(process_pid, signal.SIGUSR1)
    os.kill(process_pid, signal.SIGUSR2)
    signalled_all.release()
test_threadsignals.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_main():
    global signal_blackboard

    signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 },
                          signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 },
                          signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } }

    oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
    try:
        run_unittest(ThreadSignals)
    finally:
        registerSignals(*oldsigs)


问题


面经


文章

微信
公众号

扫码关注公众号