def getResourceFor(self, request):
    """
    Look up the resource that should handle C{request}.

    The request is tagged with this site and with a copy of its current
    C{prepath}; the resource hierarchy is then walked from C{self.resource}
    by L{resource.getChildForRequest}, which calls C{getChildWithDefault}
    for each path element until it reaches a resource whose C{isLeaf} is
    true.  Both the requested path and the resolved resource are logged.

    @param request: the request being dispatched.
    @return: the resource returned by L{resource.getChildForRequest}.
    """
    log.msg("Request for resource at %s" % request.path)
    request.site = self
    # The sitepath copy is what distinguishes cookie names between
    # distributed servers and disconnected sites.
    prepathSnapshot = copy.copy(request.prepath)
    request.sitepath = prepathSnapshot
    resolved = resource.getChildForRequest(self.resource, request)
    log.msg(resolved)
    return resolved
# Example source code for Python's getChildForRequest()
def test_notFound(self):
    """
    When traversal reaches a L{File} and the remaining final segment does
    not name any file under the path that L{File} was created with, the
    response is a not found response.
    """
    root = FilePath(self.mktemp())
    root.makedirs()
    fileResource = static.File(root.path)
    request = DummyRequest([b'foobar'])
    child = resource.getChildForRequest(fileResource, request)

    def checkResponseCode(ignored):
        self.assertEqual(request.responseCode, 404)

    rendered = self._render(child, request)
    rendered.addCallback(checkResponseCode)
    return rendered
def test_securityViolationNotFound(self):
    """
    When traversal reaches a L{File} and the remaining final segment may
    not be looked up in the filesystem for security reasons (here C{..}),
    the response is a not found response.
    """
    root = FilePath(self.mktemp())
    root.makedirs()
    fileResource = static.File(root.path)
    request = DummyRequest([b'..'])
    child = resource.getChildForRequest(fileResource, request)

    def checkResponseCode(ignored):
        self.assertEqual(request.responseCode, 404)

    rendered = self._render(child, request)
    rendered.addCallback(checkResponseCode)
    return rendered
def test_indexNames(self):
    """
    When traversal reaches a L{File} with only a final empty segment left,
    a file named in that L{File}'s C{indexNames} list which exists under
    the path the L{File} was created with is served as the response.
    """
    root = FilePath(self.mktemp())
    root.makedirs()
    root.child("foo.bar").setContent(b"baz")
    fileResource = static.File(root.path)
    fileResource.indexNames = [b'foo.bar']
    request = DummyRequest([b''])
    child = resource.getChildForRequest(fileResource, request)

    def checkBodyAndLength(ignored):
        body = b''.join(request.written)
        self.assertEqual(body, b'baz')
        lengthHeaders = request.responseHeaders.getRawHeaders(
            b'content-length')
        self.assertEqual(lengthHeaders[0], b'3')

    rendered = self._render(child, request)
    rendered.addCallback(checkBodyAndLength)
    return rendered
def test_ignoredExtensionsIgnored(self):
    """
    Requesting the I{base} child of a L{File} yields the resource for the
    I{base<extension>} file under the L{File}'s path, provided that file
    exists and the L{File} was configured to ignore I{<extension>}.
    """
    root = FilePath(self.mktemp())
    root.makedirs()
    # Two candidates share the "foo" stem; only ".bar" is an ignored
    # extension.
    fixtures = (('foo.bar', b'baz'), ('foo.quux', b'foobar'))
    for name, content in fixtures:
        root.child(name).setContent(content)
    fileResource = static.File(root.path, ignoredExts=(b".bar",))
    request = DummyRequest([b"foo"])
    child = resource.getChildForRequest(fileResource, request)

    def checkBody(ignored):
        body = b''.join(request.written)
        self.assertEqual(body, b'baz')

    rendered = self._render(child, request)
    rendered.addCallback(checkBody)
    return rendered
def test_directoryWithoutTrailingSlashRedirects(self):
    """
    L{File} answers a request for a directory path that lacks a trailing
    slash with a redirect to the same URL with the slash added.
    """
    root = FilePath(self.mktemp())
    root.makedirs()
    root.child('folder').makedirs()
    fileResource = static.File(root.path)
    request = DummyRequest([b"folder"])
    request.uri = b"http://dummy/folder#baz?foo=bar"
    child = resource.getChildForRequest(fileResource, request)
    self.successResultOf(self._render(child, request))
    self.assertEqual(request.responseCode, FOUND)
    # The slash is expected directly after the path, ahead of the
    # "#baz?foo=bar" suffix.
    locations = request.responseHeaders.getRawHeaders(b"location")
    self.assertEqual(locations, [b"http://dummy/folder/#baz?foo=bar"])
def _invalidAuthorizationTest(self, response):
    """
    Build a request whose I{Authorization} header carries C{response},
    traverse resources with it starting from C{self.wrapper}, and assert
    that the response code ends up being 401.

    @param response: the raw value to use for the I{Authorization} header.
    @return: a L{Deferred} which fires once the request has finished and
        the assertion has run.
    """
    self.credentialFactories.append(BasicCredentialFactory('example.com'))
    request = self.makeRequest([self.childName])
    request.requestHeaders.addRawHeader(b'authorization', response)
    child = getChildForRequest(self.wrapper, request)

    def checkUnauthorized(result):
        self.assertEqual(request.responseCode, 401)

    # Subscribe to completion before rendering.
    finished = request.notifyFinish()
    finished.addCallback(checkUnauthorized)
    request.render(child)
    return finished
def test_unexpectedDecodeError(self):
    """
    An unexpected exception raised from a credential factory's C{decode}
    method produces a 500 response code, and the exception is logged.
    """
    class UnexpectedException(Exception):
        pass

    class BadFactory(object):
        # Matches the scheme of the "Bad abc" authorization header below.
        scheme = b'bad'

        def getChallenge(self, client):
            return {}

        def decode(self, response, request):
            raise UnexpectedException()

    self.credentialFactories.append(BadFactory())
    request = self.makeRequest([self.childName])
    request.requestHeaders.addRawHeader(b'authorization', b'Bad abc')
    child = getChildForRequest(self.wrapper, request)
    request.render(child)
    self.assertEqual(request.responseCode, 500)
    loggedErrors = self.flushLoggedErrors(UnexpectedException)
    self.assertEqual(len(loggedErrors), 1)
def test_anonymousAccess(self):
    """
    A request without credentials is allowed through when the L{Portal}
    has an anonymous checker registered.
    """
    unprotectedContents = b"contents of the unprotected child resource"
    self.avatars[ANONYMOUS] = Resource()
    unprotectedChild = Data(unprotectedContents, 'text/plain')
    self.avatars[ANONYMOUS].putChild(self.childName, unprotectedChild)
    self.portal.registerChecker(AllowAnonymousAccess())
    self.credentialFactories.append(BasicCredentialFactory('example.com'))
    request = self.makeRequest([self.childName])
    child = getChildForRequest(self.wrapper, request)

    def checkContents(ignored):
        self.assertEqual(request.written, [unprotectedContents])

    # Subscribe to completion before rendering.
    finished = request.notifyFinish()
    finished.addCallback(checkContents)
    request.render(child)
    return finished
def process(self):
"""Process an incoming request to the Farfetchd server.

Sets the Content-Type, Server and Date response headers, resets
``prepath`` and rebuilds ``postpath`` from ``self.path``, resolves the
target resource through ``self.site.getResourceFor`` and renders it with
``self.render``.  Any exception raised inside the ``try`` body is wrapped
in a ``failure.Failure`` and handed to ``self.processingFailed``.

NOTE(review): the indentation of this method was lost in this copy of the
file, so the nesting of the ``if``/``else`` branches and of the trailing
``finish()`` guard cannot be determined from here -- confirm against the
upstream source before changing any logic.
"""
# Get site from channel
self.site = self.channel.site
# Set various default headers
self.setHeader(b"Content-Type", b"application/vnd.api+json")
# NOTE(review): the header name is bytes but the value is a native str --
# confirm setHeader accepts the mix on the targeted Twisted/Python versions.
self.setHeader(b"Server", "Farfetchd v%s" % FARFETCHD_API_VERSION)
self.setHeader(b"Date", http.datetimeToString())
# Resource Identification
self.prepath = []
# Skip the first byte of the path (presumably the leading "/"), split the
# rest on "/" and unquote every segment.
self.postpath = list(map(http.unquote, self.path[1:].split(b'/')))
log.msg("postpath is %s" % self.postpath)
log.msg("self.resource is %s" % self.resource)
#requested_resource = self.resource.getChildForRequest(self)
# NOTE(review): this result is overwritten by the getResourceFor() call
# inside the try block.  If getChildForRequest moves segments from postpath
# to prepath while traversing (as Twisted's implementation does -- confirm),
# the second lookup starts from an already-consumed path.  This call also
# runs outside the try, so an exception here bypasses processingFailed.
requested_resource = resource.getChildForRequest(self.resource, self)
try:
requested_resource = self.site.getResourceFor(self)
#self.render(requested_resource)
log.msg("Requested resource is %s" % requested_resource)
# NOTE(review): listEntities() is called before the truthiness check below,
# so a None/falsy resource would raise here and be reported through
# processingFailed instead of reaching the NOT_FOUND branch.
log.msg("Requested resource entities are %s" % requested_resource.listEntities())
if requested_resource:
if requested_resource.responseType:
log.msg("Request will be handled by %r" % requested_resource.__class__.__name__)
self.checkRequestHeaders()
#requested_resource.render(self)
self.render(requested_resource)
else:
# Respond with 404 and a short plain body.
self.setResponseCode(http.NOT_FOUND)
self.write(b"No such resource")
# Bare except: every exception type is converted to a Failure and reported.
except:
self.processingFailed(failure.Failure())
# Finish the request only if it has not been finished already.
if not self.finished:
self.finish()
def testDeferredResource(self):
    """
    Traversing C{['foo', 'bar', 'baz']} from an L{SDResource} built around
    a leaf resource leaves C{['bar', 'baz']} in the request's C{postpath}.
    """
    leaf = resource.Resource()
    leaf.isLeaf = 1
    wrapped = SDResource(leaf)
    request = DummyRequest(['foo', 'bar', 'baz'])
    resource.getChildForRequest(wrapped, request)
    self.assertEqual(request.postpath, ['bar', 'baz'])
def testDeferredResource(self):
    """
    After traversal from an L{SDResource} wrapping a leaf resource, only
    the first of the three requested segments has been taken off
    C{postpath}.
    """
    leafResource = resource.Resource()
    leafResource.isLeaf = 1
    root = SDResource(leafResource)
    dummy = DummyRequest(['foo', 'bar', 'baz'])
    resource.getChildForRequest(root, dummy)
    remaining = dummy.postpath
    self.assertEqual(remaining, ['bar', 'baz'])
def testDeferredResource(self):
    """
    L{resource.getChildForRequest}, started at an L{SDResource} around a
    leaf resource, leaves the segments C{'bar'} and C{'baz'} unconsumed.
    """
    inner = resource.Resource()
    inner.isLeaf = 1
    outer = SDResource(inner)
    req = DummyRequest(['foo', 'bar', 'baz'])
    resource.getChildForRequest(outer, req)
    self.assertEqual(req.postpath, ['bar', 'baz'])
def test_emptyChild(self):
    """
    For a L{File} that corresponds to a filesystem directory, the C{''}
    child is a L{DirectoryLister} for that same path.
    """
    root = FilePath(self.mktemp())
    root.makedirs()
    fileResource = static.File(root.path)
    request = DummyRequest([b''])
    lister = resource.getChildForRequest(fileResource, request)
    self.assertIsInstance(lister, static.DirectoryLister)
    self.assertEqual(lister.path, root.path)
def test_processors(self):
    """
    When traversal reaches a L{File} and the final segment names a file
    whose extension appears in the L{File}'s C{processors} mapping, the
    processor registered for that extension serves the response.
    """
    root = FilePath(self.mktemp())
    root.makedirs()
    scriptSource = (
        b"from twisted.web.static import Data\n"
        b"resource = Data(b'dynamic world', 'text/plain')\n")
    root.child("foo.bar").setContent(scriptSource)
    fileResource = static.File(root.path)
    fileResource.processors = {b'.bar': script.ResourceScript}
    request = DummyRequest([b"foo.bar"])
    child = resource.getChildForRequest(fileResource, request)

    def checkBodyAndLength(ignored):
        body = b''.join(request.written)
        self.assertEqual(body, b'dynamic world')
        lengthHeaders = request.responseHeaders.getRawHeaders(
            b'content-length')
        self.assertEqual(lengthHeaders[0], b'13')

    rendered = self._render(child, request)
    rendered.addCallback(checkBodyAndLength)
    return rendered
def test_childrenNotFound(self):
    """
    Rendering any child resource of a L{static.DirectoryLister} gives an
    HTTP I{NOT FOUND} response code.
    """
    directory = FilePath(self.mktemp())
    directory.makedirs()
    lister = static.DirectoryLister(directory.path)
    request = self._request(b'')
    child = resource.getChildForRequest(lister, request)

    def checkNotFound(ignored):
        self.assertEqual(request.responseCode, http.NOT_FOUND)

    rendered = _render(child, request)
    rendered.addCallback(checkNotFound)
    return rendered
def _authorizedBasicLogin(self, request):
    """
    Attach an I{Authorization} header for the I{Basic} scheme, built from
    C{self.username} and C{self.password}, to C{request}, then dispatch
    the request starting at C{self.wrapper}.

    @param request: the request to add the header to and traverse with.
    @return: the L{IResource} that traversal produces.
    """
    credentials = self.username + b':' + self.password
    headerValue = b'Basic ' + b64encode(credentials)
    request.requestHeaders.addRawHeader(b'authorization', headerValue)
    return getChildForRequest(self.wrapper, request)
def test_getChallengeCalledWithRequest(self):
    """
    When L{HTTPAuthSessionWrapper} picks an L{ICredentialFactory} to issue
    a challenge, the factory's C{getChallenge} method receives the request
    as its argument.
    """
    @implementer(ICredentialFactory)
    class DumbCredentialFactory(object):
        scheme = b'dumb'

        def __init__(self):
            # Every request passed to getChallenge, in call order.
            self.requests = []

        def getChallenge(self, request):
            self.requests.append(request)
            return {}

    factory = DumbCredentialFactory()
    self.credentialFactories.append(factory)
    request = self.makeRequest([self.childName])
    child = getChildForRequest(self.wrapper, request)

    def checkRecordedRequests(ignored):
        self.assertEqual(factory.requests, [request])

    # Subscribe to completion before rendering.
    finished = request.notifyFinish()
    finished.addCallback(checkRecordedRequests)
    request.render(child)
    return finished
def test_decodeRaises(self):
    """
    Resource traversal which encounters an L{HTTPAuthSessionWrapper}
    yields an L{UnauthorizedResource} when the request's I{Basic
    Authorization} header cannot be decoded using base64.
    """
    self.credentialFactories.append(BasicCredentialFactory('example.com'))
    request = self.makeRequest([self.childName])
    undecodable = b'Basic decode should fail'
    request.requestHeaders.addRawHeader(b'authorization', undecodable)
    result = getChildForRequest(self.wrapper, request)
    self.assertIsInstance(result, UnauthorizedResource)
def getResourceFor(self, request):
    """
    Resolve the resource that should handle C{request}.

    The request is tagged with this site and with a copy of its current
    C{prepath}; the hierarchy is then walked from C{self.resource},
    calling C{getChildWithDefault} for each path element until a resource
    whose C{isLeaf} is true is reached.

    @param request: the request being dispatched.
    @return: the resource returned by L{resource.getChildForRequest}.
    """
    request.site = self
    # The sitepath copy is what distinguishes cookie names between
    # distributed servers and disconnected sites.
    prepathSnapshot = copy.copy(request.prepath)
    request.sitepath = prepathSnapshot
    resolved = resource.getChildForRequest(self.resource, request)
    return resolved
# IProtocolNegotiationFactory
def test_greeting_trailing_slash(self):
    """
    The Greeting resource is reachable as the direct child of the TxDarn
    resource, i.e. when the request path ends with a trailing slash.
    """
    # A trailing slash shows up as a single empty segment in postpath.
    self.request.postpath = [b'']
    found = getChildForRequest(self.txdarn, self.request)
    self.assertIs(found, self.txdarn.greeting)
def test_info(self):
    """
    The Info resource is reachable as the "info" child of the TxDarn
    resource.
    """
    self.request.postpath = [b'info']
    found = getChildForRequest(self.txdarn, self.request)
    self.assertIs(found, self.txdarn.info)