def opt_path(self, path):
"""<path> is either a specific file or a directory to
be set as the root of the web server. Use this if you
have a directory full of HTML, cgi, php3, epy, or rpy files or
any other files that you want to be served up raw.
"""
self['root'] = static.File(os.path.abspath(path))
self['root'].processors = {
'.cgi': twcgi.CGIScript,
'.php3': twcgi.PHP3Script,
'.php': twcgi.PHPScript,
'.epy': script.PythonScript,
'.rpy': script.ResourceScript,
'.trp': trp.ResourceUnpickler,
}
python类File()的实例源码
def setUp(self):
name = str(id(self)) + "_webclient"
if not os.path.exists(name):
os.mkdir(name)
f = open(os.path.join(name, "file"), "wb")
f.write("0123456789")
f.close()
r = static.File(name)
r.putChild("redirect", util.Redirect("/file"))
r.putChild("wait", LongTimeTakingResource())
r.putChild("error", ErrorResource())
r.putChild("nolength", NoLengthResource())
r.putChild("host", HostHeaderResource())
r.putChild("payload", PayloadResource())
r.putChild("broken", BrokenDownloadResource())
site = server.Site(r, timeout=None)
self.port = self._listen(site)
self.portno = self.port.getHost().port
def __init__(self, app, config):
resource.Resource.__init__(self)
self.app = app
self.IApp = IServiceCollection(app, app)
self.debug = config.getboolean('debug', False)
self.nodename = config.get('node_name', socket.gethostname())
static_serve_path = config.get('static_serve_path', 'files')
storage_path = config.get('storage_path')
self.putChild('', Home(self))
self.putChild('jobs', Jobs(self))
self.putChild(static_serve_path, File(storage_path))
services = config.items('services', ())
for servName, servClsName in services:
servCls = load_object(servClsName)
self.putChild(servName, servCls(self))
def __init__(self, validator, static_dir=None):
Resource.__init__(self)
self.validator = validator
if static_dir is not None and os.path.exists(static_dir):
for f in os.listdir(static_dir):
self.putChild(f, File(os.path.join(static_dir, f)))
self.putChild('block', BlockPage(validator))
self.putChild('statistics', StatisticsPage(validator))
self.putChild('store', StorePage(validator))
self.putChild('status', StatusPage(validator))
self.putChild('transaction', TransactionPage(validator))
self.putChild('forward', ForwardPage(validator))
self.putChild('command', CommandPage(validator))
validator.web_thread_pool.start()
def at_webserver_root_creation(web_root):
"""
This is called as the web server has finished building its default
path tree. At this point, the media/ and static/ URIs have already
been added to the web root.
Args:
web_root (twisted.web.resource.Resource): The root
resource of the URI tree. Use .putChild() to
add new subdomains to the tree.
Returns:
web_root (twisted.web.resource.Resource): The potentially
modified root structure.
Example:
from twisted.web import static
my_page = static.File("web/mypage/")
my_page.indexNames = ["index.html"]
web_root.putChild("mypage", my_page)
"""
return web_root
def opt_path(self, path):
"""<path> is either a specific file or a directory to
be set as the root of the web server. Use this if you
have a directory full of HTML, cgi, php3, epy, or rpy files or
any other files that you want to be served up raw.
"""
self['root'] = static.File(os.path.abspath(path))
self['root'].processors = {
'.cgi': twcgi.CGIScript,
'.php3': twcgi.PHP3Script,
'.php': twcgi.PHPScript,
'.epy': script.PythonScript,
'.rpy': script.ResourceScript,
'.trp': trp.ResourceUnpickler,
}
def setUp(self):
name = str(id(self)) + "_webclient"
if not os.path.exists(name):
os.mkdir(name)
f = open(os.path.join(name, "file"), "wb")
f.write("0123456789")
f.close()
r = static.File(name)
r.putChild("redirect", util.Redirect("/file"))
r.putChild("wait", LongTimeTakingResource())
r.putChild("error", ErrorResource())
r.putChild("nolength", NoLengthResource())
r.putChild("host", HostHeaderResource())
r.putChild("payload", PayloadResource())
r.putChild("broken", BrokenDownloadResource())
site = server.Site(r, timeout=None)
self.port = self._listen(site)
self.portno = self.port.getHost().port
def at_webserver_root_creation(web_root):
"""
This is called as the web server has finished building its default
path tree. At this point, the media/ and static/ URIs have already
been added to the web root.
Args:
web_root (twisted.web.resource.Resource): The root
resource of the URI tree. Use .putChild() to
add new subdomains to the tree.
Returns:
web_root (twisted.web.resource.Resource): The potentially
modified root structure.
Example:
from twisted.web import static
my_page = static.File("web/mypage/")
my_page.indexNames = ["index.html"]
web_root.putChild("mypage", my_page)
"""
return web_root
def startService(self):
factory = WebSocketServerFactory(u"ws://127.0.0.1:%d" % self.port)
factory.protocol = EchoServerProtocol
# FIXME: Site.start/stopFactory should start/stop factories wrapped as Resources
factory.startFactory()
resource = WebSocketResource(factory)
# we server static files under "/" ..
webdir = os.path.abspath(pkg_resources.resource_filename("echows", "web"))
root = File(webdir)
# and our WebSocket server under "/ws" (note that Twisted uses
# bytes for URIs)
root.putChild(b"ws", resource)
# both under one Twisted Web Site
site = Site(root)
self.site = site
self.factory = factory
self.listener = reactor.listenTCP(self.port, site)
def test_notFound(self):
"""
If a request is made which encounters a L{File} before a final segment
which does not correspond to any file in the path the L{File} was
created with, a not found response is sent.
"""
base = FilePath(self.mktemp())
base.makedirs()
file = static.File(base.path)
request = DummyRequest([b'foobar'])
child = resource.getChildForRequest(file, request)
d = self._render(child, request)
def cbRendered(ignored):
self.assertEqual(request.responseCode, 404)
d.addCallback(cbRendered)
return d
def test_securityViolationNotFound(self):
"""
If a request is made which encounters a L{File} before a final segment
which cannot be looked up in the filesystem due to security
considerations, a not found response is sent.
"""
base = FilePath(self.mktemp())
base.makedirs()
file = static.File(base.path)
request = DummyRequest([b'..'])
child = resource.getChildForRequest(file, request)
d = self._render(child, request)
def cbRendered(ignored):
self.assertEqual(request.responseCode, 404)
d.addCallback(cbRendered)
return d
def test_forbiddenResource(self):
"""
If the file in the filesystem which would satisfy a request cannot be
read, L{File.render} sets the HTTP response code to I{FORBIDDEN}.
"""
base = FilePath(self.mktemp())
base.setContent(b'')
# Make sure we can delete the file later.
self.addCleanup(base.chmod, 0o700)
# Get rid of our own read permission.
base.chmod(0)
file = static.File(base.path)
request = DummyRequest([b''])
d = self._render(file, request)
def cbRendered(ignored):
self.assertEqual(request.responseCode, 403)
d.addCallback(cbRendered)
return d
def test_indexNames(self):
"""
If a request is made which encounters a L{File} before a final empty
segment, a file in the L{File} instance's C{indexNames} list which
exists in the path the L{File} was created with is served as the
response to the request.
"""
base = FilePath(self.mktemp())
base.makedirs()
base.child("foo.bar").setContent(b"baz")
file = static.File(base.path)
file.indexNames = [b'foo.bar']
request = DummyRequest([b''])
child = resource.getChildForRequest(file, request)
d = self._render(child, request)
def cbRendered(ignored):
self.assertEqual(b''.join(request.written), b'baz')
self.assertEqual(
request.responseHeaders.getRawHeaders(b'content-length')[0],
b'3')
d.addCallback(cbRendered)
return d
def test_staticFile(self):
"""
If a request is made which encounters a L{File} before a final segment
which names a file in the path the L{File} was created with, that file
is served as the response to the request.
"""
base = FilePath(self.mktemp())
base.makedirs()
base.child("foo.bar").setContent(b"baz")
file = static.File(base.path)
request = DummyRequest([b'foo.bar'])
child = resource.getChildForRequest(file, request)
d = self._render(child, request)
def cbRendered(ignored):
self.assertEqual(b''.join(request.written), b'baz')
self.assertEqual(
request.responseHeaders.getRawHeaders(b'content-length')[0],
b'3')
d.addCallback(cbRendered)
return d
def test_staticFileDeletedRender(self):
"""
A L{static.File} created for a file which does not exist should render
its C{childNotFound} page.
"""
staticFile = static.File(self.mktemp())
request = DummyRequest([b'foo.bar'])
request2 = DummyRequest([b'foo.bar'])
d = self._render(staticFile, request)
d2 = self._render(staticFile.childNotFound, request2)
def cbRendered2(ignored):
def cbRendered(ignored):
self.assertEqual(b''.join(request.written),
b''.join(request2.written))
d.addCallback(cbRendered)
return d
d2.addCallback(cbRendered2)
return d2
def test_ignoredExtensionsIgnored(self):
"""
A request for the I{base} child of a L{File} succeeds with a resource
for the I{base<extension>} file in the path the L{File} was created
with if such a file exists and the L{File} has been configured to
ignore the I{<extension>} extension.
"""
base = FilePath(self.mktemp())
base.makedirs()
base.child('foo.bar').setContent(b'baz')
base.child('foo.quux').setContent(b'foobar')
file = static.File(base.path, ignoredExts=(b".bar",))
request = DummyRequest([b"foo"])
child = resource.getChildForRequest(file, request)
d = self._render(child, request)
def cbRendered(ignored):
self.assertEqual(b''.join(request.written), b'baz')
d.addCallback(cbRendered)
return d
def test_directoryWithoutTrailingSlashRedirects(self):
"""
A request for a path which is a directory but does not have a trailing
slash will be redirected to a URL which does have a slash by L{File}.
"""
base = FilePath(self.mktemp())
base.makedirs()
base.child('folder').makedirs()
file = static.File(base.path)
request = DummyRequest([b"folder"])
request.uri = b"http://dummy/folder#baz?foo=bar"
child = resource.getChildForRequest(file, request)
self.successResultOf(self._render(child, request))
self.assertEqual(request.responseCode, FOUND)
self.assertEqual(request.responseHeaders.getRawHeaders(b"location"),
[b"http://dummy/folder/#baz?foo=bar"])
def setUp(self):
"""
Create a temporary file with a fixed payload of 64 bytes. Create a
resource for that file and create a request which will be for that
resource. Each test can set a different range header to test different
aspects of the implementation.
"""
path = FilePath(self.mktemp())
# This is just a jumble of random stuff. It's supposed to be a good
# set of data for this test, particularly in order to avoid
# accidentally seeing the right result by having a byte sequence
# repeated at different locations or by having byte values which are
# somehow correlated with their position in the string.
self.payload = (b'\xf8u\xf3E\x8c7\xce\x00\x9e\xb6a0y0S\xf0\xef\xac\xb7'
b'\xbe\xb5\x17M\x1e\x136k{\x1e\xbe\x0c\x07\x07\t\xd0'
b'\xbckY\xf5I\x0b\xb8\x88oZ\x1d\x85b\x1a\xcdk\xf2\x1d'
b'&\xfd%\xdd\x82q/A\x10Y\x8b')
path.setContent(self.payload)
self.file = path.open()
self.resource = static.File(self.file.name)
self.resource.isLeaf = 1
self.request = DummyRequest([b''])
self.request.uri = self.file.name
self.catcher = []
log.addObserver(self.catcher.append)
def test_pathServer(self):
"""
The I{--path} option to L{makeService} causes it to return a service
which will listen on the server address given by the I{--port} option.
"""
path = FilePath(self.mktemp())
path.makedirs()
port = self.mktemp()
options = Options()
options.parseOptions(['--port', 'unix:' + port, '--path', path.path])
service = makeService(options)
service.startService()
self.addCleanup(service.stopService)
self.assertIsInstance(service.services[0].factory.resource, File)
self.assertEqual(service.services[0].factory.resource.path, path.path)
self.assertTrue(os.path.exists(port))
self.assertTrue(stat.S_ISSOCK(os.stat(port).st_mode))
def test_staticRoot(self):
app = self.app
request = requestMock(b"/__init__.py")
@app.route("/", branch=True)
def root(request):
return File(os.path.dirname(__file__))
d = _render(self.kr, request)
self.assertFired(d)
self.assertEqual(request.getWrittenData(),
open(
os.path.join(
os.path.dirname(__file__), "__init__.py"), 'rb').read())
self.assertEqual(request.finishCount, 1)
def test_explicitStaticBranch(self):
app = self.app
request = requestMock(b"/static/__init__.py")
@app.route("/static/", branch=True)
def root(request):
return File(os.path.dirname(__file__))
d = _render(self.kr, request)
self.assertFired(d)
self.assertEqual(request.getWrittenData(),
open(
os.path.join(
os.path.dirname(__file__), "__init__.py"), 'rb').read())
self.assertEqual(request.writeCount, 1)
self.assertEqual(request.finishCount, 1)
def sessionstart(reason, **kwargs):
if reason == 0 and "session" in kwargs:
try:
from Plugins.Extensions.WebInterface.WebChilds.Toplevel import addExternalChild
from Plugins.Extensions.WebInterface.WebChilds.Screenpage import ScreenPage
from twisted.python import util
from twisted.web import static
if hasattr(static.File, 'render_GET'):
class File(static.File):
def render_POST(self, request):
return self.render_GET(request)
else:
File = static.File
session = kwargs["session"]
root = File(util.sibpath(__file__, "web-data"))
root.putChild("web", ScreenPage(session, util.sibpath(__file__, "web"), True))
addExternalChild( ("internetradio", root, "Internet-Radio", "1", True) )
except ImportError:
pass # pah!
def __init__(self, cfg):
static_dir = os.path.join(cfg['assets'], 'static')
self.files = {}
for root, dirs, files in os.walk(static_dir):
for filename in files:
if not filename.startswith('!'):
fullpath = os.path.join(root, filename)
self.files[filename] = File(fullpath)
def loadChildren(self):
for f in os.listdir(self.path):
if os.path.isdir(os.path.join(self.path, f)):
self.resource.putChild(f, static.File(self.path + f))
def wchild_WebConduit2_js(self, request):
#print "returning js file"
h = request.getHeader("user-agent")
if h.count("MSIE"):
fl = "WebConduit2_msie.js"
else:
fl = "WebConduit2_mozilla.js"
return static.File(os.path.join(WOVEN_PATH, fl))
def wchild_FlashConduit_swf(self, request):
#print "returning flash file"
h = request.getHeader("user-agent")
if h.count("MSIE"):
fl = "FlashConduit.swf"
else:
fl = "FlashConduit.swf"
return static.File(os.path.join(WOVEN_PATH, fl))
def opt_processor(self, proc):
"""`ext=class' where `class' is added as a Processor for files ending
with `ext'.
"""
if not isinstance(self['root'], static.File):
raise usage.UsageError("You can only use --processor after --path.")
ext, klass = proc.split('=', 1)
self['root'].processors[ext] = reflect.namedClass(klass)
def opt_mime_type(self, defaultType):
"""Specify the default mime-type for static files."""
if not isinstance(self['root'], static.File):
raise usage.UsageError("You can only use --mime_type after --path.")
self['root'].defaultType = defaultType
def opt_ignore_ext(self, ext):
"""Specify an extension to ignore. These will be processed in order.
"""
if not isinstance(self['root'], static.File):
raise usage.UsageError("You can only use --ignore_ext "
"after --path.")
self['root'].ignoreExt(ext)
def setUp(self):
self.tmpdir = self.mktemp()
os.mkdir(self.tmpdir)
name = os.path.join(self.tmpdir, 'junk')
f = file(name, 'w')
f.write(8000 * 'x')
f.close()
self.file = static.File(name)
self.request = FakeRequest()