python类signal()的实例源码

test_manager.py 文件源码 项目:dsq 作者: baverman 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_worker_alarm(manager):
    """Check that SIGALRM reaches our handler only for an over-long task.

    A recording handler is installed for ``SIGALRM``; a worker built with
    ``task_timeout=1`` then processes a short task and a long one.
    """
    alarms = []

    def on_alarm(signum, frame):
        alarms.append(True)

    signal.signal(signal.SIGALRM, on_alarm)

    @manager.task
    def foo(sleep):
        time.sleep(sleep)

    worker = Worker(manager, task_timeout=1)

    # 0.1 s stays below the 1 s limit: the handler must not have run.
    worker.process_one(make_task('foo', args=(0.1,)))
    assert not alarms

    # 1.1 s goes past the limit: the handler must have run.
    worker.process_one(make_task('foo', args=(1.1,)))
    assert alarms
timeout.py 文件源码 项目:heerkat 作者: Roche 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def timeout(timeout_seconds):
    """Build a decorator that aborts the wrapped call after a time limit.

    The limit is enforced with ``SIGALRM``: a handler raising
    ``TimeoutError`` is installed and ``signal.alarm`` is armed for the
    duration of the call.

    :param timeout_seconds: number of seconds passed to ``signal.alarm``.
    :returns: a decorator; the decorated callable returns whatever the
        wrapped function returns and carries its ``__name__``.
    :raises TimeoutError: from inside the wrapped call when the alarm fires.
    """
    def decorate(function):
        message = "Timeout (%s sec) elapsed for test %s" % (timeout_seconds, function.__name__)

        def handler(signum, frame):
            raise TimeoutError(message)

        def new_f(*args, **kwargs):
            old = signal.signal(signal.SIGALRM, handler)
            signal.alarm(timeout_seconds)
            try:
                return function(*args, **kwargs)
            finally:
                # Disarm the alarm first so it cannot fire once the previous
                # handler is back, and do both even when `function` raised.
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old)

        # `__name__` works on Python 2 and 3 (and on builtins), unlike the
        # Python-2-only `func_name` attribute.
        new_f.__name__ = function.__name__
        return new_f

    return decorate
timeout.py 文件源码 项目:open-nti 作者: Juniper 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    """Return a decorator that limits how long the wrapped call may run.

    A ``SIGALRM`` handler raising ``TimeoutError(error_message)`` is
    installed and ``signal.alarm(seconds)`` is armed around each call.

    :param seconds: alarm delay handed to ``signal.alarm``.
    :param error_message: text carried by the ``TimeoutError``; the default
        is the OS description of ``errno.ETIME``.
    :returns: a decorator whose wrapper keeps the wrapped function's
        metadata (via ``wraps``) and returns its result.
    :raises TimeoutError: from inside the wrapped call when the alarm fires.
    """
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            # Remember whatever handler was active so it can be put back;
            # otherwise the timeout handler would outlive this call.
            previous = signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, previous)

        return wraps(func)(wrapper)

    return decorator
workflow.py 文件源码 项目:alfred-mpd 作者: deanishe 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function.

        While ``self.func`` runs, ``SIGTERM`` is routed to
        ``self.signal_handler``. Afterwards the previous handler is put
        back and, if ``self._caught_signal`` was set in the meantime
        (presumably by ``self.signal_handler`` -- confirm against the
        class), the signal is acted on.

        The previous handler is restored in a ``finally`` clause so that an
        exception raised by ``self.func`` cannot leave the temporary handler
        installed.
        """
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        try:
            self.func(*args, **kwargs)
        finally:
            # Restore old signal handler, even when `self.func` raised
            signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                # Forward the deferred signal to the handler that was
                # active before this call.
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                # The default disposition was in place: exit the process.
                sys.exit(0)
worker.py 文件源码 项目:dsq 作者: baverman 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def process(self, queue_list, burst=False):  # pragma: no cover
        """Pop tasks from ``queue_list`` and process them until told to stop.

        ``self.alarm_handler`` is installed for ``SIGALRM`` first. The loop
        ends when the ``RunFlag`` turns false, when ``process_one`` raises
        ``StopWorker``, when ``burst`` is set and no task was returned, or
        when ``self.lifetime`` is set and has been exceeded. The manager is
        closed afterwards.
        """
        signal.signal(signal.SIGALRM, self.alarm_handler)

        keep_running = RunFlag()
        started_at = time()
        while keep_running:
            task = self.manager.pop(queue_list, 1)
            if not task:
                # Nothing to do: in burst mode that ends the run.
                if burst:
                    break
            else:
                try:
                    self.process_one(task)
                except StopWorker:
                    break

            if self.lifetime and time() - started_at > self.lifetime:
                break

        self.manager.close()
libmilter.py 文件源码 项目:sipxecs-voicemail-transcription 作者: andrewsauder 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test():
    """Run an ``AsyncFactory`` on 127.0.0.1:5000 until SIGINT arrives."""
    import signal

    factory = AsyncFactory('inet:127.0.0.1:5000', MilterProtocol)

    def shutdown_on_sigint(signum, frame):
        # Close the factory before leaving the process.
        factory.close()
        sys.exit(0)

    signal.signal(signal.SIGINT, shutdown_on_sigint)
    factory.run()
# }}}
main.py 文件源码 项目:slack-status-bar 作者: ericwb 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def main():
    """Load ``config.yaml``, install the SIGINT handler and run the app.

    If the YAML cannot be parsed, the error is printed and the function
    returns without starting anything.
    """
    # Read the configuration file
    support_dir = rumps.application_support(APP_TITLE)
    config_path = os.path.join(support_dir, 'config.yaml')
    with open(config_path, 'r') as conf:
        try:
            config = yaml.safe_load(conf)
        except yaml.YAMLError as exc:
            print(exc)
            return

    # Setup our CTRL+C signal handler
    signal.signal(signal.SIGINT, _signal_handler)

    # Enable debug mode
    rumps.debug_mode(config['debug'])

    # Startup application
    app = SlackStatusBarApp(config)
    app.run()
bottle.py 文件源码 项目:dabdabrevolution 作者: harryparkdotio 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(self, handler):
        """Serve the WSGI ``handler`` through aiohttp on ``self.host:self.port``.

        The event loop from ``self.get_event_loop()`` is made current, the
        server is created on it, and the loop runs until stopped. When the
        ``BOTTLE_CHILD`` environment variable is present, SIGINT stops the
        loop through a signal handler.
        """
        import asyncio
        from aiohttp.wsgi import WSGIServerHttpProtocol

        self.loop = self.get_event_loop()
        asyncio.set_event_loop(self.loop)

        def protocol_factory():
            return WSGIServerHttpProtocol(
                handler,
                readpayload=True,
                debug=(not self.quiet))

        server = self.loop.create_server(protocol_factory, self.host, self.port)
        self.loop.run_until_complete(server)

        if 'BOTTLE_CHILD' in os.environ:
            import signal

            def stop_loop(signum, frame):
                return self.loop.stop()

            signal.signal(signal.SIGINT, stop_loop)

        try:
            self.loop.run_forever()
        except KeyboardInterrupt:
            self.loop.stop()
server.py 文件源码 项目:spoon 作者: SpamExperts 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, address):
        """Set up logging, the socket and the signal handlers for the server.

        The address family is IPv6 when the host part of ``address``
        contains a colon and IPv4 otherwise. The parent initializer is
        called with ``bind_and_activate=False``; configuration loading and
        socket setup follow, and signal handlers are installed last.
        """
        self.log = logging.getLogger(self.server_logger)
        self.socket = None

        host = address[0]
        self.address_family = socket.AF_INET6 if ":" in host else socket.AF_INET
        self.log.debug("Listening on %s", address)

        super(_SpoonMixIn, self).__init__(
            address, self.handler_klass, bind_and_activate=False)
        self.load_config()
        self._setup_socket()

        # Finally, set signals (each one only when a signal number is given)
        reload_signal = self.signal_reload
        if reload_signal is not None:
            signal.signal(reload_signal, self.reload_handler)
        shutdown_signal = self.signal_shutdown
        if shutdown_signal is not None:
            signal.signal(shutdown_signal, self.shutdown_handler)
jobs.py 文件源码 项目:jobs 作者: josiahcarlson 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def stop(self, failed=False, shutting_down=False):
        '''
        Stop the job when it is running; do nothing otherwise.

        A true "failed" argument is forwarded to ``_finish_job`` (the
        original note says outputs are then not set as available).
        "shutting_down" only adds a warning to the log.
        '''
        if not self.is_running:
            return

        with self._lock:
            # Re-check under the lock: another thread could have changed
            # the status in the meantime.
            if not self.is_running:
                return

            failed = bool(failed)
            DEFAULT_LOGGER.info("Stopping job failed = %r", failed)
            if shutting_down:
                DEFAULT_LOGGER.warning("Stopping job as part of atexit/signal handler exit")

            try:
                _finish_job(self.conn, self.inputs, self.outputs,
                            self.identifier, failed=failed)
            finally:
                # Always drop the refresh bookkeeping, even if finishing failed.
                self.last_refreshed = None
                self.auto_refresh = None
                LOCKED.discard(self)
                AUTO_REFRESH.discard(self)
jobs.py 文件源码 项目:sensu_drive 作者: ilavender 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def handle(self, *args, **kwargs):
        """Register the periodic update jobs and run the scheduler forever.

        Each job is scheduled every N seconds, N coming from the matching
        ``settings.CACHE_*_TTL`` value. The loop polls once per second and
        ends with ``exit(0)`` on ``KeyboardInterrupt``.
        """
        logger.info("%s - starting jobs schedule" % (__name__))

        try:
            # An earlier variant scheduled entities, clients, checks and
            # trends hourly (and events every 10 minutes); intervals are
            # now taken from the settings instead.
            ttl_jobs = (
                ('CACHE_ENTITY_TTL', job_update_entities),
                ('CACHE_CLIENT_TTL', job_update_clients),
                ('CACHE_TRENDS_TTL', job_update_trends),
            )
            for ttl_name, job in ttl_jobs:
                schedule.every(getattr(settings, ttl_name)).seconds.do(job)

            while True:
                schedule.run_pending()
                sleep(1)

        except KeyboardInterrupt:
            logger.info("%s - user signal exit!" % (__name__))
            exit(0)
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def per_process_init():
    # type: () -> None
    """Lower this process's priority (best effort) and ignore SIGINT."""
    try:
        os.nice(19)
    except (AttributeError, OSError):
        # AttributeError: nice is not available everywhere.
        # OSError: when this program is already running on the nicest
        # level (20) on OS X it is not permitted to change the priority.
        pass

    # A keyboard interrupt disrupts the communication between a Python
    # script and its subprocesses when using multiprocessing. The child
    # can ignore SIGINT and is properly shut down by a pool.terminate()
    # call in case of a keyboard interrupt or an early generator exit.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
cpu_reporter.py 文件源码 项目:stackimpact-python 作者: stackimpact 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def setup(self):
        if self.agent.get_option('cpu_profiler_disabled'):
            return

        if runtime_info.OS_WIN:
            self.agent.log('CPU profiler is not available on Windows.')
            return

        def _sample(signum, signal_frame):
            if self.handler_active:
                return
            self.handler_active = True

            with self.profile_lock:
                try:
                    self.process_sample(signal_frame)
                    signal_frame = None
                except Exception:
                    self.agent.exception()

            self.handler_active = False

        self.prev_signal_handler = signal.signal(signal.SIGPROF, _sample)

        self.setup_done = True
runtime.py 文件源码 项目:stackimpact-python 作者: stackimpact 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def register_signal(signal_number, handler_func, once = False):
    """Install ``handler_func`` for ``signal_number``, chaining to the old handler.

    ``handler_func(signum, frame)`` runs first. Unless it returns a true
    value, the previously installed handler is invoked when it is
    callable. With ``once`` set, the previous handler (or the default
    disposition, after which the signal is re-sent to this process) is
    reinstated before chaining.
    """
    previous = None

    def _dispatch(signum, frame):
        if handler_func(signum, frame):
            # The new handler asked to skip the previous one.
            return

        if callable(previous):
            if once:
                signal.signal(signum, previous)
            previous(signum, frame)
            return

        if once and previous == signal.SIG_DFL:
            # Put the default action back and deliver the signal again so
            # that the default action takes place.
            signal.signal(signum, signal.SIG_DFL)
            os.kill(os.getpid(), signum)

    previous = signal.signal(signal_number, _dispatch)
runtime_test.py 文件源码 项目:stackimpact-python 作者: stackimpact 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_register_signal(self):
        """Send SIGUSR1 twice and expect the registered handler to run twice.

        Skipped (returns immediately) on Windows.
        """
        if runtime_info.OS_WIN:
            return

        counter = {'handler': 0}

        def _count(signum, frame):
            counter['handler'] += 1

        register_signal(signal.SIGUSR1, _count)

        for _ in range(2):
            os.kill(os.getpid(), signal.SIGUSR1)

        # Put the default disposition back before checking the result.
        signal.signal(signal.SIGUSR1, signal.SIG_DFL)

        self.assertEqual(counter['handler'], 2)
log.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def set_debug(self, state):
        """Switch forced debugging on or off, keeping the last-set level.

        When ``state`` equals the current ``forced_debug`` a note is logged,
        but the level change below is still applied. Turning debugging on
        selects ``DEBUG`` without saving it; turning it off goes back to
        ``self.last_level``.
        """
        unchanged = state == self.forced_debug
        if unchanged:
            self.debug("Debug signal ignored; already %sdebugging",
                       "" if state else "not ")

        if not state:
            # Log while the debug level is still active, then step back.
            self.debug("Debug discontinued")
            self.level(self.last_level)
        else:
            self.level(DEBUG, save=False)
            self.debug("Debug started")

        self.forced_debug = state
        self.__update_env()
exit.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __exit_handler(signum, frame):
    """
    Let the worker go.  This doesn't do much of anything because it's
    called from inside a signal handler.

    The exit barrier is read once while holding ``this.lock`` and that
    same reference is used for the rendezvous, so a concurrent change of
    ``this.exit_barrier`` after the lock is released cannot turn the
    ``wait()`` call into an attribute error on ``None``.  The process
    always ends with ``sys.exit(0)``.
    """
    with this.lock:
        exit_barrier = this.exit_barrier

    if exit_barrier is not None:
        # Meet up with the worker (on the barrier captured under the lock)
        exit_barrier.wait()
        # Wait for the worker to be done
        this.finish_barrier.wait()
    sys.exit(0)
__main__.py 文件源码 项目:opendxl-maxmind-service-python 作者: opendxl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def signal_handler(signum, frame):
    """
    Signal handler invoked when registered signals are triggered

    While ``running`` is true, the flag is cleared and ``run_condition``
    is notified. If ``running`` is already false, ``exit(1)`` is called.

    :param signum: The signal number
    :param frame: The frame
    """
    global running
    del signum, frame
    with run_condition:
        if not running:
            exit(1)
        # Stop the application
        running = False
        run_condition.notify()

# Signals to register for
test.py 文件源码 项目:anonymine 作者: oskar-skog 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def bug1():
    """Reproduce the EINTR failure of ``sys.stdin.readline()`` on resize.

    BUG:
        sys.stdin.readline() in `ask` dies with IOError and errno=EINTR
        when the terminal gets resized after curses has been
        de-initialized.
        1: SIGWINCH is sent by the terminal when the screen has been
           resized.
        2: curses forgot to restore the signal handling of SIGWINCH to
           the default of ignoring the signal.
           NOTE: signal.getsignal(signal.SIGWINCH) incorrectly returns
           signal.SIG_DFL. (Default is to be ignored.)
        3: Python fails to handle EINTR when reading from stdin.
    REFERENCES:
        Issue 3949: https://bugs.python.org/issue3949
        PEP 0475: https://www.python.org/dev/peps/pep-0475/
    """
    def on_resize(signum, frame):
        print("Resized")

    signal.signal(signal.SIGWINCH, on_resize)

    # Keep reading so that a resize can interrupt the blocking read.
    while True:
        sys.stdin.readline()
arbitrer.py 文件源码 项目:bitcoin-arbitrage 作者: ucfyao 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def loop(self):
        """Poll depths and tick until a termination signal has been flagged.

        ``sigint_handler`` is installed for SIGINT, SIGTERM and, where the
        platform defines it, SIGHUP. After every refresh cycle the
        module-level ``is_sigint_up`` flag is checked (presumably set by
        ``sigint_handler`` -- confirm); when it is true, ``terminate()`` is
        called and the loop ends.
        """
        signal.signal(signal.SIGINT, sigint_handler)

        # SIGHUP is not defined on every platform (e.g. Windows), so only
        # register it when the signal module provides it.
        # NOTE(review): the original comment here was mis-encoded; it
        # mentioned Windows / Python 2.4 and FreeBSD.
        if hasattr(signal, 'SIGHUP'):
            signal.signal(signal.SIGHUP, sigint_handler)

        signal.signal(signal.SIGTERM, sigint_handler)

        while True:
            self.depths = self.update_depths()
            self.tickers()
            self.tick()
            time.sleep(config.refresh_rate)

            if is_sigint_up:
                # A termination signal was flagged: clean up and leave.
                self.terminate()
                print ("Exit")
                break
workflow.py 文件源码 项目:Gank-Alfred-Workflow 作者: hujiaweibujidao 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __call__(self, *args, **kwargs):
        """Call ``self.func`` with ``SIGTERM`` deferred until it has finished.

        ``self.signal_handler`` replaces the current ``SIGTERM`` handler for
        the duration of the call. The previous handler is reinstated in a
        ``finally`` clause, so it comes back even if ``self.func`` raises.
        If ``self._caught_signal`` was set during the call, the signal is
        then passed to the previous handler, or the process exits when the
        default disposition was in place.
        """
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        try:
            self.func(*args, **kwargs)
        finally:
            # Restore old signal handler, also on the error path
            signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
workflow.py 文件源码 项目:Gank-Alfred-Workflow 作者: hujiaweibujidao 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __call__(self, *args, **kwargs):
        """Run ``self.func`` while ``SIGTERM`` is trapped.

        The handler active on entry is saved in ``self.old_signal_handler``
        and swapped for ``self.signal_handler``. A ``finally`` clause puts
        the saved handler back whether or not ``self.func`` raises. When
        ``self._caught_signal`` is set afterwards, the saved handler is
        called with it, or ``sys.exit(0)`` is used if the saved handler was
        ``signal.SIG_DFL``.
        """
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        try:
            self.func(*args, **kwargs)
        finally:
            # Restore old signal handler regardless of how the call ended
            signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)
cli.py 文件源码 项目:bock 作者: afreeorange 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def web_server(wiki, port, debug=False):
    """Serve the ``wiki`` WSGI application on ``port`` until interrupted.

    SIGINT logs a message and exits with status 1. With ``debug`` set,
    autoreload is started on the IO loop before it runs.
    """
    def stop_on_sigint(signal_number, stack_frame):
        logger.info('\nStopping wiki')
        sys.exit(1)

    signal.signal(signal.SIGINT, stop_on_sigint)

    logger.info('Starting wiki on port {}. Ctrl+C will kill it.'.format(port))
    container = WSGIContainer(wiki)
    server = HTTPServer(container)
    server.listen(port)

    loop = IOLoop.instance()
    if debug:
        autoreload.start(loop)

    loop.start()
_reloader.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run_with_reloader(main_func, extra_files=None, interval=1,
                      reloader_type='auto'):
    """Run the given function in an independent python interpreter.

    A reloader is built from ``reloader_loops[reloader_type]`` and SIGTERM
    is mapped to ``sys.exit(0)``. When ``WERKZEUG_RUN_MAIN`` is ``'true'``,
    ``main_func`` runs in a daemon thread while the reloader loop runs in
    the calling thread. Otherwise the process exits with the result of
    ``reloader.restart_with_reloader()``. ``KeyboardInterrupt`` is
    swallowed.
    """
    import signal
    reloader = reloader_loops[reloader_type](extra_files, interval)
    signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))
    try:
        if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
            t = threading.Thread(target=main_func, args=())
            # Use the `daemon` attribute: `Thread.setDaemon()` is
            # deprecated and emits a DeprecationWarning on newer Pythons.
            t.daemon = True
            t.start()
            reloader.run()
        else:
            sys.exit(reloader.restart_with_reloader())
    except KeyboardInterrupt:
        pass
recipe-580672.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_all_exit_sigs_with_sig(self):
            # For every signal in TEST_SIGNALS, run a child script that
            # terminates itself with that signal instead of exiting
            # cleanly, then check what it wrote to TESTFN.
            for sig in TEST_SIGNALS:
                # The child loads this very file as module "mod", installs
                # its own handler for the signal (writes b'0'), registers
                # two exit functions (write b'1' and b'2') and finally
                # sends the signal to itself.
                ret = pyrun(textwrap.dedent(
                    """
                    import functools, os, signal, imp
                    mod = imp.load_source("mod", r"{modname}")

                    def foo(s):
                        with open(r"{testfn}", "ab") as f:
                            f.write(s)

                    signal.signal({sig}, functools.partial(foo, b'0'))
                    mod.register_exit_fun(functools.partial(foo, b'1'))
                    mod.register_exit_fun(functools.partial(foo, b'2'))
                    os.kill(os.getpid(), {sig})
                    """.format(modname=os.path.abspath(__file__),
                               testfn=TESTFN, sig=sig)
                ))
                # The value returned by pyrun must equal the signal number
                # (presumably the child's exit status -- confirm in pyrun).
                self.assertEqual(ret, sig)
                # Expected write order: b'2', then b'1', then b'0', i.e.
                # the exit functions in reverse registration order followed
                # by the handler installed before them.
                with open(TESTFN, "rb") as f:
                    self.assertEqual(f.read(), b"210")
                safe_remove(TESTFN)
__init__.py 文件源码 项目:jack-matchmaker 作者: SpotlightKid 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, patterns, pattern_file=None, name="jack-matchmaker", connect_interval=3.0,
                 connect_maxattempts=0):
        """Store the connection settings and collect the initial patterns.

        Patterns from ``pattern_file`` (when given) are added first, then
        the pairs in ``patterns``. With a pattern file, SIGHUP is bound to
        ``self.reread_pattern_file`` except on Windows, where a warning is
        logged instead.
        """
        self.patterns = []
        self.pattern_file = pattern_file
        self.connect_maxattempts = connect_maxattempts
        self.connect_interval = connect_interval

        if self.pattern_file:
            self.add_patterns_from_file(self.pattern_file)

            on_windows = sys.platform.startswith('win')
            if on_windows:
                log.warning("Signal handling not supported on Windows. jack-matchmaker must be "
                            "restarted to re-read the pattern file.")
            else:
                signal.signal(signal.SIGHUP, self.reread_pattern_file)

        for pattern_pair in patterns:
            self.add_patterns(*pattern_pair)

        self.queue = queue.Queue()
        self.client = None
util.py 文件源码 项目:AI-Pacman 作者: AUTBS 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __call__(self, *args, **keyArgs):
        """Call ``self.function`` under a time limit of ``self.timeout``.

        Where ``SIGALRM`` exists, an alarm is armed with
        ``self.handle_timeout`` as its handler for the duration of the
        call. Elsewhere, the elapsed time is measured after the call has
        returned and ``self.handle_timeout`` is invoked if it reached the
        limit.

        Returns whatever ``self.function`` returns.
        """
        # If we have SIGALRM signal, use it to cause an exception if and
        # when this function runs too long.  Otherwise check the time taken
        # after the method has returned, and throw an exception then.
        if hasattr(signal, 'SIGALRM'):
            old = signal.signal(signal.SIGALRM, self.handle_timeout)
            signal.alarm(self.timeout)
            try:
                result = self.function(*args, **keyArgs)
            finally:
                # Cancel the pending alarm before the previous handler is
                # back, and do so even when the call raised; otherwise a
                # stale alarm could fire later under the old handler.
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old)
        else:
            startTime = time.time()
            result = self.function(*args, **keyArgs)
            timeElapsed = time.time() - startTime
            if timeElapsed >= self.timeout:
                self.handle_timeout(None, None)
        return result
_reloader.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def run_with_reloader(main_func, extra_files=None, interval=1,
                      reloader_type='auto'):
    """Run the given function in an independent python interpreter.

    The reloader class is looked up in ``reloader_loops`` by
    ``reloader_type`` and built with ``extra_files`` and ``interval``;
    SIGTERM triggers ``sys.exit(0)``. If ``WERKZEUG_RUN_MAIN`` equals
    ``'true'``, ``main_func`` is started in a daemon thread and the
    reloader loop runs here. If not, the process exits with the value
    returned by ``reloader.restart_with_reloader()``. A
    ``KeyboardInterrupt`` is ignored.
    """
    import signal
    reloader = reloader_loops[reloader_type](extra_files, interval)
    signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))
    try:
        if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
            t = threading.Thread(target=main_func, args=())
            # `setDaemon()` is deprecated in favour of the `daemon`
            # attribute, which behaves the same way.
            t.daemon = True
            t.start()
            reloader.run()
        else:
            sys.exit(reloader.restart_with_reloader())
    except KeyboardInterrupt:
        pass
logger.py 文件源码 项目:muxrplot 作者: sirmo 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def exit_gracefully(signum, frame):
    """SIGINT handler that asks for confirmation before quitting.

    Exits with status 1 when the answer starts with "y" or when Ctrl+C is
    pressed again at the prompt; otherwise this handler is re-armed.
    """
    # Put the original handler back while prompting: otherwise evil things
    # will happen in raw_input when CTRL+C is pressed, and this handler is
    # not re-entrant.
    signal.signal(signal.SIGINT, original_sigint)

    try:
        answer = raw_input("\nReally quit? (y/n)> ")
        if answer.lower().startswith('y'):
            sys.exit(1)
    except KeyboardInterrupt:
        print("Ok ok, quitting")
        sys.exit(1)

    # The user chose to continue: install this handler again.
    signal.signal(signal.SIGINT, exit_gracefully)
process.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def initialize(cls, io_loop=None):
        """Initializes the ``SIGCHLD`` handler.

        The signal handler is run on an `.IOLoop` to avoid locking issues.
        Note that the `.IOLoop` used for signal handling need not be the
        same one used by individual Subprocess objects (as long as the
        ``IOLoops`` are each running in separate threads).

        Does nothing when ``cls._initialized`` is already set. The handler
        that was installed before is kept in ``cls._old_sigchld``.

        .. versionchanged:: 4.1
           The ``io_loop`` argument is deprecated.
        """
        if cls._initialized:
            return

        loop = io_loop if io_loop is not None else ioloop.IOLoop.current()

        def _on_sigchld(sig, frame):
            # Hand the cleanup over to the IOLoop rather than doing the
            # work inside the signal handler.
            return loop.add_callback_from_signal(cls._cleanup)

        cls._old_sigchld = signal.signal(signal.SIGCHLD, _on_sigchld)
        cls._initialized = True


问题


面经


文章

微信
公众号

扫码关注公众号