def ResourceScript(path, registry):
    """
    Execute the Python file at C{path} and return its C{resource} global.

    The script runs with C{__file__}, C{resource}, C{registry}, C{cache} and
    C{recache} predefined in its namespace.  It must rebind C{resource} to an
    instance of (a subclass of) web.resource.Resource; that object will be
    rendered.

    If the script raises L{AlreadyCached}, the first argument of that
    exception is returned instead.  Otherwise the resulting resource is
    stored via C{registry.cachePath} when the scanner's C{doCache} flag is
    set and the script actually replaced the C{noRsrc} placeholder.
    """
    scanner = CacheScanner(path, registry)
    namespace = {
        '__file__': _coerceToFilesystemEncoding("", path),
        'resource': noRsrc,
        'registry': registry,
        'cache': scanner.cache,
        'recache': scanner.recache,
    }
    try:
        # The same dict serves as globals and locals of the script.
        execfile(path, namespace, namespace)
    except AlreadyCached as cached:
        return cached.args[0]
    produced = namespace['resource']
    if scanner.doCache and produced is not noRsrc:
        registry.cachePath(path, produced)
    return produced
# Example source code using the Python class Resource()
def test_sessionUIDGeneration(self):
    """
    L{site.getSession} generates L{Session} objects with distinct UIDs
    drawn from a secure source of entropy.
    """
    site = server.Site(resource.Resource())
    # Before stubbing it out, confirm that the site would really use the
    # unpredictable random source.
    self.assertIdentical(site._entropy, os.urandom)

    def fakeEntropy(length):
        # Each call yields `length` copies of the next code point, so the
        # resulting UIDs are fully predictable.
        fakeEntropy.calls += 1
        return (unichr(fakeEntropy.calls) * length).encode("charmap")

    fakeEntropy.calls = 0
    self.patch(site, "_entropy", fakeEntropy)

    first = self.getAutoExpiringSession(site)
    second = self.getAutoExpiringSession(site)
    self.assertEqual(first.uid, b"01" * 0x20)
    self.assertEqual(second.uid, b"02" * 0x20)

    # 'counter' is no longer used for session generation, but it has been a
    # public attribute for a very long time; anyone reading it for site
    # metrics should keep getting a sensible value.
    self.assertEqual(site.counter, 2)
def test_processingFailedNoTraceback(self):
    """
    When the site has C{displayTracebacks} set to C{False},
    L{Request.processingFailed} does not write out the failure and gives a
    generic error message instead.
    """
    channel = DummyChannel()
    request = server.Request(channel, 1)
    site = server.Site(resource.Resource())
    site.displayTracebacks = False
    request.site = site

    request.processingFailed(failure.Failure(Exception("Oh no!")))

    output = request.transport.written.getvalue()
    self.assertNotIn(b"Oh no!", output)
    self.assertIn(b"Processing Failed", output)

    # The exception was never "handled", so flush it from the log to keep
    # it from failing the test.
    self.assertEqual(1, len(self.flushLoggedErrors()))
def test_processingFailedDisplayTraceback(self):
    """
    When the site has C{displayTracebacks} set to C{True},
    L{Request.processingFailed} writes out the failure.
    """
    channel = DummyChannel()
    request = server.Request(channel, 1)
    site = server.Site(resource.Resource())
    site.displayTracebacks = True
    request.site = site

    request.processingFailed(failure.Failure(Exception("Oh no!")))

    self.assertIn(b"Oh no!", request.transport.written.getvalue())

    # The exception was never "handled", so flush it from the log to keep
    # it from failing the test.
    self.assertEqual(1, len(self.flushLoggedErrors()))
def test_sessionDifferentFromSecureSession(self):
    """
    L{Request.session} and L{Request.secure_session} should be two
    separate sessions with unique ids and different cookies.
    """
    channel = DummyChannel()
    channel.transport = DummyChannel.SSL()
    request = server.Request(channel, 1)
    request.site = server.Site(resource.Resource())
    request.sitepath = []

    # Over an SSL transport the default session is the secure one.
    secure = request.getSession()
    self.assertIsNotNone(secure)
    self.addCleanup(secure.expire)
    secureCookieName = request.cookies[0].split(b"=")[0]
    self.assertEqual(secureCookieName, b"TWISTED_SECURE_SESSION")

    # Explicitly asking for the non-secure session adds a second cookie.
    insecure = request.getSession(forceNotSecure=True)
    self.assertIsNotNone(insecure)
    insecureCookieName = request.cookies[1].split(b"=")[0]
    self.assertEqual(insecureCookieName, b"TWISTED_SESSION")
    self.addCleanup(insecure.expire)

    self.assertNotEqual(insecure.uid, secure.uid)
def test_sessionAttribute(self):
    """
    On a L{Request}, the C{session} attribute retrieves the associated
    L{Session} only if it has been initialized.  If the request is secure,
    it retrieves the secure session.
    """
    site = server.Site(resource.Resource())
    channel = DummyChannel()
    channel.transport = DummyChannel.SSL()
    request = server.Request(channel, 1)
    request.site = site
    request.sitepath = []

    # Nothing has been initialized yet.
    self.assertIsNone(request.session)

    # Creating only the non-secure session leaves the attribute unset on
    # this secure request.
    plainSession = request.getSession(forceNotSecure=True)
    self.addCleanup(plainSession.expire)
    self.assertIsNone(request.session)

    sslSession = request.getSession()
    self.addCleanup(sslSession.expire)
    self.assertIsNotNone(sslSession)
    self.assertIsNot(sslSession, plainSession)
    self.assertIs(request.session, sslSession)
def test_retrieveNonExistentSession(self):
    """
    L{Request.getSession} generates a new session when the incoming request
    carries a session cookie whose value matches no existing session.
    """
    site = server.Site(resource.Resource())
    channel = DummyChannel()
    request = server.Request(channel, 1)
    request.site = site
    request.sitepath = []
    request.received_cookies[b'TWISTED_SESSION'] = b"does-not-exist"

    created = request.getSession()
    self.assertIsNotNone(created)
    self.addCleanup(created.expire)

    outgoingCookie = request.cookies[0]
    self.assertTrue(outgoingCookie.startswith(b'TWISTED_SESSION='))
    # The outgoing cookie must carry a fresh session ID, not the bogus one.
    self.assertNotIn(b"does-not-exist", outgoingCookie)
def test_getChildWithDefaultAuthorized(self):
    """
    Resource traversal which encounters an L{HTTPAuthSessionWrapper}
    results in an L{IResource} which renders the L{IResource} avatar
    retrieved from the portal when the request has a valid
    I{Authorization} header.
    """
    self.credentialFactories.append(BasicCredentialFactory('example.com'))
    request = self.makeRequest([self.childName])
    child = self._authorizedBasicLogin(request)
    finished = request.notifyFinish()

    def checkWritten(_):
        self.assertEqual(request.written, [self.childContent])

    finished.addCallback(checkWritten)
    request.render(child)
    return finished
def _logoutTest(self):
    """
    Issue a request for an authentication-protected resource using valid
    credentials and then return the C{DummyRequest} instance which was
    used.

    This is a helper for tests about the behavior of the logout callback.
    """
    self.credentialFactories.append(BasicCredentialFactory('example.com'))

    class SlowerResource(Resource):
        # Rendering returns NOT_DONE_YET, so the request stays unfinished.
        def render(self, request):
            return NOT_DONE_YET

    self.avatar.putChild(self.childName, SlowerResource())
    pending = self.makeRequest([self.childName])
    protectedChild = self._authorizedBasicLogin(pending)
    pending.render(protectedChild)
    # The request is still in progress, so no logout may have happened yet.
    self.assertEqual(self.realm.loggedOut, 0)
    return pending
def test_anonymousAccess(self):
    """
    Anonymous requests are allowed if a L{Portal} has an anonymous checker
    registered.
    """
    unprotectedContents = b"contents of the unprotected child resource"
    anonymousRoot = Resource()
    self.avatars[ANONYMOUS] = anonymousRoot
    anonymousRoot.putChild(
        self.childName, Data(unprotectedContents, 'text/plain'))
    self.portal.registerChecker(AllowAnonymousAccess())
    self.credentialFactories.append(BasicCredentialFactory('example.com'))

    request = self.makeRequest([self.childName])
    child = getChildForRequest(self.wrapper, request)
    finished = request.notifyFinish()

    def checkWritten(_):
        self.assertEqual(request.written, [unprotectedContents])

    finished.addCallback(checkWritten)
    request.render(child)
    return finished
def test_render(self):
    """
    L{ResourceScriptDirectory.getChild} returns a resource which renders
    the content produced by the rpy's C{resource} global.
    """
    tmp = FilePath(self.mktemp())
    tmp.makedirs()
    # The literal below is itself Python source.  Its class and method
    # bodies must stay indented, and its top-level statements must start at
    # column 0, or the script cannot be executed.
    tmp.child("test.rpy").setContent(b"""
from twisted.web.resource import Resource
class TestResource(Resource):
    isLeaf = True
    def render_GET(self, request):
        return b'ok'
resource = TestResource()""")
    resource = ResourceScriptDirectory(tmp._asBytesPath())
    request = DummyRequest([b''])
    child = resource.getChild(b"test.rpy", request)
    d = _render(child, request)

    def cbRendered(ignored):
        self.assertEqual(b"".join(request.written), b"ok")

    d.addCallback(cbRendered)
    return d
def testDistrib(self):
    """
    A resource published through L{distrib.ResourcePublisher} can be
    fetched over HTTP via a L{distrib.ResourceSubscription} mounted on a
    second site.
    """
    # Publisher side: a site exposing "there", served over PB.
    publishedRoot = resource.Resource()
    publishedRoot.putChild("there", static.Data("root", "text/plain"))
    publisherSite = server.Site(publishedRoot)
    self.f1 = PBServerFactory(distrib.ResourcePublisher(publisherSite))
    self.port1 = reactor.listenTCP(0, self.f1)

    # Subscriber side: an HTTP site that mounts the subscription at "here".
    publisherPort = self.port1.getHost().port
    self.sub = distrib.ResourceSubscription("127.0.0.1", publisherPort)
    subscriberRoot = resource.Resource()
    subscriberRoot.putChild("here", self.sub)
    subscriberSite = MySite(subscriberRoot)
    self.port2 = reactor.listenTCP(0, subscriberSite)

    agent = client.Agent(reactor)
    url = "http://127.0.0.1:%d/here/there" % (self.port2.getHost().port,)
    fetched = agent.request(b"GET", url)
    fetched.addCallback(client.readBody)
    fetched.addCallback(self.assertEqual, 'root')
    return fetched
def test_requestHeaders(self):
    """
    The request headers are available on the request object passed to a
    distributed resource's C{render} method.
    """
    seenHeaders = {}

    class ReportRequestHeaders(resource.Resource):
        # Copies every raw request header into the enclosing dict.
        def render(self, request):
            rawHeaders = request.requestHeaders.getAllRawHeaders()
            seenHeaders.update(dict(rawHeaders))
            return ""

    pending = self._requestTest(
        ReportRequestHeaders(), headers=Headers({'foo': ['bar']}))

    def checkHeaders(_):
        self.assertEqual(seenHeaders['Foo'], ['bar'])

    pending.addCallback(checkHeaders)
    return pending
def test_requestResponseCode(self):
    """
    The response code can be set by the request object passed to a
    distributed resource's C{render} method.
    """
    class SetResponseCode(resource.Resource):
        def render(self, request):
            request.setResponseCode(200)
            return ""

    pending = self._requestAgentTest(SetResponseCode())

    def checkResponse(outcome):
        # outcome[0] exposes the body as `.data`; outcome[1] exposes the
        # status as `.code` and `.phrase`.
        body, response = outcome[0], outcome[1]
        self.assertEqual(body.data, "")
        self.assertEqual(response.code, 200)
        self.assertEqual(response.phrase, "OK")

    pending.addCallback(checkResponse)
    return pending
def test_requestResponseCodeMessage(self):
    """
    The response code and message can be set by the request object passed
    to a distributed resource's C{render} method.
    """
    class SetResponseCode(resource.Resource):
        def render(self, request):
            request.setResponseCode(200, "some-message")
            return ""

    pending = self._requestAgentTest(SetResponseCode())

    def checkResponse(outcome):
        # outcome[0] exposes the body as `.data`; outcome[1] exposes the
        # status as `.code` and `.phrase`.
        body, response = outcome[0], outcome[1]
        self.assertEqual(body.data, "")
        self.assertEqual(response.code, 200)
        self.assertEqual(response.phrase, "some-message")

    pending.addCallback(checkResponse)
    return pending
def alias(aliasPath, sourcePath):
    """
    I am not a very good aliaser. But I'm the best I can be. If I'm
    aliasing to a Resource that generates links, and it uses any parts
    of request.prepath to do so, the links will not be relative to the
    aliased path, but rather to the aliased-to path. That means I can't
    alias static.File directory listings that nicely. However, I can
    still be useful, as many resources will play nice.

    @param aliasPath: slash-separated path prefix to match against the
        start of C{request.postpath}; may be text or bytes.
    @param sourcePath: slash-separated path whose segments replace the
        matched prefix; may be text or bytes.
    @return: a one-argument callable which rewrites C{request.postpath}
        and C{request.path} in place when the prefix matches, and leaves
        the request untouched otherwise.
    """
    def slashFor(value):
        # Choose a separator of the same string type as the value, so that
        # bytes paths can be split and joined as well as text ones.
        return b'/' if isinstance(value, bytes) else u'/'

    separator = slashFor(aliasPath)
    sourcePath = sourcePath.split(slashFor(sourcePath))
    aliasPath = aliasPath.split(separator)

    def rewriter(request):
        if request.postpath[:len(aliasPath)] == aliasPath:
            after = request.postpath[len(aliasPath):]
            request.postpath = sourcePath + after
            request.path = separator + separator.join(
                request.prepath + request.postpath)

    return rewriter
def test_URLPath_traversedResource(self):
    """
    A resource returned from a branch route and reached through child
    traversal reports the full request URL from C{request.URLPath()}.
    """
    app = self.app
    request = requestMock(b'/resource/foo')
    # One-element list so the nested render() can hand its result out.
    seenUrl = [None]

    class URLPathResource(Resource):
        def render(self, request):
            seenUrl[0] = request.URLPath()

        def getChild(self, path, request):
            # Every child segment resolves to this same resource.
            return self

    @app.route("/resource/", branch=True)
    def root(request):
        return URLPathResource()

    d = _render(self.kr, request)
    self.assertFired(d)
    self.assertEqual(str(seenUrl[0]),
                     'http://localhost:8080/resource/foo')
def __init__(self,
             channel_layer,
             root_path='',
             http_timeout=120,
             websocket_timeout=None,
             ping_interval=20,
             ping_timeout=30,
             ws_protocols=None,
             start_scheduler=True,
             use_proxy_headers=False,
             use_proxy_proto_header=False,
             use_x_sendfile=False):
    """
    Store the server configuration and initialise the Twisted resource.

    :param channel_layer: object handed to ``ChannelLayerManager``; its
        optional ``group_expiry`` attribute supplies the default websocket
        timeout.
    :param root_path: stored unchanged as ``self.root_path``.
    :param http_timeout: stored unchanged as ``self.http_timeout``.
    :param websocket_timeout: stored as ``self.websocket_timeout``; a
        falsy value (``None``, ``0``) falls back to
        ``channel_layer.group_expiry``, or 86400 when that attribute is
        missing.
    :param ping_interval: stored unchanged as ``self.ping_interval``.
    :param ping_timeout: stored unchanged as ``self.ping_timeout``.
    :param ws_protocols: stored unchanged as ``self.ws_protocols``.
    :param start_scheduler: forwarded to ``ChannelLayerManager``.
    :param use_proxy_headers: stored unchanged.
    :param use_proxy_proto_header: stored unchanged.
    :param use_x_sendfile: stored unchanged.
    """
    # Forward the caller's choice; previously True was hard-coded here and
    # the start_scheduler argument was silently ignored.
    self.manager = ChannelLayerManager(
        channel_layer, start_scheduler=start_scheduler)
    self.root_path = root_path
    self.http_timeout = http_timeout
    # NOTE(review): `or` also replaces an explicit 0 with the fallback --
    # presumably intended, confirm with callers.
    self.websocket_timeout = websocket_timeout or getattr(channel_layer, "group_expiry", 86400)
    self.ping_interval = ping_interval
    self.ping_timeout = ping_timeout
    self.ws_protocols = ws_protocols
    self.use_proxy_headers = use_proxy_headers
    self.use_proxy_proto_header = use_proxy_proto_header
    self.use_x_sendfile = use_x_sendfile
    resource.Resource.__init__(self)
def _render(resource, request):
    # type: (Resource, Union[DummyRequest, Request]) -> Any
    """
    Simulate rendering of a Twisted resource.

    A bytes result is written to the request, which is then finished.  A
    ``NOT_DONE_YET`` result defers to the request's own completion.

    :param resource: Twisted Resource with render() method.
    :param request: Request (or mock object).
    :return: Deferred
    :raises ValueError: if render() returns anything other than bytes or
        ``NOT_DONE_YET``.
    """
    outcome = resource.render(request)

    if isinstance(outcome, bytes):
        request.write(outcome)
        request.finish()
        return succeed(None)

    if outcome == NOT_DONE_YET:
        # The resource may already have finished the request synchronously.
        return succeed(None) if request.finished else request.notifyFinish()

    raise ValueError("Unexpected return value: %r" % (outcome,))
def getResourceFor(self, request):
    # type: (Request) -> Resource
    """
    Check if a route matches this request. Fall back to Twisted default lookup behavior otherwise.

    On a match, ``request.path_args`` is set to the match's named groups,
    or to its positional groups when there are no named ones.

    :param request: Twisted request instance
    :return: Resource to handle this request
    """
    request.site = self
    # Only decode when there is something to match against, so a site
    # without routes behaves exactly as before.
    if self.routes:
        # The path does not change between routes; decode it once.
        path = request.path.decode()
        for route_re, resource in self.routes.items():
            match = route_re.match(path)
            if match:
                request.path_args = match.groupdict() or match.groups()
                return resource
    return Site.getResourceFor(self, request)