def getChild(self, name, request):
"""
Dispatch a specific incoming request to an appropriate resource
"""
# First try: static resource
static = self.static.handle_static(request)
if static:
return static
# If that doesn't serve the request, try the plugin dynamic path
if request.path in self.path_map:
print 'using plugin %s for %s' % (self.path_map[request.path], request.path)
cfg = self.cfg.copy()
cfg['tab_map'] = self.tab_map
for arg in request.args:
if arg not in cfg:
cfg[arg] = request.args[arg][0]
# Augment the request with our own session data
request.sdata = session.get_data(request)
return self.path_map[request.path](cfg)
# Nothing else to try
print 'Failed to match path', request.path, 'to any plugins', self.path_map
return NoResource()
python类NoResource()的实例源码
def getChild(self, name, request):
out = Resource.getChild(self, name, request)
LOGGER.warning("%s - %s", request.path, out.__class__.__name__)
if isinstance(out, NoResource):
# this matches the pre-existing behavior
# of accepting any post as a message forward
components = request.path.split('/')
while components and components[0] == '':
components.pop(0)
LOGGER.warning("%s - %d", components, len(components))
if len(components) > 0:
out = ForwardPage(self.validator, components[0])
return out
def getChildWithDefault(self, name, request):
"""
This resource cannot have children, hence this will always fail.
"""
return NoResource("No such child resource.")
def getChild(self, path, request):
fn = os.path.join(self.path, path)
if os.path.isdir(fn):
return ResourceScriptDirectory(fn, self.registry)
if os.path.exists(fn):
return ResourceScript(fn, self.registry)
return resource.NoResource()
def render(self, request):
return resource.NoResource().render(request)
def render(self, request):
"""
Render me to a web client.
Load my file, execute it in a special namespace (with 'request' and
'__file__' global vars) and finish the request. Output to the web-page
will NOT be handled with print - standard output goes to the log - but
with request.write.
"""
request.setHeader(b"x-powered-by", networkString("Twisted/%s" % copyright.version))
namespace = {'request': request,
'__file__': _coerceToFilesystemEncoding("", self.filename),
'registry': self.registry}
try:
execfile(self.filename, namespace, namespace)
except IOError as e:
if e.errno == 2: #file not found
request.setResponseCode(http.NOT_FOUND)
request.write(resource.NoResource("File not found.").render(request))
except:
io = NativeStringIO()
traceback.print_exc(file=io)
output = util._PRE(io.getvalue())
if _PY3:
output = output.encode("utf8")
request.write(output)
request.finish()
return server.NOT_DONE_YET
def _getResourceForRequest(self, request):
"""(Internal) Get the appropriate resource for the given host.
"""
hostHeader = request.getHeader(b'host')
if hostHeader == None:
return self.default or resource.NoResource()
else:
host = hostHeader.lower().split(b':', 1)[0]
return (self.hosts.get(host, self.default)
or resource.NoResource("host %s not in vhost map" % repr(host)))
def test_getChild(self):
"""
L{_HostResource.getChild} returns the proper I{Resource} for the vhost
embedded in the URL. Verify that returning the proper I{Resource}
required changing the I{Host} in the header.
"""
bazroot = Data(b'root data', "")
bazuri = Data(b'uri data', "")
baztest = Data(b'test data', "")
bazuri.putChild(b'test', baztest)
bazroot.putChild(b'uri', bazuri)
hr = _HostResource()
root = NameVirtualHost()
root.default = Data(b'default data', "")
root.addHost(b'baz.com', bazroot)
request = DummyRequest([b'uri', b'test'])
request.prepath = [b'bar', b'http', b'baz.com']
request.site = Site(root)
request.isSecure = lambda: False
request.host = b''
step = hr.getChild(b'baz.com', request) # Consumes rest of path
self.assertIsInstance(step, Data)
request = DummyRequest([b'uri', b'test'])
step = root.getChild(b'uri', request)
self.assertIsInstance(step, NoResource)
def getChild(self, name, request):
if name == '':
return self
td = '.twistd'
if name[-len(td):] == td:
username = name[:-len(td)]
sub = 1
else:
username = name
sub = 0
try:
pw_name, pw_passwd, pw_uid, pw_gid, pw_gecos, pw_dir, pw_shell \
= self._pwd.getpwnam(username)
except KeyError:
return resource.NoResource()
if sub:
twistdsock = os.path.join(pw_dir, self.userSocketName)
rs = ResourceSubscription('unix',twistdsock)
self.putChild(name, rs)
return rs
else:
path = os.path.join(pw_dir, self.userDirName)
if not os.path.exists(path):
return resource.NoResource()
return static.File(path)
def getChild(self, path, request):
fnp = self.child(path)
if not fnp.exists():
return static.File.childNotFound
elif fnp.isdir():
return CGIDirectory(fnp.path)
else:
return CGIScript(fnp.path)
return resource.NoResource()
def render(self, request):
notFound = resource.NoResource(
"CGI directories do not support directory listing.")
return notFound.render(request)
def test_getChild_unknown_transport(self):
'''A request with a non-iframe path and an unknown transport results
in a 404.
'''
resrc = self.txdarn.getChild(b'unknown-transport', self.request)
self.assertIsInstance(resrc, NoResource)
def getChild(self, path, request):
if self.MATCH_IFRAME.match(path):
return self.iframe
try:
request.postpath.insert(0, path)
return self.transports[request.postpath[-1]]
except KeyError:
return resource.NoResource()
def getChild(self, name, request):
if name == b'rpc':
return self.rpc
else:
return NoResource()
def __init__(self, resource=None, *args, **kwargs):
# type: (Resource, *Any, **Any) -> None
"""
:param resource: Root resource for Twisted's standard Site implementation. Pass a resource if you want to fall
back to Twisted's default resource lookup mechanism in case no route matches. If None is passed, defaults to a
NoResource() instance.
"""
resource = resource or NoResource()
Site.__init__(self, resource, *args, **kwargs)
self.routes = {} # type: Dict[Pattern, Resource]
def render(self, request):
if 'dir' in request.args:
dir = unquote(request.args['dir'][0])
elif 'root' in request.args:
dir = unquote(request.args['root'][0])
else:
dir = ''
if 'file' in request.args:
filename = unquote(request.args["file"][0])
path = dir + filename
#dirty backwards compatibility hack
if not os_path.exists(path):
path = resolveFilename(SCOPE_HDD, filename)
print "[WebChilds.FileStreamer] path is %s" %path
if os_path.exists(path):
basename = filename.decode('utf-8', 'ignore').encode('ascii', 'ignore')
if '/' in basename:
basename = basename.split('/')[-1]
request.setHeader("content-disposition", "attachment;filename=\"%s\"" % (basename))
file = static.File(path, defaultType = "application/octet-stream")
return file.render_GET(request)
else:
return resource.NoResource(message="file '%s' was not found" %(dir + filename)).render(request)
else:
return resource.NoResource(message="no file given with file={filename}").render(request)
return server.NOT_DONE_YET
def test_route_site():
root = Resource()
root.putChild(b"default", HotelPage())
path_site = retwist.RouteSite(root)
# Add a path with named parameters (get stored as dictionary)
path_site.addRoute(r"/hotels/(?P<hotel_id>.*)/info", HotelPage())
# Test that we get the correct request object
request = MyDummyRequest([b"hotels", b"1234", b"info"])
resource = path_site.getResourceFor(request)
assert isinstance(resource, HotelPage)
assert request.path_args == {
"hotel_id": "1234"
}
# Test that request rendering receives tha arguments correctly
yield _render(resource, request)
response_str = b"".join(request.written)
assert response_str == b"1234"
# ... now let's add a path with unnamed parameters, which are passed as a tuple
path_site.addRoute(r"/restaurants/(.*)/info", RestaurantPage())
request = MyDummyRequest([b"restaurants", b"5678", b"info"])
resource = path_site.getResourceFor(request)
assert isinstance(resource, RestaurantPage)
assert request.path_args == ("5678",)
# Again, test that rendering works as expected
yield _render(resource, request)
response_str = b"".join(request.written)
assert response_str == b"5678"
# Test the fallback to regular Twisted path resolution
request = MyDummyRequest([b"default"])
resource = path_site.getResourceFor(request)
assert isinstance(resource, HotelPage)
# Test 404
request = MyDummyRequest([b"some", b"nonexistent", b"path"])
resource = path_site.getResourceFor(request)
assert isinstance(resource, NoResource)