python类run()的实例源码

launch_crawler.py 文件源码 项目:ArticlePusher 作者: aforwardz 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def launch_crawlers(crawler_class, exclusion=None):
    """Run every registered spider whose name contains ``crawler_class``.

    Args:
        crawler_class: Substring used to select spider names from the
            runner's spider loader.
        exclusion: Optional settings key. The spider names listed under that
            key in the settings are skipped.

    Returns:
        True once the reactor has run and returned; False if an exception
        was raised while scheduling or running the crawls (it is logged).
    """
    settings = get_settings()
    configure_logging(settings=settings)
    launcher = CrawlerRunner(settings)
    crawlers = [name for name in launcher.spider_loader.list()
                if crawler_class in name]
    if exclusion:
        # Filter rather than list.remove(): an excluded name that was not
        # selected above must not raise ValueError.
        excluded = set(settings.get(exclusion, []))
        crawlers = [name for name in crawlers if name not in excluded]

    try:
        for crawler in crawlers:
            launcher.crawl(crawler)
        d = launcher.join()
        # Stop the reactor whether the joined crawls succeeded or failed.
        d.addBoth(lambda _: reactor.stop())
        reactor.run()
        return True
    except Exception as e:
        launch_logger.error('(????)????? | ?????:\n{excep}'
                            .format(excep=e))
        return False
bottle.py 文件源码 项目:dabdabrevolution 作者: harryparkdotio 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run(self, handler):  # pragma: no cover
        """Serve ``handler`` through fapws3 on ``self.host`` / ``self.port``."""
        import fapws._evwsgi as evwsgi
        from fapws import base, config

        port = self.port
        # fapws3 silently changed its API in 0.5: pass the port as a string.
        needs_str_port = float(config.SERVER_IDENT[-2:]) > 0.4
        if needs_str_port:
            port = str(port)
        evwsgi.start(self.host, port)

        # fapws3 never releases the GIL. Complain upstream. I tried. No luck.
        running_as_reload_child = 'BOTTLE_CHILD' in os.environ
        if running_as_reload_child and not self.quiet:
            _stderr("WARNING: Auto-reloading does not work with Fapws3.\n")
            _stderr("         (Fapws3 breaks python thread support)\n")
        evwsgi.set_base_module(base)

        def wsgi_entry(environ, start_response):
            # Flag the environ as single-process before delegating.
            environ['wsgi.multiprocess'] = False
            return handler(environ, start_response)

        evwsgi.wsgi_cb(('', wsgi_entry))
        evwsgi.run()
bottle.py 文件源码 项目:dabdabrevolution 作者: harryparkdotio 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(self, handler):
        """Serve ``handler`` with eventlet's WSGI server.

        Raises RuntimeError when ``os`` has not been monkey-patched by
        eventlet. The ``backlog`` and ``family`` entries are moved out of
        ``self.options`` and forwarded to ``listen``.
        """
        from eventlet import wsgi, listen, patcher
        if not patcher.is_monkey_patched(os):
            raise RuntimeError(
                "Bottle requires eventlet.monkey_patch() (before import)")

        listen_kwargs = {}
        for name in ('backlog', 'family'):
            if name in self.options:
                listen_kwargs[name] = self.options.pop(name)

        bind_to = (self.host, self.port)
        try:
            sock = listen(bind_to, **listen_kwargs)
            wsgi.server(sock, handler, log_output=(not self.quiet))
        except TypeError:
            # Fallback, if we have old version of eventlet
            wsgi.server(listen(bind_to), handler)
bottle.py 文件源码 项目:dabdabrevolution 作者: harryparkdotio 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(self, handler):
        """Serve ``handler`` on an asyncio loop using aiohttp's WSGI protocol."""
        import asyncio
        from aiohttp.wsgi import WSGIServerHttpProtocol

        self.loop = self.get_event_loop()
        asyncio.set_event_loop(self.loop)

        def make_protocol():
            # One protocol instance per connection.
            return WSGIServerHttpProtocol(handler,
                                          readpayload=True,
                                          debug=(not self.quiet))

        server_coro = self.loop.create_server(make_protocol,
                                              self.host,
                                              self.port)
        self.loop.run_until_complete(server_coro)

        if 'BOTTLE_CHILD' in os.environ:
            import signal

            def stop_on_sigint(signum, frame):
                self.loop.stop()

            signal.signal(signal.SIGINT, stop_on_sigint)

        try:
            self.loop.run_forever()
        except KeyboardInterrupt:
            self.loop.stop()
bottle.py 文件源码 项目:dabdabrevolution 作者: harryparkdotio 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run(self):
        """Poll the lockfile and the loaded modules' files until one changes.

        A snapshot of modification times is taken for every file backing a
        module in ``sys.modules``. Then, while ``self.status`` is falsy, each
        pass:

        * sets ``self.status = 'error'`` and interrupts the main thread when
          the lockfile is missing or its mtime is older than
          ``self.interval + 5`` seconds;
        * sets ``self.status = 'reload'`` and interrupts the main thread when
          a tracked file has disappeared or has a newer mtime;
        * sleeps for ``self.interval`` seconds.
        """
        exists = os.path.exists
        mtime = lambda p: os.stat(p).st_mtime
        files = dict()

        for module in list(sys.modules.values()):
            # ``__file__`` may exist but be None (e.g. namespace packages on
            # Python 3); normalise to '' so the slice below cannot fail.
            path = getattr(module, '__file__', '') or ''
            # Track the source file rather than the compiled .pyc/.pyo.
            if path[-4:] in ('.pyo', '.pyc'): path = path[:-1]
            if path and exists(path): files[path] = mtime(path)

        while not self.status:
            if not exists(self.lockfile)\
            or mtime(self.lockfile) < time.time() - self.interval - 5:
                self.status = 'error'
                thread.interrupt_main()
            for path, lmtime in list(files.items()):
                if not exists(path) or mtime(path) > lmtime:
                    self.status = 'reload'
                    thread.interrupt_main()
                    break
            time.sleep(self.interval)
twistedreactor.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
        """
        Initialization method.

        Note that we can't call reactor methods directly here because
        it's not thread-safe, so we schedule the reactor/connection
        stuff to be run from the event loop thread when it gets the
        chance.
        """
        # Forward all arguments unchanged to the base Connection initializer.
        Connection.__init__(self, *args, **kwargs)

        # Start out marked as closed with no connector assigned.
        self.is_closed = True
        self.connector = None

        # Schedule add_connection via the reactor's thread-safe entry point,
        # then ask the loop object to start (see the docstring for why).
        reactor.callFromThread(self.add_connection)
        self._loop.maybe_start()
server.py 文件源码 项目:farfetchd 作者: isislovecruft 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def render_GET(self, request):
        """Return the contents of ``API.html`` as the response body.

        The file is looked up in the directory containing this module. The
        ``Content-Type`` response header is set to ``text/html``.

        Args:
            request: The request whose ``responseHeaders`` are updated.

        Returns:
            The raw bytes of ``API.html``.
        """
        request.responseHeaders.setRawHeaders(b"Content-Type", ["text/html"])

        path = [os.path.dirname(__file__)]

        # If we're being run from the build/ directory of a source or git tree,
        # append the full path:
        if not os.path.isabs(path[0]):
            path.insert(0, os.getcwd())
        # If we're being run as part of some unittests, get rid of the test dir:
        if path[0].endswith("_trial_temp"):
            path[0] = path[0].rsplit("_trial_temp")[0]

        path.append('API.html')
        spec = os.path.sep.join(path)

        # Read raw bytes: wrapping a text-mode read in bytes() raises
        # TypeError on Python 3 (no encoding given), and the body must be
        # bytes anyway.
        with open(spec, 'rb') as fh:
            return fh.read()
server.py 文件源码 项目:farfetchd 作者: isislovecruft 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def main():
    """Build the CAPTCHA resource tree and serve it over HTTP via the reactor."""
    log.startLogging(sys.stdout)

    hmac_key = crypto.getHMAC(
        crypto.getKey(FARFETCHD_CAPTCHA_HMAC_KEYFILE), "Captcha-Key")

    # Load or create our encryption keys:
    secret_key, public_key = crypto.getRSAKey(FARFETCHD_CAPTCHA_RSA_KEYFILE)

    # The index resource doubles as the root of the tree.
    root = CaptchaResource()
    children = (
        ("fetch", CaptchaFetchResource(hmac_key, public_key, secret_key)),
        ("check", CaptchaCheckResource(hmac_key, public_key, secret_key)),
    )
    for child_name, resource in children:
        root.putChild(child_name, resource)

    site = HttpJsonApiServer(root)
    # Fall back to port 80 on localhost when the settings are falsy.
    listen_port = FARFETCHD_HTTP_PORT or 80
    listen_host = FARFETCHD_HTTP_HOST or '127.0.0.1'

    reactor.listenTCP(listen_port, site, interface=listen_host)
    reactor.run()
tally_server.py 文件源码 项目:privcount 作者: privcount 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run(self):
        '''
        Called by twisted

        Loads the config, starts the periodic refresh loop, then listens
        for SSL connections and runs the reactor.
        '''
        # load initial config; bail out if it could not be loaded
        self.refresh_config()
        if self.config is None:
            logging.critical("cannot start due to error in config file")
            return

        # refresh and check status every event_period seconds
        self.refresh_task = task.LoopingCall(self.refresh_loop)
        self.refresh_task.start(self.config['event_period'],
                                now=False).addErrback(errorCallback)

        # setup server for receiving blinded counts from the DC nodes and key shares from the SK nodes
        listen_port = self.config['listen_port']
        ssl_context = ssl.DefaultOpenSSLContextFactory(self.config['key'],
                                                       self.config['cert'])

        logging.info("Tally Server listening on port {}".format(listen_port))
        reactor.listenSSL(listen_port, self, ssl_context)
        reactor.run()
inject.py 文件源码 项目:privcount 作者: privcount 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run_inject(args):
    '''
    start the injector, and start it listening

    Builds a PrivCountDataInjector from the parsed arguments, attaches the
    configured listeners to it, and runs the reactor.
    '''
    # pylint: disable=E1101
    injector = PrivCountDataInjector(
        args.log,
        args.simulate,
        float(args.prune_before),
        float(args.prune_after),
        args.control_password,
        args.control_cookie_file)
    # The injector listens on all of IPv4, IPv6, and a control socket, and
    # injects events into the first client to connect
    # Since these are synthetic events, it is safe to use /tmp for the socket
    # path
    # XXX multiple connections to our server will kill old connections
    listener_config = {}
    if args.port is not None:
        listener_config['port'] = args.port
        # An IP is only recorded when a port was given as well.
        if args.ip is not None:
            listener_config['ip'] = args.ip
    if args.unix is not None:
        listener_config['unix'] = args.unix
    injector.set_listeners(
        listen(injector, listener_config, ip_version_default=[4, 6]))
    reactor.run()
twisted.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, io_loop=None):
        """Set up the reactor's bookkeeping on top of a Tornado IOLoop.

        ``io_loop`` defaults to the current ``IOLoop`` when falsy.
        """
        self._io_loop = io_loop or tornado.ioloop.IOLoop.current()
        self._readers = {}  # map of reader objects to fd
        self._writers = {}  # map of writer objects to fd
        self._fds = {}  # a map of fd to a (reader, writer) tuple
        self._delayedCalls = {}
        PosixReactorBase.__init__(self)
        self.addSystemEventTrigger('during', 'shutdown', self.crash)

        def _fire_startup_if_needed():
            # IOLoop.start() bypasses some of the reactor initialization.
            # Fire off the necessary events if they weren't already
            # triggered by reactor.run().
            if self._started:
                return
            self.fireSystemEvent('startup')

        self._io_loop.add_callback(_fire_startup_if_needed)
    # IReactorTime
twisted.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, io_loop=None):
        """Set up the reactor's bookkeeping on top of a Tornado IOLoop.

        ``io_loop`` defaults to the current ``IOLoop`` when falsy.
        """
        self._io_loop = io_loop or tornado.ioloop.IOLoop.current()
        self._readers = {}  # map of reader objects to fd
        self._writers = {}  # map of writer objects to fd
        self._fds = {}  # a map of fd to a (reader, writer) tuple
        self._delayedCalls = {}
        PosixReactorBase.__init__(self)
        self.addSystemEventTrigger('during', 'shutdown', self.crash)

        def _fire_startup_if_needed():
            # IOLoop.start() bypasses some of the reactor initialization.
            # Fire off the necessary events if they weren't already
            # triggered by reactor.run().
            if self._started:
                return
            self.fireSystemEvent('startup')

        self._io_loop.add_callback(_fire_startup_if_needed)

    # IReactorTime
twisted.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, io_loop=None):
        """Set up the reactor's bookkeeping on top of a Tornado IOLoop.

        ``io_loop`` defaults to the current ``IOLoop`` when falsy.
        """
        self._io_loop = io_loop or tornado.ioloop.IOLoop.current()
        self._readers = {}  # map of reader objects to fd
        self._writers = {}  # map of writer objects to fd
        self._fds = {}  # a map of fd to a (reader, writer) tuple
        self._delayedCalls = {}
        PosixReactorBase.__init__(self)
        self.addSystemEventTrigger('during', 'shutdown', self.crash)

        def _fire_startup_if_needed():
            # IOLoop.start() bypasses some of the reactor initialization.
            # Fire off the necessary events if they weren't already
            # triggered by reactor.run().
            if self._started:
                return
            self.fireSystemEvent('startup')

        self._io_loop.add_callback(_fire_startup_if_needed)

    # IReactorTime
sslstrip.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def main(argv):
    """Parse the options, configure logging and run the stripping proxy.

    Args:
        argv: Command-line arguments handed to ``parseOptions``.
    """
    (logFile, logLevel, listenPort, spoofFavicon, killSessions) = parseOptions(argv)

    # The log file is opened with mode 'w', i.e. truncated on every start.
    logging.basicConfig(level=logLevel, format='%(asctime)s %(message)s',
                        filename=logFile, filemode='w')

    URLMonitor.getInstance().setFaviconSpoofing(spoofFavicon)
    CookieCleaner.getInstance().setEnabled(killSessions)

    strippingFactory              = http.HTTPFactory(timeout=10)
    strippingFactory.protocol     = StrippingProxy

    reactor.listenTCP(int(listenPort), strippingFactory)

    # Parenthesised single-argument print is valid on Python 2 and 3 alike.
    print("\nsslstrip " + gVersion + " by Moxie Marlinspike running...")
    print("+ POC by Leonardo Nve")

    reactor.run()
manhole.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run():
    """Parse the options, dispatch to the toolkit runner, run the reactor.

    The runner is looked up on this module as ``run_<toolkit>``. The process
    exits with status 1 on a usage error or when no such runner exists.
    """
    config = MyOptions()
    try:
        config.parseOptions()
    except usage.UsageError as e:
        # ``except ... as`` and print() parse on Python 2.6+ and Python 3.
        print(str(e))
        print(str(config))
        sys.exit(1)

    try:
        # NOTE: this local deliberately shadows the enclosing function name.
        run = getattr(sys.modules[__name__], 'run_' + config.opts['toolkit'])
    except AttributeError:
        print("Sorry, no support for toolkit %r." % (config.opts['toolkit'],))
        sys.exit(1)

    run(config)

    from twisted.internet import reactor
    reactor.run()
tkunzip.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def run(argv=sys.argv):
    """Entry point for tkunzip: pick the console, re-exec or Tk code path.

    Args:
        argv: Full argument vector; ``argv[1:]`` is parsed as options.

    Returns:
        The result of ``doItConsolicious`` or ``doItTkinterly``, or the
        ``os.system`` exit status when re-executing with ``--use-console``.
        Exits the process with status 1 on a usage error.
    """
    # open() replaces the Python 2-only file() builtin.
    log.startLogging(open('tkunzip.log', 'w'))
    opt=TkunzipOptions()
    try:
        opt.parseOptions(argv[1:])
    except usage.UsageError as e:
        print(str(opt))
        print(str(e))
        sys.exit(1)

    if opt['use-console']:
        # this should come before shell-exec to prevent infinite loop
        return doItConsolicious(opt)
    if opt['shell-exec'] or 'Tkinter' not in sys.modules:
        from distutils import sysconfig
        from twisted.scripts import tkunzip
        myfile=tkunzip.__file__
        exe=os.path.join(sysconfig.get_config_var('prefix'), 'python.exe')
        # NOTE(review): the command line is built by string interpolation
        # from argv and run through the shell; arguments are not quoted.
        return os.system('%s %s --use-console %s' % (exe, myfile,
                                                     ' '.join(argv[1:])))
    return doItTkinterly(opt)
app.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def runReactorWithLogging(config, oldstdout, oldstderr):
    """Run the reactor in profile, debug or plain mode and record crashes.

    Args:
        config: Mapping read for the 'profile', 'nothotshot', 'debug' and
            'nodaemon' options.
        oldstdout: Stream restored as ``sys.stdout`` in debug mode; also
            receives the traceback when 'nodaemon' is set.
        oldstderr: Stream restored as ``sys.stderr`` in debug mode.

    Any exception escaping the run is written as a traceback to
    ``oldstdout`` ('nodaemon') or appended to ``TWISTD-CRASH.log``; it is
    not re-raised.
    """
    from twisted.internet import reactor
    try:
        if config['profile']:
            if not config['nothotshot']:
                runWithHotshot(reactor, config)
            else:
                runWithProfiler(reactor, config)
        elif config['debug']:
            sys.stdout = oldstdout
            sys.stderr = oldstderr
            if runtime.platformType == 'posix':
                # Drop into the debugger on SIGUSR2 and on Ctrl-C.
                signal.signal(signal.SIGUSR2, lambda *args: pdb.set_trace())
                signal.signal(signal.SIGINT, lambda *args: pdb.set_trace())
            fixPdb()
            pdb.runcall(reactor.run)
        else:
            reactor.run()
    except:
        # Intentionally catches everything: this is the last-chance crash
        # reporter around the whole reactor run.
        if config['nodaemon']:
            traceback.print_exc(file=oldstdout)
            oldstdout.flush()
        else:
            # The context manager flushes and closes the crash log; the
            # handle was previously left open.
            with open("TWISTD-CRASH.log", 'a') as crash_log:
                traceback.print_exc(file=crash_log)
cftp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run():
    """Parse the command line, set up logging, connect and run the reactor.

    Exits the process with status 1 when the options cannot be parsed.
    """
#    import hotshot
#    prof = hotshot.Profile('cftp.prof')
#    prof.start()
    args = sys.argv[1:]
    if '-l' in args: # cvs is an idiot
        # Move "-l <value>" to the front of the argument list.
        i = args.index('-l')
        args = args[i:i+2]+args
        del args[i+2:i+4]
    options = ClientOptions()
    try:
        options.parseOptions(args)
    except usage.UsageError as u:
        # ``except ... as`` and print() parse on Python 2.6+ and Python 3.
        print('ERROR: %s' % u)
        sys.exit(1)
    if options['log']:
        # Restore sys.stdout after startLogging so normal output stays on
        # the real stdout while log output goes to stderr.
        realout = sys.stdout
        log.startLogging(sys.stderr)
        sys.stdout = realout
    else:
        log.discardLogs()
    doConnect(options)
    reactor.run()
#    prof.stop()
#    prof.close()
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_cancelDelayedCall(self):
        """
        A DelayedCall that is cancelled before it fires must never invoke
        its function.
        """
        invocations = []

        def record():
            invocations.append(None)

        pending = reactor.callLater(0, record)
        pending.cancel()

        # Verify two "iterations" later, so that the cancelled call would
        # already have run if cancelling had not worked.
        done = Deferred()

        def verify():
            try:
                self.assertEqual(invocations, [])
            except:
                done.errback()
            else:
                done.callback(None)

        def schedule_verify():
            reactor.callLater(0, verify)

        reactor.callLater(0, schedule_verify)
        return done
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_cancelCalledDelayedCallAsynchronous(self):
        """
        Cancelling a DelayedCall after its function has already run must
        raise the appropriate exception.
        """
        done = Deferred()

        def verify():
            # ``fired`` is bound below, before the reactor runs this.
            try:
                self.assertRaises(error.AlreadyCalled, fired.cancel)
            except:
                done.errback()
            else:
                done.callback(None)

        def schedule_verify():
            reactor.callLater(0, verify)

        fired = reactor.callLater(0, schedule_verify)
        return done
test_internet.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_crash(self):
        """
        reactor.crash should NOT fire shutdown triggers
        """
        fired = []

        def on_before_shutdown():
            fired.append(("before", "shutdown"))

        self.addTrigger("before", "shutdown", on_before_shutdown)

        # reactor.crash called from an "after-startup" trigger is too early
        # for the gtkreactor: gtk_mainloop is not yet running. Same is true
        # when called with reactor.callLater(0). Must be >0 seconds in the
        # future to let gtk_mainloop start first.
        reactor.callWhenRunning(reactor.callLater, 0, reactor.crash)
        reactor.run()
        self.failIf(fired, "reactor.crash invoked shutdown triggers, but it "
                           "isn't supposed to.")

    # XXX Test that reactor.stop() invokes shutdown triggers
twistedtools.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def threaded_reactor():
    """
    Start the Twisted reactor in a separate thread, if not already done.

    Returns a ``(reactor, thread)`` tuple, or ``(None, None)`` when Twisted
    cannot be imported.
    The thread is a daemon thread, so it does not keep the process alive
    once all the tests are done.
    """
    global _twisted_thread
    try:
        from twisted.internet import reactor
    except ImportError:
        return None, None
    if not _twisted_thread:
        from threading import Thread
        # Signal handlers are not installed because the reactor runs
        # outside the main thread.
        _twisted_thread = Thread(
            target=lambda: reactor.run(installSignalHandlers=False))
        # The ``daemon`` attribute replaces the deprecated setDaemon().
        _twisted_thread.daemon = True
        _twisted_thread.start()
    return reactor, _twisted_thread

# Export global reactor variable, as Twisted does
feed_exp.py 文件源码 项目:news 作者: wsdookadr 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def fetch_all(feeds):
    """Fetch all feeds in batches and return a Deferred covering every batch.

    Each batch's gathered results are passed to ``store_fetched_data``;
    ``clean_up_and_exit`` is called once all batches have succeeded.
    """
    BATCH_SIZE = 5
    pending_batches = []
    for feeds_batch in batch_gen(feeds, BATCH_SIZE):
        # One semaphore token per feed in the batch.
        sem = DeferredSemaphore(len(feeds_batch))
        fetches = [sem.run(fetch_single, feed_meta=feed_)
                   for feed_ in feeds_batch]
        batch_deferred = gatherResults(fetches, consumeErrors=False)
        batch_deferred.addCallback(store_fetched_data)
        pending_batches.append(batch_deferred)

    # rendez-vous for all feeds that were fetched
    all_done = gatherResults(pending_batches, consumeErrors=False)
    all_done.addCallbacks(clean_up_and_exit, errback=lambda x: None)
    return all_done
bottle.py 文件源码 项目:Mmrz-Sync 作者: zhanglintc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def run(self, app): # pragma: no cover
        """Serve ``app`` forever with wsgiref's simple server.

        ``self.options`` may override the handler class ('handler_class')
        and the server class ('server_class').
        """
        from wsgiref.simple_server import (WSGIRequestHandler, WSGIServer,
                                           make_server)
        import socket

        class FixedHandler(WSGIRequestHandler):
            def address_string(self): # Prevent reverse DNS lookups please.
                return self.client_address[0]
            def log_request(*args, **kw):
                # ``self`` is the enclosing server adapter, captured by
                # closure; the handler instance arrives in ``args``.
                if self.quiet:
                    return None
                return WSGIRequestHandler.log_request(*args, **kw)

        handler_cls = self.options.get('handler_class', FixedHandler)
        server_cls = self.options.get('server_class', WSGIServer)

        # Fix wsgiref for IPv6 addresses.
        if ':' in self.host and server_cls.address_family == socket.AF_INET:
            class server_cls(server_cls):
                address_family = socket.AF_INET6

        make_server(self.host, self.port, app,
                    server_cls, handler_cls).serve_forever()
bottle.py 文件源码 项目:Mmrz-Sync 作者: zhanglintc 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run(self, handler): # pragma: no cover
        """Serve ``handler`` through fapws3 on ``self.host`` / ``self.port``."""
        import fapws._evwsgi as evwsgi
        from fapws import base, config

        port = self.port
        # fapws3 silently changed its API in 0.5: pass the port as a string.
        needs_str_port = float(config.SERVER_IDENT[-2:]) > 0.4
        if needs_str_port:
            port = str(port)
        evwsgi.start(self.host, port)

        # fapws3 never releases the GIL. Complain upstream. I tried. No luck.
        running_as_reload_child = 'BOTTLE_CHILD' in os.environ
        if running_as_reload_child and not self.quiet:
            _stderr("WARNING: Auto-reloading does not work with Fapws3.\n")
            _stderr("         (Fapws3 breaks python thread support)\n")
        evwsgi.set_base_module(base)

        def wsgi_entry(environ, start_response):
            # Flag the environ as single-process before delegating.
            environ['wsgi.multiprocess'] = False
            return handler(environ, start_response)

        evwsgi.wsgi_cb(('', wsgi_entry))
        evwsgi.run()
bottle.py 文件源码 项目:Mmrz-Sync 作者: zhanglintc 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(self):
        """Poll the lockfile and the loaded modules' files until one changes.

        A snapshot of modification times is taken for every file backing a
        module in ``sys.modules``. Then, while ``self.status`` is falsy, each
        pass:

        * sets ``self.status = 'error'`` and interrupts the main thread when
          the lockfile is missing or its mtime is older than
          ``self.interval + 5`` seconds;
        * sets ``self.status = 'reload'`` and interrupts the main thread when
          a tracked file has disappeared or has a newer mtime;
        * sleeps for ``self.interval`` seconds.
        """
        exists = os.path.exists
        mtime = lambda path: os.stat(path).st_mtime
        files = dict()

        for module in list(sys.modules.values()):
            # ``__file__`` may exist but be None (e.g. namespace packages on
            # Python 3); normalise to '' so the slice below cannot fail.
            path = getattr(module, '__file__', '') or ''
            # Track the source file rather than the compiled .pyc/.pyo.
            if path[-4:] in ('.pyo', '.pyc'): path = path[:-1]
            if path and exists(path): files[path] = mtime(path)

        while not self.status:
            if not exists(self.lockfile)\
            or mtime(self.lockfile) < time.time() - self.interval - 5:
                self.status = 'error'
                thread.interrupt_main()
            for path, lmtime in list(files.items()):
                if not exists(path) or mtime(path) > lmtime:
                    self.status = 'reload'
                    thread.interrupt_main()
                    break
            time.sleep(self.interval)
lightningrod.py 文件源码 项目:iotronic-lightning-rod 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def start(self):
        """Log the startup message and run the reactor."""
        LOG.info(" - starting Lightning-rod WAMP server...")
        reactor.run()

        # The string literal below is an inert expression statement: it
        # holds disabled unmount code and does nothing when executed.
        """
        # TEMPORARY ------------------------------------------------------
        from subprocess import call
        LOG.debug("Unmounting...")

        try:
            mountPoint = "/opt/BBB"
            # errorCode = self.libc.umount(mountPoint, None)
            errorCode = call(["umount", "-l", mountPoint])

            LOG.debug("Unmount " + mountPoint + " result: " + str(errorCode))

        except Exception as msg:
            result = "Unmounting error:", msg
            LOG.debug(result)
        # ------------------------------------------------------------------
        """
bottle.py 文件源码 项目:ynm3k 作者: socrateslee 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def run(self, app): # pragma: no cover
        """Serve ``app`` forever with wsgiref's simple server.

        ``self.options`` may override the handler class ('handler_class')
        and the server class ('server_class').
        """
        from wsgiref.simple_server import (WSGIRequestHandler, WSGIServer,
                                           make_server)
        import socket

        class FixedHandler(WSGIRequestHandler):
            def address_string(self): # Prevent reverse DNS lookups please.
                return self.client_address[0]
            def log_request(*args, **kw):
                # ``self`` is the enclosing server adapter, captured by
                # closure; the handler instance arrives in ``args``.
                if self.quiet:
                    return None
                return WSGIRequestHandler.log_request(*args, **kw)

        handler_cls = self.options.get('handler_class', FixedHandler)
        server_cls = self.options.get('server_class', WSGIServer)

        # Fix wsgiref for IPv6 addresses.
        if ':' in self.host and server_cls.address_family == socket.AF_INET:
            class server_cls(server_cls):
                address_family = socket.AF_INET6

        make_server(self.host, self.port, app,
                    server_cls, handler_cls).serve_forever()
bottle.py 文件源码 项目:ynm3k 作者: socrateslee 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run(self, handler): # pragma: no cover
        """Serve ``handler`` through fapws3 on ``self.host`` / ``self.port``."""
        import fapws._evwsgi as evwsgi
        from fapws import base, config

        port = self.port
        # fapws3 silently changed its API in 0.5: pass the port as a string.
        needs_str_port = float(config.SERVER_IDENT[-2:]) > 0.4
        if needs_str_port:
            port = str(port)
        evwsgi.start(self.host, port)

        # fapws3 never releases the GIL. Complain upstream. I tried. No luck.
        running_as_reload_child = 'BOTTLE_CHILD' in os.environ
        if running_as_reload_child and not self.quiet:
            _stderr("WARNING: Auto-reloading does not work with Fapws3.\n")
            _stderr("         (Fapws3 breaks python thread support)\n")
        evwsgi.set_base_module(base)

        def wsgi_entry(environ, start_response):
            # Flag the environ as single-process before delegating.
            environ['wsgi.multiprocess'] = False
            return handler(environ, start_response)

        evwsgi.wsgi_cb(('', wsgi_entry))
        evwsgi.run()
bottle.py 文件源码 项目:ynm3k 作者: socrateslee 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run(self):
        """Poll the lockfile and the loaded modules' files until one changes.

        A snapshot of modification times is taken for every file backing a
        module in ``sys.modules``. Then, while ``self.status`` is falsy, each
        pass:

        * sets ``self.status = 'error'`` and interrupts the main thread when
          the lockfile is missing or its mtime is older than
          ``self.interval + 5`` seconds;
        * sets ``self.status = 'reload'`` and interrupts the main thread when
          a tracked file has disappeared or has a newer mtime;
        * sleeps for ``self.interval`` seconds.
        """
        exists = os.path.exists
        mtime = lambda path: os.stat(path).st_mtime
        files = dict()

        for module in list(sys.modules.values()):
            # ``__file__`` may exist but be None (e.g. namespace packages on
            # Python 3); normalise to '' so the slice below cannot fail.
            path = getattr(module, '__file__', '') or ''
            # Track the source file rather than the compiled .pyc/.pyo.
            if path[-4:] in ('.pyo', '.pyc'): path = path[:-1]
            if path and exists(path): files[path] = mtime(path)

        while not self.status:
            if not exists(self.lockfile)\
            or mtime(self.lockfile) < time.time() - self.interval - 5:
                self.status = 'error'
                thread.interrupt_main()
            for path, lmtime in list(files.items()):
                if not exists(path) or mtime(path) > lmtime:
                    self.status = 'reload'
                    thread.interrupt_main()
                    break
            time.sleep(self.interval)


问题


面经


文章

微信
公众号

扫码关注公众号