python类PeriodicCallback()的实例源码

sdserver.py 文件源码 项目:slidoc 作者: mitotic 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def setup_backup():
    if not Options['backup_hhmm']:
        return
    curTimeSec = sliauth.epoch_ms()/1000.0
    curDate = sliauth.iso_date(sliauth.create_date(curTimeSec*1000.0))[:10]
    backupTimeSec = sliauth.epoch_ms(sliauth.parse_date(curDate+'T'+Options['backup_hhmm']))/1000.0
    backupInterval = 86400
    if curTimeSec+60 > backupTimeSec:
        backupTimeSec += backupInterval
    print >> sys.stderr, Options['site_name'] or 'ROOT', 'Scheduled daily backup in dir %s, starting at %s' % (Options['backup_dir'], sliauth.iso_date(sliauth.create_date(backupTimeSec*1000.0)))
    def start_backup():
        if Options['debug']:
            print >> sys.stderr, "Starting periodic backup"
        backupSite()
        Global.backup = PeriodicCallback(backupSite, backupInterval*1000.0)
        Global.backup.start()

    IOLoop.current().call_at(backupTimeSec, start_backup)
autoreload.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def start(io_loop=None, check_time=500):
    """Begins watching source files for changes.

    .. versionchanged:: 4.1
       The ``io_loop`` argument is deprecated.
    """
    io_loop = io_loop or ioloop.IOLoop.current()
    if io_loop in _io_loops:
        return
    _io_loops[io_loop] = True
    if len(_io_loops) > 1:
        gen_log.warning("tornado.autoreload started more than once in the same process")
    if _has_execv:
        add_reload_hook(functools.partial(io_loop.close, all_fds=True))
    modify_times = {}
    callback = functools.partial(_reload_on_update, modify_times)
    scheduler = ioloop.PeriodicCallback(callback, check_time, io_loop=io_loop)
    scheduler.start()
ioloop_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_overrun(self):
        sleep_durations = [9, 9, 10, 11, 20, 20, 35, 35, 0, 0]
        expected = [
            1010, 1020, 1030,  # first 3 calls on schedule
            1050, 1070,  # next 2 delayed one cycle
            1100, 1130,  # next 2 delayed 2 cycles
            1170, 1210,  # next 2 delayed 3 cycles
            1220, 1230,  # then back on schedule.
        ]
        calls = []

        def cb():
            calls.append(self.io_loop.time())
            if not sleep_durations:
                self.io_loop.stop()
                return
            self.io_loop.sleep(sleep_durations.pop(0))
        pc = PeriodicCallback(cb, 10000)
        pc.start()
        self.io_loop.start()
        self.assertEqual(calls, expected)
autoreload.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def start(io_loop=None, check_time=500):
    """Begins watching source files for changes.

    .. versionchanged:: 4.1
       The ``io_loop`` argument is deprecated.
    """
    io_loop = io_loop or ioloop.IOLoop.current()
    if io_loop in _io_loops:
        return
    _io_loops[io_loop] = True
    if len(_io_loops) > 1:
        gen_log.warning("tornado.autoreload started more than once in the same process")
    if _has_execv:
        add_reload_hook(functools.partial(io_loop.close, all_fds=True))
    modify_times = {}
    callback = functools.partial(_reload_on_update, modify_times)
    scheduler = ioloop.PeriodicCallback(callback, check_time, io_loop=io_loop)
    scheduler.start()
ioloop_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_overrun(self):
        sleep_durations = [9, 9, 10, 11, 20, 20, 35, 35, 0, 0]
        expected = [
            1010, 1020, 1030,  # first 3 calls on schedule
            1050, 1070,  # next 2 delayed one cycle
            1100, 1130,  # next 2 delayed 2 cycles
            1170, 1210,  # next 2 delayed 3 cycles
            1220, 1230,  # then back on schedule.
        ]
        calls = []

        def cb():
            calls.append(self.io_loop.time())
            if not sleep_durations:
                self.io_loop.stop()
                return
            self.io_loop.sleep(sleep_durations.pop(0))
        pc = PeriodicCallback(cb, 10000)
        pc.start()
        self.io_loop.start()
        self.assertEqual(calls, expected)
autoreload.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def start(io_loop=None, check_time=500):
    """Begins watching source files for changes.

    .. versionchanged:: 4.1
       The ``io_loop`` argument is deprecated.
    """
    io_loop = io_loop or ioloop.IOLoop.current()
    if io_loop in _io_loops:
        return
    _io_loops[io_loop] = True
    if len(_io_loops) > 1:
        gen_log.warning("tornado.autoreload started more than once in the same process")
    if _has_execv:
        add_reload_hook(functools.partial(io_loop.close, all_fds=True))
    modify_times = {}
    callback = functools.partial(_reload_on_update, modify_times)
    scheduler = ioloop.PeriodicCallback(callback, check_time, io_loop=io_loop)
    scheduler.start()
microservice.py 文件源码 项目:gemstone 作者: vladcalin 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _periodic_task_iter(self):
        """
        Iterates through all the periodic tasks:

        - the service registry pinging
        - default dummy task if on Windows
        - user defined periodic tasks

        :return:
        """
        for strategy in self.discovery_strategies:
            self.default_periodic_tasks.append(
                (functools.partial(strategy.ping, self.name, self.accessible_at),
                 self.service_registry_ping_interval)
            )
            self.default_periodic_tasks[-1][0]()

        all_periodic_tasks = self.default_periodic_tasks + self.periodic_tasks
        for func, timer_in_seconds in all_periodic_tasks:
            timer_milisec = timer_in_seconds * 1000
            yield PeriodicCallback(func, timer_milisec, io_loop=self.io_loop)
system.py 文件源码 项目:lampost_lib 作者: genzgd 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _start_heartbeat():
    global pulse_lc, maintenance_lc
    pulse_interval = config_value('pulse_interval')
    maintenance_interval = config_value('maintenance_interval')
    dispatcher.pulses_per_second = 1 / pulse_interval
    if pulse_lc:
        pulse_lc.stop()
    pulse_lc = PeriodicCallback(dispatcher.pulse, pulse_interval * 1000)
    pulse_lc.start()
    log.info("Pulse Event heartbeat started at {} seconds", pulse_interval)

    if maintenance_lc:
        maintenance_lc.stop()
    maintenance_lc = PeriodicCallback(lambda: dispatcher.dispatch('maintenance'), 60 * maintenance_interval * 1000)
    maintenance_lc.start()
    log.info("Maintenance Event heartbeat started at {} minutes", maintenance_interval)
irc.py 文件源码 项目:labots 作者: SilverRainZ 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, host, port, nick,
            relaybots = [],
            charset = 'utf-8',
            ioloop = False):
        logger.info('Connecting to %s:%s', host, port)

        self.host = host
        self.port = port
        self.nick = nick
        self.relaybots = relaybots

        self._charset = charset
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._ioloop = ioloop or IOLoop.instance()
        self._stream = IOStream(sock, io_loop = self._ioloop)
        self._stream.connect((host, port), self._login)

        self._last_pong = time.time()
        self._timer = PeriodicCallback(self._keep_alive,
                60 * 1000, io_loop=self._ioloop)
        self._timer.start()

        self._send_timer = PeriodicCallback(self._sock_send,
                600, io_loop=self._ioloop)
        self._send_timer.start()
termio.py 文件源码 项目:django-gateone 作者: jimmy201602 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _handle_match(self, pattern_obj, match):
        """
        Handles a matched regex detected by :meth:`postprocess`.  It calls
        :obj:`Pattern.callback` and takes care of removing it from
        :attr:`_patterns` (if it isn't sticky).
        """
        if self._handling_match:
            # Don't process anything if we're in the middle of handling a match.
            # NOTE: This can happen when there's more than one thread,
            # processes, or PeriodicCallback going on simultaneously.  It seems
            # to work better than threading.Lock()
            return
        self._handling_match = True
        callback = partial(pattern_obj.callback, self, match.group())
        self._call_callback(callback)
        if self.debug:
            # Turn on the fancy regex debugger/pretty printer
            debug_callback = partial(
                debug_expect, self, match.group(), pattern_obj.pattern)
            self._call_callback(debug_callback)
        if not pattern_obj.sticky:
            self.unexpect(hash(pattern_obj)) # Remove it
        self._handling_match = False
termio.py 文件源码 项目:django-gateone 作者: jimmy201602 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
        super(MultiplexPOSIXIOLoop, self).__init__(*args, **kwargs)
        from tornado import ioloop
        self.terminating = False
        self.sent_sigint = False
        self.shell_command = ['/bin/sh', '-c']
        self.use_shell = True # Controls whether or not we wrap with the above
        self.env = {}
        self.io_loop = ioloop.IOLoop.current() # Monitors child for activity
        #self.io_loop.set_blocking_signal_threshold(2, self._blocked_io_handler)
        #signal.signal(signal.SIGALRM, self._blocked_io_handler)
        self.reenable_timeout = None
        interval = 100 # A 0.1 second interval should be fast enough
        self.scheduler = ioloop.PeriodicCallback(self._timeout_checker,interval)
        self.exitstatus = None
        self._checking_patterns = False
        self.read_timeout = datetime.now()
        self.capture_limit = -1 # Huge reads by default
        self.restore_rate = None
termio.py 文件源码 项目:django-gateone 作者: jimmy201602 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _timeout_checker(self):
        """
        Runs `timeout_check` and if there are no more non-sticky
        patterns in :attr:`self._patterns`, stops :attr:`scheduler`.
        """
        if not self._checking_patterns:
            self._checking_patterns = True
            remaining_patterns = self.timeout_check()
            if not remaining_patterns:
                # No reason to keep the PeriodicCallback going
                logging.debug("Stopping self.scheduler (no remaining patterns)")
                try:
                    self.scheduler.stop()
                except AttributeError:
                # Now this is a neat trick:  The way IOLoop works with its
                # stack_context thingamabob the scheduler doesn't actualy end up
                # inside the MultiplexPOSIXIOLoop instance inside of this
                # instance of _timeout_checker() *except* inside the main
                # thread.  It is absolutely wacky but it works and works well :)
                    pass
            self._checking_patterns = False
async.py 文件源码 项目:django-gateone 作者: jimmy201602 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, callback, callback_time, io_loop=None):
        self.callback = callback
        self.callback_time = callback_time
        self.io_loop = io_loop or IOLoop.current()
        if self.io_loop._running:
            # Use a regular PeriodicCallback
            self._pc = PC(callback, callback_time, io_loop)
        else:
            from threading import Timer
            # NOTE: PeriodicCallback uses ms while Timer uses seconds
            def callback_wrapper():
                "Runs the callback and restarts the Timer so it will run again"
                self.callback()
                self._pc = Timer(callback_time / 1000, callback_wrapper)
                if self._running:
                    self._pc.start()
            self._pc = Timer(callback_time / 1000, callback_wrapper)
        self._running = False
autoreload.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def start(io_loop=None, check_time=500):
    """Begins watching source files for changes.

    .. versionchanged:: 4.1
       The ``io_loop`` argument is deprecated.
    """
    io_loop = io_loop or ioloop.IOLoop.current()
    if io_loop in _io_loops:
        return
    _io_loops[io_loop] = True
    if len(_io_loops) > 1:
        gen_log.warning("tornado.autoreload started more than once in the same process")
    if _has_execv:
        add_reload_hook(functools.partial(io_loop.close, all_fds=True))
    modify_times = {}
    callback = functools.partial(_reload_on_update, modify_times)
    scheduler = ioloop.PeriodicCallback(callback, check_time, io_loop=io_loop)
    scheduler.start()
ioloop_test.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_overrun(self):
        sleep_durations = [9, 9, 10, 11, 20, 20, 35, 35, 0, 0]
        expected = [
            1010, 1020, 1030,  # first 3 calls on schedule
            1050, 1070,  # next 2 delayed one cycle
            1100, 1130,  # next 2 delayed 2 cycles
            1170, 1210,  # next 2 delayed 3 cycles
            1220, 1230,  # then back on schedule.
        ]
        calls = []

        def cb():
            calls.append(self.io_loop.time())
            if not sleep_durations:
                self.io_loop.stop()
                return
            self.io_loop.sleep(sleep_durations.pop(0))
        pc = PeriodicCallback(cb, 10000)
        pc.start()
        self.io_loop.start()
        self.assertEqual(calls, expected)
search.py 文件源码 项目:librarian 作者: HERA-Team 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def register_standing_order_checkin():
    """Create a Tornado PeriodicCallback that will periodically check the
    standing orders to see if there's anything to do.

    Since we know all events related to files, in theory this shouldn't be
    needed, but in practice this can't hurt.

    The timeout for the callback is measured in milliseconds, so we queue an
    evaluation every 10 minutes.

    """
    from tornado import ioloop

    cb = ioloop.PeriodicCallback(queue_standing_order_copies, 60 * 10 * 1000)
    cb.start()
    return cb


# The local-disk staging system for the NRAO Librarian. In a sense this code
# isn't super relevant to searches, but the search system is how it gets
# launched, and it's not obvious to me that there's a better place to put it.
autoreload.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def start(io_loop=None, check_time=500):
    """Begins watching source files for changes.

    .. versionchanged:: 4.1
       The ``io_loop`` argument is deprecated.
    """
    io_loop = io_loop or ioloop.IOLoop.current()
    if io_loop in _io_loops:
        return
    _io_loops[io_loop] = True
    if len(_io_loops) > 1:
        gen_log.warning("tornado.autoreload started more than once in the same process")
    if _has_execv:
        add_reload_hook(functools.partial(io_loop.close, all_fds=True))
    modify_times = {}
    callback = functools.partial(_reload_on_update, modify_times)
    scheduler = ioloop.PeriodicCallback(callback, check_time, io_loop=io_loop)
    scheduler.start()
autoreload.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def start(io_loop=None, check_time=500):
    """Begins watching source files for changes.

    .. versionchanged:: 4.1
       The ``io_loop`` argument is deprecated.
    """
    io_loop = io_loop or ioloop.IOLoop.current()
    if io_loop in _io_loops:
        return
    _io_loops[io_loop] = True
    if len(_io_loops) > 1:
        gen_log.warning("tornado.autoreload started more than once in the same process")
    if _has_execv:
        add_reload_hook(functools.partial(io_loop.close, all_fds=True))
    modify_times = {}
    callback = functools.partial(_reload_on_update, modify_times)
    scheduler = ioloop.PeriodicCallback(callback, check_time, io_loop=io_loop)
    scheduler.start()
ioloop_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_overrun(self):
        sleep_durations = [9, 9, 10, 11, 20, 20, 35, 35, 0, 0]
        expected = [
            1010, 1020, 1030,  # first 3 calls on schedule
            1050, 1070,  # next 2 delayed one cycle
            1100, 1130,  # next 2 delayed 2 cycles
            1170, 1210,  # next 2 delayed 3 cycles
            1220, 1230,  # then back on schedule.
        ]
        calls = []

        def cb():
            calls.append(self.io_loop.time())
            if not sleep_durations:
                self.io_loop.stop()
                return
            self.io_loop.sleep(sleep_durations.pop(0))
        pc = PeriodicCallback(cb, 10000)
        pc.start()
        self.io_loop.start()
        self.assertEqual(calls, expected)
stats.py 文件源码 项目:get_started_with_respeaker 作者: respeaker 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def __init__(self, io_loop):
        # Sessions
        self.sess_active = 0

        # Avoid circular reference
        self.sess_transports = dict()

        # Connections
        self.conn_active = 0
        self.conn_ps = MovingAverage()

        # Packets
        self.pack_sent_ps = MovingAverage()
        self.pack_recv_ps = MovingAverage()

        self._callback = ioloop.PeriodicCallback(self._update,
                                                 1000,
                                                 io_loop)
        self._callback.start()
autoreload.py 文件源码 项目:teleport 作者: eomsoft 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def start(io_loop=None, check_time=500):
    """Begins watching source files for changes.

    .. versionchanged:: 4.1
       The ``io_loop`` argument is deprecated.
    """
    io_loop = io_loop or ioloop.IOLoop.current()
    if io_loop in _io_loops:
        return
    _io_loops[io_loop] = True
    if len(_io_loops) > 1:
        gen_log.warning("tornado.autoreload started more than once in the same process")
    if _has_execv:
        add_reload_hook(functools.partial(io_loop.close, all_fds=True))
    modify_times = {}
    callback = functools.partial(_reload_on_update, modify_times)
    scheduler = ioloop.PeriodicCallback(callback, check_time, io_loop=io_loop)
    scheduler.start()
autoreload.py 文件源码 项目:projects-2017-2 作者: ncss 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def start(io_loop=None, check_time=500):
    """Begins watching source files for changes.

    .. versionchanged:: 4.1
       The ``io_loop`` argument is deprecated.
    """
    io_loop = io_loop or ioloop.IOLoop.current()
    if io_loop in _io_loops:
        return
    _io_loops[io_loop] = True
    if len(_io_loops) > 1:
        gen_log.warning("tornado.autoreload started more than once in the same process")
    if _has_execv:
        add_reload_hook(functools.partial(io_loop.close, all_fds=True))
    modify_times = {}
    callback = functools.partial(_reload_on_update, modify_times)
    scheduler = ioloop.PeriodicCallback(callback, check_time, io_loop=io_loop)
    scheduler.start()
ioloop_test.py 文件源码 项目:projects-2017-2 作者: ncss 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_overrun(self):
        sleep_durations = [9, 9, 10, 11, 20, 20, 35, 35, 0, 0]
        expected = [
            1010, 1020, 1030,  # first 3 calls on schedule
            1050, 1070,  # next 2 delayed one cycle
            1100, 1130,  # next 2 delayed 2 cycles
            1170, 1210,  # next 2 delayed 3 cycles
            1220, 1230,  # then back on schedule.
        ]
        calls = []

        def cb():
            calls.append(self.io_loop.time())
            if not sleep_durations:
                self.io_loop.stop()
                return
            self.io_loop.sleep(sleep_durations.pop(0))
        pc = PeriodicCallback(cb, 10000)
        pc.start()
        self.io_loop.start()
        self.assertEqual(calls, expected)
autoreload.py 文件源码 项目:aweasome_learning 作者: Knight-ZXW 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def start(io_loop=None, check_time=500):
    """Begins watching source files for changes.

    .. versionchanged:: 4.1
       The ``io_loop`` argument is deprecated.
    """
    io_loop = io_loop or ioloop.IOLoop.current()
    if io_loop in _io_loops:
        return
    _io_loops[io_loop] = True
    if len(_io_loops) > 1:
        gen_log.warning("tornado.autoreload started more than once in the same process")
    if _has_execv:
        add_reload_hook(functools.partial(io_loop.close, all_fds=True))
    modify_times = {}
    callback = functools.partial(_reload_on_update, modify_times)
    scheduler = ioloop.PeriodicCallback(callback, check_time, io_loop=io_loop)
    scheduler.start()
ioloop_test.py 文件源码 项目:aweasome_learning 作者: Knight-ZXW 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_overrun(self):
        sleep_durations = [9, 9, 10, 11, 20, 20, 35, 35, 0, 0]
        expected = [
            1010, 1020, 1030,  # first 3 calls on schedule
            1050, 1070,  # next 2 delayed one cycle
            1100, 1130,  # next 2 delayed 2 cycles
            1170, 1210,  # next 2 delayed 3 cycles
            1220, 1230,  # then back on schedule.
        ]
        calls = []

        def cb():
            calls.append(self.io_loop.time())
            if not sleep_durations:
                self.io_loop.stop()
                return
            self.io_loop.sleep(sleep_durations.pop(0))
        pc = PeriodicCallback(cb, 10000)
        pc.start()
        self.io_loop.start()
        self.assertEqual(calls, expected)
autoreload.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def start(io_loop=None, check_time=500):
    """Begins watching source files for changes.

    .. versionchanged:: 4.1
       The ``io_loop`` argument is deprecated.
    """
    io_loop = io_loop or ioloop.IOLoop.current()
    if io_loop in _io_loops:
        return
    _io_loops[io_loop] = True
    if len(_io_loops) > 1:
        gen_log.warning("tornado.autoreload started more than once in the same process")
    if _has_execv:
        add_reload_hook(functools.partial(io_loop.close, all_fds=True))
    modify_times = {}
    callback = functools.partial(_reload_on_update, modify_times)
    scheduler = ioloop.PeriodicCallback(callback, check_time, io_loop=io_loop)
    scheduler.start()
ioloop_test.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_overrun(self):
        sleep_durations = [9, 9, 10, 11, 20, 20, 35, 35, 0, 0]
        expected = [
            1010, 1020, 1030,  # first 3 calls on schedule
            1050, 1070,  # next 2 delayed one cycle
            1100, 1130,  # next 2 delayed 2 cycles
            1170, 1210,  # next 2 delayed 3 cycles
            1220, 1230,  # then back on schedule.
        ]
        calls = []

        def cb():
            calls.append(self.io_loop.time())
            if not sleep_durations:
                self.io_loop.stop()
                return
            self.io_loop.sleep(sleep_durations.pop(0))
        pc = PeriodicCallback(cb, 10000)
        pc.start()
        self.io_loop.start()
        self.assertEqual(calls, expected)
autoreload.py 文件源码 项目:browser_vuln_check 作者: lcatro 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def start(io_loop=None, check_time=500):
    """Begins watching source files for changes.

    .. versionchanged:: 4.1
       The ``io_loop`` argument is deprecated.
    """
    io_loop = io_loop or ioloop.IOLoop.current()
    if io_loop in _io_loops:
        return
    _io_loops[io_loop] = True
    if len(_io_loops) > 1:
        gen_log.warning("tornado.autoreload started more than once in the same process")
    if _has_execv:
        add_reload_hook(functools.partial(io_loop.close, all_fds=True))
    modify_times = {}
    callback = functools.partial(_reload_on_update, modify_times)
    scheduler = ioloop.PeriodicCallback(callback, check_time, io_loop=io_loop)
    scheduler.start()
ioloop_test.py 文件源码 项目:browser_vuln_check 作者: lcatro 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_overrun(self):
        sleep_durations = [9, 9, 10, 11, 20, 20, 35, 35, 0, 0]
        expected = [
            1010, 1020, 1030,  # first 3 calls on schedule
            1050, 1070,  # next 2 delayed one cycle
            1100, 1130,  # next 2 delayed 2 cycles
            1170, 1210,  # next 2 delayed 3 cycles
            1220, 1230,  # then back on schedule.
        ]
        calls = []

        def cb():
            calls.append(self.io_loop.time())
            if not sleep_durations:
                self.io_loop.stop()
                return
            self.io_loop.sleep(sleep_durations.pop(0))
        pc = PeriodicCallback(cb, 10000)
        pc.start()
        self.io_loop.start()
        self.assertEqual(calls, expected)
autoreload.py 文件源码 项目:TornadoWeb 作者: VxCoder 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def start(io_loop=None, check_time=500):
    """Begins watching source files for changes.

    .. versionchanged:: 4.1
       The ``io_loop`` argument is deprecated.
    """
    io_loop = io_loop or ioloop.IOLoop.current()
    if io_loop in _io_loops:
        return
    _io_loops[io_loop] = True
    if len(_io_loops) > 1:
        gen_log.warning("tornado.autoreload started more than once in the same process")
    if _has_execv:
        add_reload_hook(functools.partial(io_loop.close, all_fds=True))
    modify_times = {}
    callback = functools.partial(_reload_on_update, modify_times)
    scheduler = ioloop.PeriodicCallback(callback, check_time, io_loop=io_loop)
    scheduler.start()


问题


面经


文章

微信
公众号

扫码关注公众号