python类SIGUSR1的实例源码

runtime_test.py 文件源码 项目:stackimpact-python 作者: stackimpact 项目源码 文件源码 阅读 47 收藏 0 点赞 0 评论 0
def test_register_signal(self):
        # A handler installed through register_signal() must run once for
        # every SIGUSR1 this process sends to itself.
        if runtime_info.OS_WIN:
            # Nothing is exercised when runtime_info.OS_WIN is set.
            return

        counter = {'handler': 0}

        def _count_signal(signum, frame):
            counter['handler'] = counter['handler'] + 1

        register_signal(signal.SIGUSR1, _count_signal)

        own_pid = os.getpid()
        for _ in range(2):
            os.kill(own_pid, signal.SIGUSR1)

        # Put the default disposition back before checking the count.
        signal.signal(signal.SIGUSR1, signal.SIG_DFL)

        self.assertEqual(counter['handler'], 2)
log_test.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_log(self):
        """Logging tests"""

        # Not much to test here but exercise the code nonetheless
        # for regression/coverage.

        log = Log(verbose=False, prefix='test')

        log.debug("Invisible debug.")

        # log.exception() is called from inside an except block so that
        # there is an active exception when it runs.
        try:
            raise ValueError("Test exception")
        except ValueError:
            log.exception("Test exception with message")

        for num in range(1, 5):
            log.debug("Debug")
            log.info("Info")
            log.warning("Warning")
            log.error("Error")
            log.critical("Critical")
            # Odd iterations send SIGUSR1 to this process, even ones SIGUSR2.
            # NOTE(review): presumably Log installs handlers for both
            # signals; confirm against the Log implementation.
            os.kill(os.getpid(),
                    signal.SIGUSR1 if (num % 2) != 0 else signal.SIGUSR2)
hickup.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def t01(factory):
    """Check that a master process survives SIGUSR1.

    A master is created with ``factory.make_master('master')``, SIGUSR1 is
    sent to the PID it reports, and ``get_pid()`` is called once more to see
    whether the process still answers.

    Returns True when the second call succeeds. Otherwise a FAIL line is
    printed and False is returned.
    """
    pretty = '%s t1' % __file__
    print(pretty)

    ctrl = factory.make_master('master')
    pid = ctrl.get_pid()
    os.kill(pid, signal.SIGUSR1)

    try:
        # Only the success of the call matters; its result is not needed.
        ctrl.get_pid()
    # "except X as e" is accepted by Python 2.6+ and Python 3, whereas the
    # older "except X, e" form is a syntax error on Python 3.
    except ConnectionClosed as e:
        print('FAIL %s: SIGUSR1 killed the process: %s' % (pretty, e))
        return False
    except Exception as e:
        print('FAIL %s: unknown error: %s' % (pretty, e))
        return False

    return True

# check that trace files are written to <home>/.ave/hickup with predictable
# file names
hickup.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def t04(factory):
    """Check that signalling the broker leaves its sessions untouched.

    A master is created with ``factory.make_master('master')`` and its PID
    and session PIDs are recorded. SIGUSR1 is sent to the master, the test
    waits for ``<HOME>/.ave/hickup`` through ``wait_hickup_dir()``, and the
    session PIDs are read again.

    Returns True when the session PIDs are unchanged. Otherwise a FAIL line
    is printed and False is returned.
    """
    pretty = '%s t4' % __file__
    print(pretty)

    ctrl  = factory.make_master('master')
    bpid  = ctrl.get_pid()
    spids = ctrl.get_session_pids()

    if any(pid < 2 for pid in spids):
        # spids is a collection of PIDs, so it has to be formatted with %s;
        # %d would raise TypeError instead of printing the failure.
        print('FAIL %s: impossible session PIDs: %s' % (pretty, spids))
        return False

    # signal the broker and wait for the hickup directory to appear. the signal
    # should be propagated to the equipment lister long before the directory is
    # created.
    os.kill(bpid, signal.SIGUSR1)
    path = os.path.join(factory.HOME.path, '.ave', 'hickup')
    wait_hickup_dir(path, 3)

    spids2 = ctrl.get_session_pids()
    if spids2 != spids:
        print('FAIL %s: sessions affected: %s != %s' % (pretty, spids2, spids))
        return False

    return True
hickup.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def t01(pretty, factory):
    """Check that a control process survives SIGUSR1.

    A control is created with ``factory.make_control(home=...)``, SIGUSR1 is
    sent to the PID it reports, and ``get_pid()`` is called once more to see
    whether the process still answers.

    pretty  -- label used in the printed FAIL messages
    factory -- object providing ``make_control()`` and ``HOME.path``

    Returns True when the second call succeeds, otherwise False.
    """
    ctrl = factory.make_control(home=factory.HOME.path)
    pid = ctrl.get_pid()
    os.kill(pid, signal.SIGUSR1)

    try:
        # Only the success of the call matters; its result is not needed.
        ctrl.get_pid()
    # "except X as e" is accepted by Python 2.6+ and Python 3, whereas the
    # older "except X, e" form is a syntax error on Python 3.
    except ConnectionClosed as e:
        print('FAIL %s: SIGUSR1 killed the process: %s' % (pretty, e))
        return False
    except Exception as e:
        print('FAIL %s: unknown error: %s' % (pretty, e))
        return False

    return True

# check that trace files are written to <home>/.ave/hickup with predictable
# file names
hickup.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def t03(pretty, factory):
    """Signal a control that has one child and count the hickup files.

    A control is created, ``make_child()`` is called on it, and SIGUSR1 is
    sent to the control's PID. After ``wait_hickup_dir()`` returns, exactly
    two entries are expected in ``<HOME>/.ave/hickup``.
    NOTE(review): presumably one file per process (control and child);
    confirm against the signal handler.

    Returns True for exactly two entries, otherwise prints a FAIL line and
    returns False.
    """
    control = factory.make_control(home=factory.HOME.path)
    control_pid = control.get_pid()
    control.make_child()
    os.kill(control_pid, signal.SIGUSR1)

    hickup_dir = os.path.join(factory.HOME.path, '.ave', 'hickup')
    wait_hickup_dir(hickup_dir, 3)

    traces = glob.glob(os.path.join(hickup_dir, '*'))
    if len(traces) == 2:
        return True

    print('FAIL %s: wrong number of files: %s' % (pretty, traces))
    return False

# check that the signal is not propagated to non-ave processes such as external
# tools. these will otherwise terminate if they do not handle the signal.
control.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def handle_SIGUSR1(self, signum, frame):
        """Forward SIGUSR1 to AVE children and dump the interrupted frame.

        The signal is sent to every PID from ``self.get_children()`` whose
        process name starts with ``'ave-'``. Then ``<home>/.ave/hickup`` is
        created if it is missing and a file named
        ``<date>-<proc_name>-<pid>`` is written with the formatted stack,
        the locals and the globals of ``frame``.

        signum -- signal number (not used by the handler)
        frame  -- stack frame that was executing when the signal arrived

        If the directory cannot be created for a reason other than already
        existing, the error is reported through ``self.log`` and no file is
        written.
        """
        # propagate the signal to children, but only if they are AVE processes
        for pid in self.get_children():
            if get_proc_name(pid).startswith('ave-'):
                os.kill(pid, signal.SIGUSR1)
        # make the dump directory if it doesn't exist
        hickup_dir = os.path.join(self.home, '.ave', 'hickup')
        try:
            os.makedirs(hickup_dir)
        # "except X as e" is accepted by Python 2.6+ and Python 3, whereas
        # the older "except X, e" form is a syntax error on Python 3.
        except OSError as e:
            if e.errno != errno.EEXIST:
                self.log('ERROR: could not create %s: %s' % (hickup_dir, e))
                return
        # create the trace file
        date = time.strftime('%Y%m%d-%H%M%S')
        name = '%s-%s-%d' % (date, self.proc_name, os.getpid())
        path = os.path.join(hickup_dir, name)
        with open(path, 'w') as f:
            f.write('stack:\n%s' % ''.join(traceback.format_stack(frame)))
            f.write('locals:\n%s\n' % frame.f_locals)
            f.write('globals:\n%s' % frame.f_globals)
hickup.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def t04(factory):
    """Check that signalling the broker leaves its sessions untouched.

    A master is created with ``factory.make_master('master')`` and its PID
    and session PIDs are recorded. SIGUSR1 is sent to the master, the test
    waits for ``<HOME>/.ave/hickup`` through ``wait_hickup_dir()``, and the
    session PIDs are read again.

    Returns True when the session PIDs are unchanged. Otherwise a FAIL line
    is printed and False is returned.
    """
    pretty = '%s t4' % __file__
    print(pretty)

    ctrl  = factory.make_master('master')
    bpid  = ctrl.get_pid()
    spids = ctrl.get_session_pids()

    if any(pid < 2 for pid in spids):
        # spids is a collection of PIDs, so it has to be formatted with %s;
        # %d would raise TypeError instead of printing the failure.
        print('FAIL %s: impossible session PIDs: %s' % (pretty, spids))
        return False

    # signal the broker and wait for the hickup directory to appear. the signal
    # should be propagated to the equipment lister long before the directory is
    # created.
    os.kill(bpid, signal.SIGUSR1)
    path = os.path.join(factory.HOME.path, '.ave', 'hickup')
    wait_hickup_dir(path, 3)

    spids2 = ctrl.get_session_pids()
    if spids2 != spids:
        print('FAIL %s: sessions affected: %s != %s' % (pretty, spids2, spids))
        return False

    return True
hickup.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def t03(pretty, factory):
    """Signal a control that has one child and count the hickup files.

    A control is created, ``make_child()`` is called on it, and SIGUSR1 is
    sent to the control's PID. After ``wait_hickup_dir()`` returns, exactly
    two entries are expected in ``<HOME>/.ave/hickup``.
    NOTE(review): presumably one file per process (control and child);
    confirm against the signal handler.

    Returns True for exactly two entries, otherwise prints a FAIL line and
    returns False.
    """
    control = factory.make_control(home=factory.HOME.path)
    control_pid = control.get_pid()
    control.make_child()
    os.kill(control_pid, signal.SIGUSR1)

    hickup_dir = os.path.join(factory.HOME.path, '.ave', 'hickup')
    wait_hickup_dir(hickup_dir, 3)

    traces = glob.glob(os.path.join(hickup_dir, '*'))
    if len(traces) == 2:
        return True

    print('FAIL %s: wrong number of files: %s' % (pretty, traces))
    return False

# check that the signal is not propagated to non-ave processes such as external
# tools. these will otherwise terminate if they do not handle the signal.
hot-restarter.py 文件源码 项目:ambassador 作者: datawire 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def main():
  """ Entry point of the hot-restarter. The process is meant to stay alive so that a
      supervisor such as runit or monit can watch it and react if it disappears. """

  print ("starting hot-restarter with target: {}".format(sys.argv[1]))

  # Install one handler per signal the restarter reacts to.
  signal_table = (
    (signal.SIGTERM, sigterm_handler),
    (signal.SIGHUP, sighup_handler),
    (signal.SIGCHLD, sigchld_handler),
    (signal.SIGUSR1, sigusr1_handler),
  )
  for signum, handler in signal_table:
    signal.signal(signum, handler)

  # Launch the first child, then idle forever: all further work is driven by the
  # signal handlers installed above.
  fork_and_exec()
  while True:
    time.sleep(60)
redirector.py 文件源码 项目:vault-redirector-twisted 作者: manheim 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def handle_logging_signal(self, signum, frame):
        """
        Toggle ``self.log_enabled`` in response to SIGUSR1 (enable) or
        SIGUSR2 (disable); any other signal number is ignored.

        :param signum: signal number sent to process
        :type signum: int
        :param frame: current stack frame when signal was caught
        """
        if signum not in (signal.SIGUSR1, signal.SIGUSR2):
            # don't know how we got here, but ignore it
            return

        if signum == signal.SIGUSR1:
            logger.warning('Logging enabled via signal; send SIGUSR2 to PID '
                           '%d to disable logging', getpid())
            self.log_enabled = True
        else:
            logger.warning('Logging disabled via signal; send SIGUSR1 to PID '
                           '%d to enable logging', getpid())
            self.log_enabled = False
test_threadsignals.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_signals(self):
        # Checks that SIGUSR1 and SIGUSR2 were each recorded exactly once in
        # signal_blackboard and that both were recorded by this thread.
        # NOTE(review): presumably signalled_all is a plain lock that the
        # signalling thread releases once it has sent its signals, so the
        # second acquire() below waits for that thread -- confirm against
        # spawnSignallingThread().
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it to return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            # Schedule SIGALRM in one second, sleep until a signal arrives,
            # then cancel the alarm in case it has not fired yet.
            signal.alarm(1)
            signal.pause()
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
test_multiprocessing.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_poll_eintr(self):
        # Joining a child process must succeed even though a SIGUSR1 handler
        # runs in this process while the join is in progress.
        # NOTE(review): presumably self._killer sends SIGUSR1 to the PID it
        # is given; confirm against its definition.
        seen = {'signal': False}

        def _note_signal(*_ignored):
            seen['signal'] = True

        parent_pid = os.getpid()
        previous = signal.signal(signal.SIGUSR1, _note_signal)
        try:
            sender = self.Process(target=self._killer, args=(parent_pid,))
            sender.start()
            sleeper = self.Process(target=time.sleep, args=(1,))
            sleeper.start()
            sleeper.join()
            self.assertTrue(seen['signal'])
            self.assertEqual(sleeper.exitcode, 0)
            sender.join()
        finally:
            # Always reinstall whatever handler was active before the test.
            signal.signal(signal.SIGUSR1, previous)

#
# Test to verify handle verification, see issue 3321
#
test_multiprocessing.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_ignore(self):
        # Exchanges data over a pipe with a child process while sending that
        # child SIGUSR1 between the transfers.
        # NOTE(review): presumably self._test_ignore ignores SIGUSR1, sends
        # 'ready', echoes one object and then sends 1 MiB of b'x' -- confirm
        # against its definition.
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            # The parent only uses its own end of the pipe.
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            time.sleep(0.1)
            # First signal: sent before the echo round trip.
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            time.sleep(0.1)
            # Second signal: sent right before the large payload is read.
            os.kill(p.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
            time.sleep(0.1)
            p.join()
        finally:
            conn.close()
test_threadsignals.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_signals(self):
        # Checks that SIGUSR1 and SIGUSR2 were each recorded exactly once in
        # signal_blackboard and that both were recorded by this thread.
        # NOTE(review): presumably signalled_all is a plain lock that the
        # signalling thread releases once it has sent its signals, so the
        # second acquire() below waits for that thread -- confirm against
        # spawnSignallingThread().
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it to return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            # Schedule SIGALRM in one second, sleep until a signal arrives,
            # then cancel the alarm in case it has not fired yet.
            signal.alarm(1)
            signal.pause()
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
test_multiprocessing.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_poll_eintr(self):
        # Joining a child process must succeed even though a SIGUSR1 handler
        # runs in this process while the join is in progress.
        # NOTE(review): presumably self._killer sends SIGUSR1 to the PID it
        # is given; confirm against its definition.
        seen = {'signal': False}

        def _note_signal(*_ignored):
            seen['signal'] = True

        parent_pid = os.getpid()
        previous = signal.signal(signal.SIGUSR1, _note_signal)
        try:
            sender = self.Process(target=self._killer, args=(parent_pid,))
            sender.start()
            sleeper = self.Process(target=time.sleep, args=(1,))
            sleeper.start()
            sleeper.join()
            self.assertTrue(seen['signal'])
            self.assertEqual(sleeper.exitcode, 0)
            sender.join()
        finally:
            # Always reinstall whatever handler was active before the test.
            signal.signal(signal.SIGUSR1, previous)

#
# Test to verify handle verification, see issue 3321
#
test_multiprocessing.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_ignore(self):
        # Exchanges data over a pipe with a child process while sending that
        # child SIGUSR1 between the transfers.
        # NOTE(review): presumably self._test_ignore ignores SIGUSR1, sends
        # 'ready', echoes one object and then sends 1 MiB of b'x' -- confirm
        # against its definition.
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            # The parent only uses its own end of the pipe.
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            time.sleep(0.1)
            # First signal: sent before the echo round trip.
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            time.sleep(0.1)
            # Second signal: sent right before the large payload is read.
            os.kill(p.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
            time.sleep(0.1)
            p.join()
        finally:
            conn.close()
worker.py 文件源码 项目:sanic 作者: channelcat 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def init_signals(self):
        # Set up signals through the event loop API: each signal is bound to
        # one of the worker's handle_* methods.
        registrations = (
            (signal.SIGQUIT, self.handle_quit),
            (signal.SIGTERM, self.handle_exit),
            (signal.SIGINT, self.handle_quit),
            (signal.SIGWINCH, self.handle_winch),
            (signal.SIGUSR1, self.handle_usr1),
            (signal.SIGABRT, self.handle_abort),
        )
        for signum, callback in registrations:
            # The signal number and None are handed over as extra positional
            # arguments for the callback.
            self.loop.add_signal_handler(signum, callback, signum, None)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        for signum in (signal.SIGTERM, signal.SIGUSR1):
            signal.siginterrupt(signum, False)
test_signal.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_sigpending(self):
        # The script below is handed to assert_python_ok('-c', code). It
        # blocks SIGUSR1, sends that signal to its own process, checks that
        # sigpending() reports exactly {SIGUSR1}, then unblocks the signal
        # and expects the handler's ZeroDivisionError to be raised out of
        # pthread_sigmask().
        code = """if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        """
        assert_python_ok('-c', code)
test_signal.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_sigwaitinfo_interrupted(self):
        # Hands wait_helper() a test(signum) snippet that arms a one-second
        # SIGALRM and then calls sigwaitinfo([SIGUSR1]); the snippet raises
        # unless sigwaitinfo fails with OSError whose errno is EINTR.
        # NOTE(review): alarm_handler assigns a *local* hndl_called (there is
        # no nonlocal declaration), so the outer flag stays True and the
        # "SIGALRM handler not called" branch can never trigger -- confirm
        # whether that check was meant to be effective.
        self.wait_helper(signal.SIGUSR1, '''
        def test(signum):
            import errno

            hndl_called = True
            def alarm_handler(signum, frame):
                hndl_called = False

            signal.signal(signal.SIGALRM, alarm_handler)
            signal.alarm(1)
            try:
                signal.sigwaitinfo([signal.SIGUSR1])
            except OSError as e:
                if e.errno == errno.EINTR:
                    if not hndl_called:
                        raise Exception("SIGALRM handler not called")
                else:
                    raise Exception("Expected EINTR to be raised by sigwaitinfo")
            else:
                raise Exception("Expected EINTR to be raised by sigwaitinfo")
        ''')
test_signal.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_pthread_kill_main_thread(self):
        # pthread_kill() must be able to deliver a signal to the main thread
        # even when no other thread was ever started (see issue #12392).
        # The child exits with 3 from its handler and with 2 otherwise.
        child_source = """if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        """

        with spawn_python('-c', child_source) as child:
            out, _err = child.communicate()
            status = child.wait()
            if status == 3:
                return
            raise Exception("Child error (exit code %s): %s" %
                            (status, out))
test_multiprocessing.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_poll_eintr(self):
        # Joining a child process must succeed even though a SIGUSR1 handler
        # runs in this process while the join is in progress.
        # NOTE(review): presumably self._killer sends SIGUSR1 to the PID it
        # is given; confirm against its definition.
        seen = {'signal': False}

        def _note_signal(*_ignored):
            seen['signal'] = True

        parent_pid = os.getpid()
        previous = signal.signal(signal.SIGUSR1, _note_signal)
        try:
            sender = self.Process(target=self._killer, args=(parent_pid,))
            sender.start()
            sleeper = self.Process(target=time.sleep, args=(1,))
            sleeper.start()
            sleeper.join()
            self.assertTrue(seen['signal'])
            self.assertEqual(sleeper.exitcode, 0)
            sender.join()
        finally:
            # Always reinstall whatever handler was active before the test.
            signal.signal(signal.SIGUSR1, previous)

#
# Test to verify handle verification, see issue 3321
#
test_multiprocessing.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_ignore(self):
        # Exchanges data over a pipe with a child process while sending that
        # child SIGUSR1 between the transfers.
        # NOTE(review): presumably self._test_ignore ignores SIGUSR1, sends
        # 'ready', echoes one object and then sends 1 MiB of b'x' -- confirm
        # against its definition.
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            # The parent only uses its own end of the pipe.
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            time.sleep(0.1)
            # First signal: sent before the echo round trip.
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            time.sleep(0.1)
            # Second signal: sent right before the large payload is read.
            os.kill(p.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
            time.sleep(0.1)
            p.join()
        finally:
            conn.close()
test_subprocess.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_communicate_eintr(self):
        # Issue #12493: communicate() should handle EINTR
        def _noop_handler(signum, frame):
            pass

        previous = signal.signal(signal.SIGUSR1, _noop_handler)
        # The handler that was active before is reinstalled on cleanup.
        self.addCleanup(signal.signal, signal.SIGUSR1, previous)

        # The child does nothing except send SIGUSR1 to its parent.
        child_cmd = [sys.executable, "-c",
                     'import os, signal;'
                     'os.kill(os.getppid(), signal.SIGUSR1)']
        for stream in ('stdout', 'stderr'):
            popen_kwargs = {stream: subprocess.PIPE}
            with subprocess.Popen(child_cmd, **popen_kwargs) as child:
                # communicate() will be interrupted by SIGUSR1
                child.communicate()


    # This test is Linux-ish specific for simplicity to at least have
    # some coverage.  It is not a platform specific bug.
server.py 文件源码 项目:borgcube 作者: enkore 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def launch(self):
        """Start the built-in ZEO server and wait until it signals readiness.

        After ``self.check()``, a SIGUSR1 handler is installed and the
        server is forked through ``self.fork_zeo()``. The method then waits
        for SIGUSR1, polling once per second whether the child has exited.
        If the child is gone before the signal arrives, an error is logged
        and the process exits with status 1.

        On success ``settings.DB_URI`` is pointed at ``self.zeo_path`` and
        the PID returned by ``fork_zeo()`` is returned.
        """
        self.check()

        received = False

        def set_received(*_):
            nonlocal received
            received = True

        signal.signal(signal.SIGUSR1, set_received)
        pid = self.fork_zeo()
        # Re-check the flag on every round: the signal may be consumed by
        # the handler above rather than reported by sigtimedwait().
        while not received and not signal.sigtimedwait([signal.SIGUSR1], 1):
            # With WNOHANG, waitpid() reports PID 0 while the child is still
            # running. Keep that result in its own variable so the server
            # PID is neither polled as 0 (any child in the process group)
            # nor returned as 0.
            exited_pid, _status = os.waitpid(pid, os.WNOHANG)
            if exited_pid:
                log.error('Database server failed to start (check log above).')
                sys.exit(1)

        settings.DB_URI = urlunsplit(('zeo', '', self.zeo_path, '', ''))
        log.debug('Launched built-in ZEO')
        return pid
viento_setup.py 文件源码 项目:viento 作者: tgsachse 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def command_write():
    """
    Save the drafts to disk, report and log how many changes were written,
    reset the change counter and, when a PID file exists, send SIGUSR1 to
    the process it names.
    """
    viento_utils.directories_check()
    with open(viento_utils.f_drafts, 'w') as drafts_file:
        json.dump(drafts, drafts_file)

    saved = change_count.count
    plural = "" if saved == 1 else "s"
    print(str(saved) + " change" + plural + " saved to file.")
    viento_utils.log('file_write', args=[saved, viento_utils.f_drafts])
    change_count.reset()

    try:
        with open(viento_utils.f_pid, 'r') as pid_file:
            target_pid = int(pid_file.read())
        os.kill(target_pid, signal.SIGUSR1)
    except FileNotFoundError:
        # No PID file, so there is no process to notify.
        pass
test_threadsignals.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_signals(self):
        # Checks that SIGUSR1 and SIGUSR2 were each recorded exactly once in
        # signal_blackboard and that both were recorded by this thread.
        # NOTE(review): presumably signalled_all is a plain lock that the
        # signalling thread releases once it has sent its signals, so the
        # second acquire() below waits for that thread -- confirm against
        # spawnSignallingThread().
        signalled_all.acquire()
        self.spawnSignallingThread()
        signalled_all.acquire()
        # the signals that we asked the kernel to send
        # will come back, but we don't know when.
        # (it might even be after the thread exits
        # and might be out of order.)  If we haven't seen
        # the signals yet, send yet another signal and
        # wait for it to return.
        if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
           or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
            # Schedule SIGALRM in one second, sleep until a signal arrives,
            # then cancel the alarm in case it has not fired yet.
            signal.alarm(1)
            signal.pause()
            signal.alarm(0)

        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
                           thread.get_ident())
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
        self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
                           thread.get_ident())
        signalled_all.release()
test_multiprocessing.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_poll_eintr(self):
        # Joining a child process must succeed even though a SIGUSR1 handler
        # runs in this process while the join is in progress.
        # NOTE(review): presumably self._killer sends SIGUSR1 to the PID it
        # is given; confirm against its definition.
        seen = {'signal': False}

        def _note_signal(*_ignored):
            seen['signal'] = True

        parent_pid = os.getpid()
        previous = signal.signal(signal.SIGUSR1, _note_signal)
        try:
            sender = self.Process(target=self._killer, args=(parent_pid,))
            sender.start()
            sleeper = self.Process(target=time.sleep, args=(1,))
            sleeper.start()
            sleeper.join()
            self.assertTrue(seen['signal'])
            self.assertEqual(sleeper.exitcode, 0)
            sender.join()
        finally:
            # Always reinstall whatever handler was active before the test.
            signal.signal(signal.SIGUSR1, previous)

#
# Test to verify handle verification, see issue 3321
#
test_multiprocessing.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_ignore(self):
        # Exchanges data over a pipe with a child process while sending that
        # child SIGUSR1 between the transfers.
        # NOTE(review): presumably self._test_ignore ignores SIGUSR1, sends
        # 'ready', echoes one object and then sends 1 MiB of b'x' -- confirm
        # against its definition.
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            # The parent only uses its own end of the pipe.
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            time.sleep(0.1)
            # First signal: sent before the echo round trip.
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            time.sleep(0.1)
            # Second signal: sent right before the large payload is read.
            os.kill(p.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
            time.sleep(0.1)
            p.join()
        finally:
            conn.close()
test_signal.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_sigpending(self):
        # The script below is handed to assert_python_ok('-c', code). It
        # blocks SIGUSR1, sends that signal to its own process, checks that
        # sigpending() reports exactly {SIGUSR1}, then unblocks the signal
        # and expects the handler's ZeroDivisionError to be raised out of
        # pthread_sigmask().
        code = """if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        """
        assert_python_ok('-c', code)


问题


面经


文章

微信
公众号

扫码关注公众号