def start(self):
"""Start the Web Server """
self.site = Site(WSGIResource(reactor, reactor.getThreadPool(), self.app))
self.port = reactor.listenTCP(self.server.config.webport, self.site)
python类WSGIResource()的实例源码
def makeService(self, options):
config=valid_config()
s=MultiService()
from crondeamon.slave import service as subrpc
serverfactory = server.Site(subrpc.MainRpc())
slave_service=TCPServer(int(config["slaveport"]),serverfactory,interface=config["host"])
slave_service.setServiceParent(s)
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "cap.settings")
from django.core.handlers.wsgi import WSGIHandler
application = WSGIHandler()
resource = WSGIResource(reactor, reactor.getThreadPool(), application)
ui_service=TCPServer(int(config["uiport"]),server.Site(resource),interface=config["host"])
ui_service.setServiceParent(s)
return s
def twisted(app, address, **options):
from twisted.web import server, wsgi
from twisted.python.threadpool import ThreadPool
from twisted.internet import reactor
thread_pool = ThreadPool()
thread_pool.start()
reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, app))
reactor.listenTCP(address[1], factory, interface=address[0])
reactor.run()
def run(self, handler):
from twisted.web import server, wsgi
from twisted.python.threadpool import ThreadPool
from twisted.internet import reactor
thread_pool = ThreadPool()
thread_pool.start()
reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
reactor.listenTCP(self.port, factory, interface=self.host)
reactor.run()
def run(self, handler):
from twisted.web import server, wsgi
from twisted.python.threadpool import ThreadPool
from twisted.internet import reactor
thread_pool = ThreadPool()
thread_pool.start()
reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
reactor.listenTCP(self.port, factory, interface=self.host)
reactor.run()
def run(self, handler):
from twisted.web import server, wsgi
from twisted.python.threadpool import ThreadPool
from twisted.internet import reactor
thread_pool = ThreadPool()
thread_pool.start()
reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
reactor.listenTCP(self.port, factory, interface=self.host)
if not reactor.running:
reactor.run()
def twisted(app, address, **options):
from twisted.web import server, wsgi
from twisted.python.threadpool import ThreadPool
from twisted.internet import reactor
thread_pool = ThreadPool()
thread_pool.start()
reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, app))
reactor.listenTCP(address[1], factory, interface=address[0])
reactor.run()
def run(self, handler):
from twisted.web import server, wsgi
from twisted.python.threadpool import ThreadPool
from twisted.internet import reactor
thread_pool = ThreadPool()
thread_pool.start()
reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
reactor.listenTCP(self.port, factory, interface=self.host)
if not reactor.running:
reactor.run()
def run(self, handler):
from twisted.web import server, wsgi
from twisted.python.threadpool import ThreadPool
from twisted.internet import reactor
thread_pool = ThreadPool()
thread_pool.start()
reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
reactor.listenTCP(self.port, factory, interface=self.host)
reactor.run()
def run(self, handler):
from twisted.web import server, wsgi
from twisted.python.threadpool import ThreadPool
from twisted.internet import reactor
thread_pool = ThreadPool()
thread_pool.start()
reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
reactor.listenTCP(self.port, factory, interface=self.host)
if not reactor.running:
reactor.run()
def run(self, handler):
from twisted.web import server, wsgi
from twisted.python.threadpool import ThreadPool
from twisted.internet import reactor
thread_pool = ThreadPool()
thread_pool.start()
reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
reactor.listenTCP(self.port, factory, interface=self.host)
reactor.run()
def run(self, handler):
from twisted.web import server, wsgi
from twisted.python.threadpool import ThreadPool
from twisted.internet import reactor
thread_pool = ThreadPool()
thread_pool.start()
reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
reactor.listenTCP(self.port, factory, interface=self.host)
if not reactor.running:
reactor.run()
def opt_wsgi(self, name):
"""
The FQPN of a WSGI application object to serve as the root resource of
the webserver.
"""
try:
application = reflect.namedAny(name)
except (AttributeError, ValueError):
raise usage.UsageError("No such WSGI application: %r" % (name,))
pool = threadpool.ThreadPool()
reactor.callWhenRunning(pool.start)
reactor.addSystemEventTrigger('after', 'shutdown', pool.stop)
self['root'] = wsgi.WSGIResource(reactor, pool, application)
def setUp(self):
"""
Create a L{WSGIResource} with synchronous threading objects and a no-op
application object. This is useful for testing certain things about
the resource implementation which are unrelated to WSGI.
"""
self.resource = WSGIResource(
SynchronousReactorThreads(), SynchronousThreadPool(),
lambda environ, startResponse: None)
def test_interfaces(self):
"""
L{WSGIResource} implements L{IResource} and stops resource traversal.
"""
verifyObject(IResource, self.resource)
self.assertTrue(self.resource.isLeaf)
def test_unsupported(self):
"""
A L{WSGIResource} cannot have L{IResource} children. Its
C{getChildWithDefault} and C{putChild} methods raise L{RuntimeError}.
"""
self.assertRaises(
RuntimeError,
self.resource.getChildWithDefault,
b"foo", Request(DummyChannel(), False))
self.assertRaises(
RuntimeError,
self.resource.putChild,
b"foo", Resource())
def test_environIsDict(self):
"""
L{WSGIResource} calls the application object with an C{environ}
parameter which is exactly of type C{dict}.
"""
d = self.render('GET', '1.1', [], [''])
def cbRendered(result):
environ, startResponse = result
self.assertIdentical(type(environ), dict)
# Environment keys are always native strings.
for name in environ:
self.assertIsInstance(name, str)
d.addCallback(cbRendered)
return d
def run(port, db_uri, hsts):
cert_db = CertificateDatabase(db_uri)
crtsh_checker = CrtshChecker()
app = raw_app = WSGIApplication(cert_db, crtsh_checker)
if hsts:
app = wsgi_sslify.sslify(app, subdomains=True)
def build_service(reactor):
multi = MultiService()
StreamServerEndpointService(
TCP4ServerEndpoint(reactor, port),
server.Site(
wsgi.WSGIResource(reactor, reactor.getThreadPool(), app),
)
).setServiceParent(multi)
logger = Logger()
TimerService(
# Run every 10 minutes
10 * 60,
lambda: deferToThread(
check_for_revocation, cert_db, crtsh_checker
).addErrback(
lambda f: logger.failure("Error checking for revocation", f)
)
).setServiceParent(multi)
TimerService(
60 * 60,
lambda: deferToThread(
raw_app._update_lint_summaries
).addErrback(
lambda f: logger.failure("Error updating cablint summaries", f)
)
).setServiceParent(multi)
return multi
run_service(build_service)
def run(self, handler):
from twisted.web import server, wsgi
from twisted.python.threadpool import ThreadPool
from twisted.internet import reactor
thread_pool = ThreadPool()
thread_pool.start()
reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
reactor.listenTCP(self.port, factory, interface=self.host)
reactor.run()
def run(self, handler):
from twisted.web import server, wsgi
from twisted.python.threadpool import ThreadPool
from twisted.internet import reactor
thread_pool = ThreadPool()
thread_pool.start()
reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
reactor.listenTCP(self.port, factory, interface=self.host)
reactor.run()