def _request(request, request_fallback=None):
''' Extract request fields wherever they may come from: GET, POST, forms, fallback '''
# Use lambdas to avoid evaluating bottle.request.* which may throw an Error
all_dicts = [
lambda: request.json,
lambda: request.forms,
lambda: request.query,
lambda: request.files,
#lambda: request.POST,
lambda: request_fallback
]
request_dict = dict()
for req_dict_ in all_dicts:
try:
req_dict = req_dict_()
except KeyError:
continue
if req_dict is not None and hasattr(req_dict, 'items'):
for req_key, req_val in req_dict.items():
request_dict[req_key] = req_val
return request_dict
python类files()的实例源码
def echo():
try:
body = request.body.read().decode('utf-8')
except:
body = None
result = {
'method': request.method,
'headers': dict(request.headers),
'body': body,
'files': [
{'key': key, 'name': request.files[key].raw_filename}
for key in request.files
]
}
return result
def book_resources():
data = request.files.get('data')
logger.debug("files: %s" % list(request.files.keys()))
for file in request.files:
logger.debug("file %s" % file)
logger.debug("Data: '%s'" % data)
# logger.debug("Data.file: %s" % data.file)
if data and data.file:
Experiment(data.file, username=aaa.current_user.username).reserve()
bottle.redirect('/experimenter')
logger.debug(("got body: %s" % request.body.read()))
raise FileNotFoundError("File not found in your request")
def create_resource():
resource_id = request.forms.get('id')
node_type = request.forms.get('node_type')
cardinality = request.forms.get('cardinality')
description = request.forms.get('description')
testbed = request.forms.get('testbed')
upload = request.files.get('upload')
add_resource(aaa.current_user.username, resource_id, node_type, cardinality, description, testbed, upload)
bottle.redirect('/experimenter')
#################
# General pages #
#################
def server_static(filename):
""" route to the css and static files"""
if ".." in filename:
return HTTPError(status=403)
return bottle.static_file(filename, root='%s/static' % get_config('api', 'view-path', '/etc/softfire/views'))
#########
# Utils #
#########
def add_package():
name = request.forms.get("add_name", "New Package").strip()
queue = int(request.forms['add_dest'])
links = decode(request.forms['add_links'])
links = links.split("\n")
pw = request.forms.get("add_password", "").strip("\n\r")
try:
f = request.files['add_file']
if not name or name == "New Package":
name = f.name
fpath = join(PYLOAD.getConfigValue("general", "download_folder"), "tmp_" + f.filename)
destination = open(fpath, 'wb')
copyfileobj(f.file, destination)
destination.close()
links.insert(0, fpath)
except:
pass
name = name.decode("utf8", "ignore")
links = map(lambda x: x.strip(), links)
links = filter(lambda x: x != "", links)
pack = PYLOAD.addPackage(name, links, queue)
if pw:
pw = pw.decode("utf8", "ignore")
data = {"password": pw}
PYLOAD.setPackageData(pack, data)
def test_bigbody(self):
""" Environ: Request.body should handle big uploads using files """
e = {}
wsgiref.util.setup_testing_defaults(e)
e['wsgi.input'].write(tob('x')*1024*1000)
e['wsgi.input'].seek(0)
e['CONTENT_LENGTH'] = str(1024*1000)
request = BaseRequest(e)
self.assertTrue(hasattr(request.body, 'fileno'))
self.assertEqual(1024*1000, len(request.body.read()))
self.assertEqual(1024, len(request.body.read(1024)))
self.assertEqual(1024*1000, len(request.body.readline()))
self.assertEqual(1024, len(request.body.readline(1024)))
def test_multipart(self):
""" Environ: POST (multipart files and multible values per key) """
fields = [('field1','value1'), ('field2','value2'), ('field2','value3')]
files = [('file1','filename1.txt','content1'), ('??','??foo.py', 'ä\nö\rü')]
e = tools.multipart_environ(fields=fields, files=files)
request = BaseRequest(e)
# File content
self.assertTrue('file1' in request.POST)
self.assertTrue('file1' in request.files)
self.assertTrue('file1' not in request.forms)
cmp = tob('content1') if sys.version_info >= (3,2,0) else 'content1'
self.assertEqual(cmp, request.POST['file1'].file.read())
# File name and meta data
self.assertTrue('??' in request.POST)
self.assertTrue('??' in request.files)
self.assertTrue('??' not in request.forms)
self.assertEqual('foo.py', request.POST['??'].filename)
self.assertTrue(request.files['??'])
self.assertFalse(request.files.file77)
# UTF-8 files
x = request.POST['??'].file.read()
if (3,2,0) > sys.version_info >= (3,0,0):
x = x.encode('utf8')
self.assertEqual(tob('ä\nö\rü'), x)
# No file
self.assertTrue('file3' not in request.POST)
self.assertTrue('file3' not in request.files)
self.assertTrue('file3' not in request.forms)
# Field (single)
self.assertEqual('value1', request.POST['field1'])
self.assertTrue('field1' not in request.files)
self.assertEqual('value1', request.forms['field1'])
# Field (multi)
self.assertEqual(2, len(request.POST.getall('field2')))
self.assertEqual(['value2', 'value3'], request.POST.getall('field2'))
self.assertEqual(['value2', 'value3'], request.forms.getall('field2'))
self.assertTrue('field2' not in request.files)