def testStaticPaths(self):
import os
dp = os.path.join(self.mktemp(),"hello")
ddp = os.path.join(dp, "goodbye")
tp = os.path.abspath(os.path.join(dp,"world.txt"))
tpy = os.path.join(dp,"wyrld.rpy")
os.makedirs(dp)
f = open(tp,"wb")
f.write("hello world")
f = open(tpy, "wb")
f.write("""
from twisted.web.static import Data
resource = Data('dynamic world','text/plain')
""")
f = static.File(dp)
f.processors = {
'.rpy': script.ResourceScript,
}
f.indexNames = f.indexNames + ['world.txt']
self.assertEquals(f.getChild('', DummyRequest([''])).path,
tp)
self.assertEquals(f.getChild('wyrld.rpy', DummyRequest(['wyrld.rpy'])
).__class__,
static.Data)
f = static.File(dp)
wtextr = DummyRequest(['world.txt'])
wtext = f.getChild('world.txt', wtextr)
self.assertEquals(wtext.path, tp)
wtext.render(wtextr)
self.assertEquals(wtextr.outgoingHeaders.get('content-length'),
str(len('hello world')))
self.assertNotEquals(f.getChild('', DummyRequest([''])).__class__,
static.File)
python类File()的实例源码
def testIgnoreExt(self):
f = static.File(".")
f.ignoreExt(".foo")
self.assertEquals(f.ignoredExts, [".foo"])
f = static.File(".")
self.assertEquals(f.ignoredExts, [])
f = static.File(".", ignoredExts=(".bar", ".baz"))
self.assertEquals(f.ignoredExts, [".bar", ".baz"])
def setUp(self):
self.resource = static.File(__file__)
self.resource.isLeaf = True
self.port = reactor.listenTCP(0, server.Site(self.resource), interface='127.0.0.1')
def create_site(params, pipes):
source_dir = os.path.dirname(os.path.abspath(__file__))
web_dir = os.path.join(source_dir, 'web')
root = static.File(web_dir)
root.putChild('pipes', PipeResource(params))
root.putChild('events', EventsResource(pipes.event_log))
return server.Site(root)
def static(request):
return File(settings.STATIC_DIR)
def startService(self):
factory = WebSocketServerFactory(u"ws://127.0.0.1:%d" % self.port,
debug=self.debug)
factory.protocol = DispatcherProtocol
factory.protocol.dispatcher = CommandDispatcher(self._core)
# FIXME: Site.start/stopFactory should start/stop factories wrapped as
# Resources
factory.startFactory()
resource = WebSocketResource(factory)
# we server static files under "/" ..
webdir = os.path.abspath(
pkg_resources.resource_filename('leap.bitmask.core', 'web'))
root = File(webdir)
# and our WebSocket server under "/ws"
root.putChild(u'bitmask', resource)
# both under one Twisted Web Site
site = Site(root)
self.site = site
self.factory = factory
self.listener = reactor.listenTCP(self.port, site)
def main(args):
import logging
if args.verbose:
LOGGER.setLevel(logging.DEBUG)
else:
LOGGER.setLevel(logging.INFO)
db = Database(args.db)
root = RootView(db, args.filter)
root.putChild('static', static.File("./static"))
site = server.Site(root)
LOGGER.info("Listening on http://localhost:%d" % args.port)
reactor.listenTCP(args.port, site)
reactor.run()
def wchild_WebConduit2_js(self, request):
#print "returning js file"
h = request.getHeader("user-agent")
if h.count("MSIE"):
fl = "WebConduit2_msie.js"
else:
fl = "WebConduit2_mozilla.js"
return static.File(os.path.join(WOVEN_PATH, fl))
def wchild_FlashConduit_swf(self, request):
#print "returning flash file"
h = request.getHeader("user-agent")
if h.count("MSIE"):
fl = "FlashConduit.swf"
else:
fl = "FlashConduit.swf"
return static.File(os.path.join(WOVEN_PATH, fl))
def opt_processor(self, proc):
"""`ext=class' where `class' is added as a Processor for files ending
with `ext'.
"""
if not isinstance(self['root'], static.File):
raise usage.UsageError("You can only use --processor after --path.")
ext, klass = proc.split('=', 1)
self['root'].processors[ext] = reflect.namedClass(klass)
def opt_mime_type(self, defaultType):
"""Specify the default mime-type for static files."""
if not isinstance(self['root'], static.File):
raise usage.UsageError("You can only use --mime_type after --path.")
self['root'].defaultType = defaultType
def opt_ignore_ext(self, ext):
"""Specify an extension to ignore. These will be processed in order.
"""
if not isinstance(self['root'], static.File):
raise usage.UsageError("You can only use --ignore_ext "
"after --path.")
self['root'].ignoreExt(ext)
def setUp(self):
self.tmpdir = self.mktemp()
os.mkdir(self.tmpdir)
name = os.path.join(self.tmpdir, 'junk')
f = file(name, 'w')
f.write(8000 * 'x')
f.close()
self.file = static.File(name)
self.request = FakeRequest()
def testStaticPaths(self):
import os
dp = os.path.join(self.mktemp(),"hello")
ddp = os.path.join(dp, "goodbye")
tp = os.path.abspath(os.path.join(dp,"world.txt"))
tpy = os.path.join(dp,"wyrld.rpy")
os.makedirs(dp)
f = open(tp,"wb")
f.write("hello world")
f = open(tpy, "wb")
f.write("""
from twisted.web.static import Data
resource = Data('dynamic world','text/plain')
""")
f = static.File(dp)
f.processors = {
'.rpy': script.ResourceScript,
}
f.indexNames = f.indexNames + ['world.txt']
self.assertEquals(f.getChild('', DummyRequest([''])).path,
tp)
self.assertEquals(f.getChild('wyrld.rpy', DummyRequest(['wyrld.rpy'])
).__class__,
static.Data)
f = static.File(dp)
wtextr = DummyRequest(['world.txt'])
wtext = f.getChild('world.txt', wtextr)
self.assertEquals(wtext.path, tp)
wtext.render(wtextr)
self.assertEquals(wtextr.outgoingHeaders.get('content-length'),
str(len('hello world')))
self.assertNotEquals(f.getChild('', DummyRequest([''])).__class__,
static.File)
def testIgnoreExt(self):
f = static.File(".")
f.ignoreExt(".foo")
self.assertEquals(f.ignoredExts, [".foo"])
f = static.File(".")
self.assertEquals(f.ignoredExts, [])
f = static.File(".", ignoredExts=(".bar", ".baz"))
self.assertEquals(f.ignoredExts, [".bar", ".baz"])
def setUp(self):
self.resource = static.File(__file__)
self.resource.isLeaf = True
self.port = reactor.listenTCP(0, server.Site(self.resource), interface='127.0.0.1')
def setUp(self):
self._resource = static.File(PARLAY_PATH + "/ui/dist")
def startService(self):
root = resource.Resource()
root.putChild('', web.Index(self))
root.putChild('api', web.API(self))
root.putChild("static", File(FilePath('jaguar/resources/static').path))
site = server.Site(root, logPath='jaguar-access.log')
reactor.listenTCP(80, site)
gpio.setmode(gpio.BCM)
self.sr.setup()
yield self.sr.clear()
yield utils.wait(1)
yield self.sr.shiftOut(reduce(lambda x, y: x | y, self.red))
self.btnGreen.setup()
self.btnRed.setup()
self.pwrGood.setup()
self.atxOn.setup()
for n in self.nodes:
n.setup()
yield utils.wait(1)
if self.atxOn.state == gpio.LOW:
yield self.powerGood(gpio.LOW)
self.t = task.LoopingCall(self.loop)
self.t.start(5.0)
def startService(self):
root = static.File(os.path.join(os.path.dirname(__file__), 'public'))
factory = ServerFactory()
factory.status = False
factory.led = GPIOHandler(self.pin)
factory.protocol = ServerProtocol
# factory.setProtocolOptions(maxConnections=2)
factory.startFactory()
resource = WebSocketResource(factory)
root.putChild('socket', resource)
site = server.Site(root)
self.server = reactor.listenTCP(self.port, site)
def __init__(self, dbrootdir="db", webdir="www/command", landingdir="www/landing"):
self.dbrootdir = dbrootdir
self.webdir = webdir
self.file_resource = File(landingdir)
Resource.__init__(self)