python类WSGIResource()的实例源码

webserver.py 文件源码 项目:floranet 作者: Fluent-networks 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def start(self):
        """Start the Web Server """
        self.site = Site(WSGIResource(reactor, reactor.getThreadPool(), self.app))
        self.port = reactor.listenTCP(self.server.config.webport, self.site)
main_plugin.py 文件源码 项目:crondeamon 作者: zhoukunpeng504 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def makeService(self, options):
        config=valid_config()
        s=MultiService()
        from  crondeamon.slave import  service as subrpc
        serverfactory = server.Site(subrpc.MainRpc())
        slave_service=TCPServer(int(config["slaveport"]),serverfactory,interface=config["host"])
        slave_service.setServiceParent(s)
        os.environ.setdefault("DJANGO_SETTINGS_MODULE", "cap.settings")
        from django.core.handlers.wsgi import WSGIHandler
        application = WSGIHandler()
        resource = WSGIResource(reactor, reactor.getThreadPool(), application)
        ui_service=TCPServer(int(config["uiport"]),server.Site(resource),interface=config["host"])
        ui_service.setServiceParent(s)
        return  s
anyserver.py 文件源码 项目:web3py 作者: web2py 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def twisted(app, address, **options):
        from twisted.web import server, wsgi
        from twisted.python.threadpool import ThreadPool
        from twisted.internet import reactor
        thread_pool = ThreadPool()
        thread_pool.start()
        reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
        factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, app))
        reactor.listenTCP(address[1], factory, interface=address[0])
        reactor.run()
bottle.py 文件源码 项目:Pardus-Bulut 作者: ferhatacikalin 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run(self, handler):
        from twisted.web import server, wsgi
        from twisted.python.threadpool import ThreadPool
        from twisted.internet import reactor
        thread_pool = ThreadPool()
        thread_pool.start()
        reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
        factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
        reactor.listenTCP(self.port, factory, interface=self.host)
        reactor.run()
bottle.py 文件源码 项目:Eagle 作者: magerx 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def run(self, handler):
        from twisted.web import server, wsgi
        from twisted.python.threadpool import ThreadPool
        from twisted.internet import reactor
        thread_pool = ThreadPool()
        thread_pool.start()
        reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
        factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
        reactor.listenTCP(self.port, factory, interface=self.host)
        reactor.run()
bottle.py 文件源码 项目:base1k 作者: gumblex 项目源码 文件源码 阅读 13 收藏 0 点赞 0 评论 0
def run(self, handler):
        from twisted.web import server, wsgi
        from twisted.python.threadpool import ThreadPool
        from twisted.internet import reactor
        thread_pool = ThreadPool()
        thread_pool.start()
        reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
        factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
        reactor.listenTCP(self.port, factory, interface=self.host)
        if not reactor.running:
            reactor.run()
anyserver.py 文件源码 项目:slugiot-client 作者: slugiot 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def twisted(app, address, **options):
        from twisted.web import server, wsgi
        from twisted.python.threadpool import ThreadPool
        from twisted.internet import reactor
        thread_pool = ThreadPool()
        thread_pool.start()
        reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
        factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, app))
        reactor.listenTCP(address[1], factory, interface=address[0])
        reactor.run()
bottle.py 文件源码 项目:Helix 作者: 3lackrush 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def run(self, handler):
        from twisted.web import server, wsgi
        from twisted.python.threadpool import ThreadPool
        from twisted.internet import reactor
        thread_pool = ThreadPool()
        thread_pool.start()
        reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
        factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
        reactor.listenTCP(self.port, factory, interface=self.host)
        if not reactor.running:
            reactor.run()
bottle.py 文件源码 项目:installations-sportives-pdl 作者: sebprunier 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def run(self, handler):
        from twisted.web import server, wsgi
        from twisted.python.threadpool import ThreadPool
        from twisted.internet import reactor
        thread_pool = ThreadPool()
        thread_pool.start()
        reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
        factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
        reactor.listenTCP(self.port, factory, interface=self.host)
        reactor.run()
bottle.py 文件源码 项目:bigbottle 作者: opendiploma 项目源码 文件源码 阅读 12 收藏 0 点赞 0 评论 0
def run(self, handler):
        from twisted.web import server, wsgi
        from twisted.python.threadpool import ThreadPool
        from twisted.internet import reactor
        thread_pool = ThreadPool()
        thread_pool.start()
        reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
        factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
        reactor.listenTCP(self.port, factory, interface=self.host)
        if not reactor.running:
            reactor.run()
bottle.py 文件源码 项目:autoscan 作者: b01u 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run(self, handler):
        from twisted.web import server, wsgi
        from twisted.python.threadpool import ThreadPool
        from twisted.internet import reactor
        thread_pool = ThreadPool()
        thread_pool.start()
        reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
        factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
        reactor.listenTCP(self.port, factory, interface=self.host)
        reactor.run()
bottle.py 文件源码 项目:web-nlp-interface 作者: kanjirz50 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def run(self, handler):
        from twisted.web import server, wsgi
        from twisted.python.threadpool import ThreadPool
        from twisted.internet import reactor
        thread_pool = ThreadPool()
        thread_pool.start()
        reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
        factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
        reactor.listenTCP(self.port, factory, interface=self.host)
        if not reactor.running:
            reactor.run()
tap.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def opt_wsgi(self, name):
        """
        The FQPN of a WSGI application object to serve as the root resource of
        the webserver.
        """
        try:
            application = reflect.namedAny(name)
        except (AttributeError, ValueError):
            raise usage.UsageError("No such WSGI application: %r" % (name,))
        pool = threadpool.ThreadPool()
        reactor.callWhenRunning(pool.start)
        reactor.addSystemEventTrigger('after', 'shutdown', pool.stop)
        self['root'] = wsgi.WSGIResource(reactor, pool, application)
test_wsgi.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def setUp(self):
        """
        Create a L{WSGIResource} with synchronous threading objects and a no-op
        application object.  This is useful for testing certain things about
        the resource implementation which are unrelated to WSGI.
        """
        self.resource = WSGIResource(
            SynchronousReactorThreads(), SynchronousThreadPool(),
            lambda environ, startResponse: None)
test_wsgi.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_interfaces(self):
        """
        L{WSGIResource} implements L{IResource} and stops resource traversal.
        """
        verifyObject(IResource, self.resource)
        self.assertTrue(self.resource.isLeaf)
test_wsgi.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_unsupported(self):
        """
        A L{WSGIResource} cannot have L{IResource} children.  Its
        C{getChildWithDefault} and C{putChild} methods raise L{RuntimeError}.
        """
        self.assertRaises(
            RuntimeError,
            self.resource.getChildWithDefault,
            b"foo", Request(DummyChannel(), False))
        self.assertRaises(
            RuntimeError,
            self.resource.putChild,
            b"foo", Resource())
test_wsgi.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_environIsDict(self):
        """
        L{WSGIResource} calls the application object with an C{environ}
        parameter which is exactly of type C{dict}.
        """
        d = self.render('GET', '1.1', [], [''])
        def cbRendered(result):
            environ, startResponse = result
            self.assertIdentical(type(environ), dict)
            # Environment keys are always native strings.
            for name in environ:
                self.assertIsInstance(name, str)
        d.addCallback(cbRendered)
        return d
tracker.py 文件源码 项目:revocation-tracker 作者: alex 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run(port, db_uri, hsts):
    cert_db = CertificateDatabase(db_uri)
    crtsh_checker = CrtshChecker()
    app = raw_app = WSGIApplication(cert_db, crtsh_checker)
    if hsts:
        app = wsgi_sslify.sslify(app, subdomains=True)

    def build_service(reactor):
        multi = MultiService()
        StreamServerEndpointService(
            TCP4ServerEndpoint(reactor, port),
            server.Site(
                wsgi.WSGIResource(reactor, reactor.getThreadPool(), app),
            )
        ).setServiceParent(multi)

        logger = Logger()
        TimerService(
            # Run every 10 minutes
            10 * 60,
            lambda: deferToThread(
                check_for_revocation, cert_db, crtsh_checker
            ).addErrback(
                lambda f: logger.failure("Error checking for revocation", f)
            )
        ).setServiceParent(multi)

        TimerService(
            60 * 60,
            lambda: deferToThread(
                raw_app._update_lint_summaries
            ).addErrback(
                lambda f: logger.failure("Error updating cablint summaries", f)
            )
        ).setServiceParent(multi)
        return multi

    run_service(build_service)
bottle.py 文件源码 项目:props 作者: gabrielStanovsky 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run(self, handler):
        from twisted.web import server, wsgi
        from twisted.python.threadpool import ThreadPool
        from twisted.internet import reactor
        thread_pool = ThreadPool()
        thread_pool.start()
        reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
        factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
        reactor.listenTCP(self.port, factory, interface=self.host)
        reactor.run()
bottle.py 文件源码 项目:props 作者: gabrielStanovsky 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run(self, handler):
        from twisted.web import server, wsgi
        from twisted.python.threadpool import ThreadPool
        from twisted.internet import reactor
        thread_pool = ThreadPool()
        thread_pool.start()
        reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
        factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
        reactor.listenTCP(self.port, factory, interface=self.host)
        reactor.run()


问题


面经


文章

微信
公众号

扫码关注公众号