def do_POST(self):
    """Answer a POST that carries a ``text`` form field with JSON predictions.

    The request body is parsed as form data. When a ``text`` field is present
    and, after UTF-8 decoding, is at most 10000 characters long, the text is
    passed to ``make_predictions`` together with ``self.model``; the result is
    printed and written back as the body of a ``200`` JSON response.
    In every other case an empty ``400`` response is sent.
    """
    request_env = {
        "REQUEST_METHOD": "POST",
        "CONTENT_TYPE": self.headers["Content-Type"],
    }
    form = cgi.FieldStorage(fp=self.rfile, headers=self.headers,
                            environ=request_env)

    # ``document`` stays None unless a usable ``text`` field was submitted.
    document = None
    if "text" in form:
        decoded = form["text"].value.decode("utf-8")
        if len(decoded) <= 10000:
            document = decoded

    if document is None:
        # Missing or oversized text: reject the request without a body.
        self.send_response(400)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        return

    print(u"Document text: {}".format(document))
    example = make_predictions(document, self.model)
    print_predictions(example)
    self.send_response(200)
    self.send_header("Content-Type", "application/json")
    self.end_headers()
    self.wfile.write(json.dumps(example))
# Example source code for the Python class FieldStorage()
def transcode_fs(self, fs, content_type):
    """Re-encode the fields of a FieldStorage into a fresh multipart body.

    Field names, filenames and plain field values are run through a decoder:
    the identity when ``PY3`` is true, otherwise ``decode`` with
    ``self.charset`` and ``self.errors``. ``name`` and ``filename`` are
    updated in place on the fields of ``fs``.

    Returns the second item of the pair returned by ``_encode_multipart``,
    which is called with a new ``io.BytesIO`` as its ``fout``.
    """
    # transcode FieldStorage
    if PY3: # pragma: no cover
        def decode(b):
            return b
    else:
        def decode(b):
            return b.decode(self.charset, self.errors)

    pairs = []
    for field in fs.list or ():
        field.name = decode(field.name)
        if not field.filename:
            # Plain form value: keep only the decoded value.
            pairs.append((field.name, decode(field.value)))
            continue
        # File upload: keep the whole field object.
        field.filename = decode(field.filename)
        pairs.append((field.name, field))

    # TODO: transcode big requests to temp file
    _, body = _encode_multipart(pairs, content_type, fout=io.BytesIO())
    return body
# TODO: remove in 1.4
def getUploads(self, field_name=None):
    """
    Get uploads sent to this handler.
    Cheeky borrowed from blobstore_handlers.py - © 2007 Google Inc.

    Every request parameter that is a ``cgi.FieldStorage`` whose
    ``type_options`` contain ``"blob-key"`` is parsed with
    ``blobstore.parse_blob_info`` and grouped by parameter name.

    Args:
        field_name: Only select uploads that were sent as a specific field.

    Returns:
        A list of BlobInfo records corresponding to each upload.
        Empty list if there are no blob-info records for field_name.
    """
    uploads = collections.defaultdict(list)
    for key, value in request.current.get().request.params.items():
        if isinstance(value, cgi.FieldStorage) and "blob-key" in value.type_options:
            uploads[key].append(blobstore.parse_blob_info(value))

    if field_name:
        # .get() (not indexing) so the defaultdict is not grown by the lookup.
        return list(uploads.get(field_name, []))

    # values() exists on both Python 2 and 3 (itervalues() is Python 2 only),
    # and a distinct loop name avoids rebinding ``uploads`` while iterating it.
    results = []
    for blob_infos in uploads.values():
        results.extend(blob_infos)
    return results
def getUploads(self, field_name=None):
    """
    Get uploads sent to this handler.
    Cheeky borrowed from blobstore_handlers.py - © 2007 Google Inc.

    Every request parameter that is a ``cgi.FieldStorage`` whose
    ``type_options`` contain ``'blob-key'`` is parsed with
    ``blobstore.parse_blob_info`` and grouped by parameter name.

    Args:
        field_name: Only select uploads that were sent as a specific field.

    Returns:
        A list of BlobInfo records corresponding to each upload.
        Empty list if there are no blob-info records for field_name.
    """
    uploads = collections.defaultdict(list)
    for key, value in request.current.get().request.params.items():
        if isinstance(value, cgi.FieldStorage) and 'blob-key' in value.type_options:
            uploads[key].append(blobstore.parse_blob_info(value))

    if field_name:
        # .get() (not indexing) so the defaultdict is not grown by the lookup.
        return list(uploads.get(field_name, []))

    # values() exists on both Python 2 and 3 (itervalues() is Python 2 only),
    # and a distinct loop name avoids rebinding ``uploads`` while iterating it.
    results = []
    for blob_infos in uploads.values():
        results.extend(blob_infos)
    return results
def process(self):
    """Print a plain-text CGI response describing one commit.

    The commit is obtained from ``self.read_commit`` using the parsed CGI
    form. The response headers are printed first (with an attachment
    disposition when the ``download`` form value equals ``"true"``), then a
    ``#``-prefixed JSON rendering of the commit header, and finally the diff
    is emitted by ``PostsaiCommitViewer.dump_commit_diff``.
    """
    form = cgi.FieldStorage()
    commit = self.read_commit(form)

    header_lines = [
        "Content-Type: text/plain; charset='utf-8'\r",
        "Cache-Control: max-age=60\r",
    ]
    if form.getfirst("download", "false") == "true":
        header_lines.append("Content-Disposition: attachment; filename=\"patch.txt\"\r")
    # A bare "\r" line terminates the header section.
    header_lines.append("\r")
    for line in header_lines:
        print(line)

    commit_header = PostsaiCommitViewer.format_commit_header(commit)
    print("#" + json.dumps(commit_header, default=convert_to_builtin_type))
    # Flush before the diff is dumped so the header text is written out first.
    sys.stdout.flush()
    PostsaiCommitViewer.dump_commit_diff(commit)
def do_test(buf, method):
env = {}
if method == "GET":
fp = None
env['REQUEST_METHOD'] = 'GET'
env['QUERY_STRING'] = buf
elif method == "POST":
fp = BytesIO(buf.encode('latin-1')) # FieldStorage expects bytes
env['REQUEST_METHOD'] = 'POST'
env['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
env['CONTENT_LENGTH'] = str(len(buf))
else:
raise ValueError("unknown method: %s" % method)
try:
return cgi.parse(fp, env, strict_parsing=1)
except Exception as err:
return ComparableException(err)
def test_strict(self):
    """Check strict parsing for every case in ``parse_strict_test_cases``.

    Each input is parsed through ``do_test`` as both GET and POST and compared
    with the expected result. When the expected result is a dict, the same
    input is also loaded into a ``cgi.FieldStorage`` and its dict-style
    interface is checked against the expectation.
    """
    for query, expected in parse_strict_test_cases:
        # Test basic parsing
        parsed = do_test(query, "GET")
        self.assertEqual(parsed, expected, "Error parsing %s method GET" % repr(query))
        parsed = do_test(query, "POST")
        self.assertEqual(parsed, expected, "Error parsing %s method POST" % repr(query))

        storage = cgi.FieldStorage(environ={'QUERY_STRING': query})
        if not isinstance(expected, dict):
            continue

        # test dict interface
        self.assertEqual(len(expected), len(storage))
        self.assertCountEqual(expected.keys(), storage.keys())
        ##self.assertEqual(norm(expect.values()), norm(fs.values()))
        ##self.assertEqual(norm(expect.items()), norm(fs.items()))
        self.assertEqual(storage.getvalue("nonexistent field", "default"), "default")

        # test individual fields
        for key, values in expected.items():
            self.assertIn(key, storage)
            # A single value is reported as a scalar, several values as a list.
            wanted = values if len(values) > 1 else values[0]
            self.assertEqual(storage.getvalue(key), wanted)
def test_fieldstorage_multipart(self):
    """Parse ``POSTDATA`` as multipart form data and verify every field.

    Four fields are expected; for each one the ``name``, ``filename`` and
    ``value`` attributes are compared with the expected values.
    """
    #Test basic FieldStorage multipart parsing
    environ = {
        'REQUEST_METHOD': 'POST',
        'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
        'CONTENT_LENGTH': '558'}
    body = BytesIO(POSTDATA.encode('latin-1'))
    storage = cgi.FieldStorage(body, environ=environ, encoding="latin-1")
    self.assertEqual(len(storage.list), 4)

    expected_fields = [
        {'name':'id', 'filename':None, 'value':'1234'},
        {'name':'title', 'filename':None, 'value':''},
        {'name':'file', 'filename':'test.txt', 'value':b'Testing 123.\n'},
        {'name':'submit', 'filename':None, 'value':' Add '},
    ]
    # The length check above guarantees the two sequences pair up one-to-one.
    for field, attrs in zip(storage.list, expected_fields):
        for attr, wanted in attrs.items():
            self.assertEqual(getattr(field, attr), wanted)
def test_fieldstorage_multipart_leading_whitespace(self):
    """Parse ``POSTDATA`` preceded by a CRLF and verify every field.

    The request body starts with ``\\r\\n`` so that its first line is not the
    boundary; the same four fields as in the plain multipart case are
    expected.
    """
    environ = {
        'REQUEST_METHOD': 'POST',
        'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
        'CONTENT_LENGTH': '560'}
    # Add some leading whitespace to our post data that will cause the
    # first line to not be the innerboundary.
    body = BytesIO(b"\r\n" + POSTDATA.encode('latin-1'))
    storage = cgi.FieldStorage(body, environ=environ, encoding="latin-1")
    self.assertEqual(len(storage.list), 4)

    expected_fields = [
        {'name':'id', 'filename':None, 'value':'1234'},
        {'name':'title', 'filename':None, 'value':''},
        {'name':'file', 'filename':'test.txt', 'value':b'Testing 123.\n'},
        {'name':'submit', 'filename':None, 'value':' Add '},
    ]
    # The length check above guarantees the two sequences pair up one-to-one.
    for field, attrs in zip(storage.list, expected_fields):
        for attr, wanted in attrs.items():
            self.assertEqual(getattr(field, attr), wanted)
def test_fieldstorage_multipart_w3c(self):
    """Parse ``POSTDATA_W3`` and verify the nested file parts.

    Two top-level fields are expected: ``submit-name`` with the value
    ``'Larry'`` and ``files``, whose value is itself a sequence of two parts
    that are checked attribute by attribute.
    """
    # Test basic FieldStorage multipart parsing (W3C sample)
    environ = {
        'REQUEST_METHOD': 'POST',
        'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY_W3),
        'CONTENT_LENGTH': str(len(POSTDATA_W3))}
    body = BytesIO(POSTDATA_W3.encode('latin-1'))
    storage = cgi.FieldStorage(body, environ=environ, encoding="latin-1")
    self.assertEqual(len(storage.list), 2)

    submit_field, files_field = storage.list
    self.assertEqual(submit_field.name, 'submit-name')
    self.assertEqual(submit_field.value, 'Larry')
    self.assertEqual(files_field.name, 'files')

    files = files_field.value
    self.assertEqual(len(files), 2)
    expected_parts = [
        {'name': None, 'filename': 'file1.txt', 'value': b'... contents of file1.txt ...'},
        {'name': None, 'filename': 'file2.gif', 'value': b'...contents of file2.gif...'},
    ]
    # The length check above guarantees the two sequences pair up one-to-one.
    for part, attrs in zip(files, expected_parts):
        for attr, wanted in attrs.items():
            self.assertEqual(getattr(part, attr), wanted)
def test_fieldstorage_part_content_length(self):
    """Parse a multipart body whose single part carries a Content-Length header.

    The body holds one ``submit-name`` part with the value ``Larry``; the test
    asserts that exactly one field is parsed and that its name and value are
    read back correctly.
    """
    BOUNDARY = "JfISa01"
    # Built line by line so the empty line that separates the part headers
    # from the part body is explicit. Without that separator "Larry" is read
    # as part of the header block and the field value comes back empty.
    POSTDATA = "\n".join([
        "--JfISa01",
        'Content-Disposition: form-data; name="submit-name"',
        "Content-Length: 5",
        "",
        "Larry",
        "--JfISa01",
    ])
    env = {
        'REQUEST_METHOD': 'POST',
        'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
        'CONTENT_LENGTH': str(len(POSTDATA))}
    fp = BytesIO(POSTDATA.encode('latin-1'))
    fs = cgi.FieldStorage(fp, environ=env, encoding="latin-1")
    self.assertEqual(len(fs.list), 1)
    self.assertEqual(fs.list[0].name, 'submit-name')
    self.assertEqual(fs.list[0].value, 'Larry')
def test_app(environ, start_response):
    """Probably not the most efficient example.

    Minimal WSGI application: starts a ``200 OK`` ``text/html`` response and
    yields an HTML table listing the ``environ`` entries in sorted key order,
    followed by any submitted form fields. Environ values and form field
    names/values are HTML-escaped before being written into the page.
    """
    import cgi
    try:
        from html import escape
    except ImportError:  # no ``html`` module available (Python 2)
        from cgi import escape

    start_response('200 OK', [('Content-Type', 'text/html')])
    yield '<html><head><title>Hello World!</title></head>\n' \
          '<body>\n' \
          '<p>Hello World!</p>\n' \
          '<table border="1">'
    # sorted()/repr() replace the Python-2-only ``keys().sort()`` and backticks.
    for name in sorted(environ):
        yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
            name, escape(repr(environ[name]), quote=False))

    form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
                            keep_blank_values=1)
    if form.list:
        yield '<tr><th colspan="2">Form data</th></tr>'

    # ``form.list`` may be None, so fall back to an empty tuple.
    for field in form.list or ():
        # Form data is client-controlled: escape it so it cannot inject markup.
        yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
            escape('%s' % (field.name,), quote=False),
            escape('%s' % (field.value,), quote=False))

    yield '</table>\n' \
          '</body></html>\n'
def get_uploads(self, field_name=None):
    """Get uploads sent to this handler.

    On the first call the request parameters are scanned: every
    ``cgi.FieldStorage`` whose ``type_options`` contain ``'blob-key'`` is
    parsed with ``blobstore.parse_blob_info`` and stored per parameter name
    in ``self.__uploads``. Later calls reuse that stored mapping.

    Args:
      field_name: Only select uploads that were sent as a specific field.

    Returns:
      A list of BlobInfo records corresponding to each upload.
      Empty list if there are no blob-info records for field_name.
    """
    if self.__uploads is None:
        self.__uploads = collections.defaultdict(list)
        for key, value in self.request.params.items():
            if isinstance(value, cgi.FieldStorage) and 'blob-key' in value.type_options:
                self.__uploads[key].append(blobstore.parse_blob_info(value))

    if field_name:
        # Return a copy so callers cannot mutate the stored list.
        return list(self.__uploads.get(field_name, []))

    # values() exists on both Python 2 and 3; itervalues() is Python 2 only.
    results = []
    for blob_infos in self.__uploads.values():
        results.extend(blob_infos)
    return results
def get_file_infos(self, field_name=None):
    """Get the file infos associated to the uploads sent to this handler.

    On the first call the request parameters are scanned: every
    ``cgi.FieldStorage`` whose ``type_options`` contain ``'blob-key'`` is
    parsed with ``blobstore.parse_file_info`` and stored per parameter name
    in ``self.__file_infos``. Later calls reuse that stored mapping.

    Args:
      field_name: Only select uploads that were sent as a specific field.
        Specify None to select all the uploads.

    Returns:
      A list of FileInfo records corresponding to each upload.
      Empty list if there are no FileInfo records for field_name.
    """
    if self.__file_infos is None:
        self.__file_infos = collections.defaultdict(list)
        for key, value in self.request.params.items():
            if isinstance(value, cgi.FieldStorage) and 'blob-key' in value.type_options:
                self.__file_infos[key].append(blobstore.parse_file_info(value))

    if field_name:
        # Return a copy so callers cannot mutate the stored list.
        return list(self.__file_infos.get(field_name, []))

    # values() exists on both Python 2 and 3; itervalues() is Python 2 only.
    results = []
    for file_infos in self.__file_infos.values():
        results.extend(file_infos)
    return results
def transcode_fs(self, fs, content_type):
    """Re-encode the fields of a FieldStorage into a fresh multipart body.

    Field names, filenames and plain field values are run through a decoder:
    the identity when ``PY3`` is true, otherwise ``decode`` with
    ``self.charset`` and ``self.errors``. ``name`` and ``filename`` are
    updated in place on the fields of ``fs``.

    Returns the second item of the pair returned by ``_encode_multipart``,
    which is called with a new ``io.BytesIO`` as its ``fout``.
    """
    # transcode FieldStorage
    if PY3: # pragma: no cover
        def decode(b):
            return b
    else:
        def decode(b):
            return b.decode(self.charset, self.errors)

    pairs = []
    for field in fs.list or ():
        field.name = decode(field.name)
        if not field.filename:
            # Plain form value: keep only the decoded value.
            pairs.append((field.name, decode(field.value)))
            continue
        # File upload: keep the whole field object.
        field.filename = decode(field.filename)
        pairs.append((field.name, field))

    # TODO: transcode big requests to temp file
    _, body = _encode_multipart(pairs, content_type, fout=io.BytesIO())
    return body
# TODO: remove in 1.4
def test_file_request(self):
    """Submit a form with a file upload that has no filename.

    The form is filled in and clicked, the request's content type is checked
    to be multipart form data, and the request body is then parsed with
    ``cgi.FieldStorage`` to confirm the ``user`` and ``data`` fields come
    back intact, with an empty filename on ``data``.
    """
    import cgi

    # fill in a file upload form...
    form = self.make_form()
    form["user"] = "john"
    data_control = form.find_control("data")
    data = "blah\nbaz\n"
    data_control.add_file(StringIO(data))
    req = form.click()
    self.assertTrue(
        get_header(req, "Content-type").startswith(
            "multipart/form-data; boundary="))

    # ...and check the resulting request is understood by cgi module
    fs = cgi.FieldStorage(
        StringIO(req.get_data()),
        CaseInsensitiveDict(header_items(req)),
        environ={"REQUEST_METHOD": "POST"})
    # assertEqual reports both operands on failure, unlike the deprecated
    # assert_(a == b) / assertEquals aliases used previously.
    self.assertEqual(fs["user"].value, "john")
    self.assertEqual(fs["data"].value, data)
    self.assertEqual(fs["data"].filename, "")
def test_file_request_with_filename(self):
    """Submit a form with a file upload that carries a filename.

    The form is filled in and clicked, the request's content type is checked
    to be multipart form data, and the request body is then parsed with
    ``cgi.FieldStorage`` to confirm the ``user`` and ``data`` fields come
    back intact, with ``"afilename"`` as the filename on ``data``.
    """
    import cgi

    # fill in a file upload form...
    form = self.make_form()
    form["user"] = "john"
    data_control = form.find_control("data")
    data = "blah\nbaz\n"
    data_control.add_file(StringIO(data), filename="afilename")
    req = form.click()
    self.assertTrue(
        get_header(req, "Content-type").startswith(
            "multipart/form-data; boundary="))

    # ...and check the resulting request is understood by cgi module
    fs = cgi.FieldStorage(
        StringIO(req.get_data()),
        CaseInsensitiveDict(header_items(req)),
        environ={"REQUEST_METHOD": "POST"})
    # assertEqual reports both operands on failure, unlike the deprecated
    # assert_(a == b) alias used previously.
    self.assertEqual(fs["user"].value, "john")
    self.assertEqual(fs["data"].value, data)
    self.assertEqual(fs["data"].filename, "afilename")
def test_app(environ, start_response):
    """Probably not the most efficient example.

    Minimal WSGI application: starts a ``200 OK`` ``text/html`` response and
    yields an HTML table listing the ``environ`` entries in sorted key order,
    followed by any submitted form fields. Environ values and form field
    names/values are HTML-escaped before being written into the page.
    """
    import cgi
    try:
        from html import escape
    except ImportError:  # no ``html`` module available (Python 2)
        from cgi import escape

    start_response('200 OK', [('Content-Type', 'text/html')])
    yield '<html><head><title>Hello World!</title></head>\n' \
          '<body>\n' \
          '<p>Hello World!</p>\n' \
          '<table border="1">'
    # sorted()/repr() replace the Python-2-only ``keys().sort()`` and backticks.
    for name in sorted(environ):
        yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
            name, escape(repr(environ[name]), quote=False))

    form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
                            keep_blank_values=1)
    if form.list:
        yield '<tr><th colspan="2">Form data</th></tr>'

    # ``form.list`` may be None, so fall back to an empty tuple.
    for field in form.list or ():
        # Form data is client-controlled: escape it so it cannot inject markup.
        yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
            escape('%s' % (field.name,), quote=False),
            escape('%s' % (field.value,), quote=False))

    yield '</table>\n' \
          '</body></html>\n'
def test_app(environ, start_response):
    """Probably not the most efficient example.

    Minimal WSGI application: starts a ``200 OK`` ``text/html`` response and
    yields an HTML table listing the ``environ`` entries in sorted key order,
    followed by any submitted form fields. Environ values and form field
    names/values are HTML-escaped before being written into the page.
    """
    import cgi
    try:
        from html import escape
    except ImportError:  # no ``html`` module available (Python 2)
        from cgi import escape

    start_response('200 OK', [('Content-Type', 'text/html')])
    yield '<html><head><title>Hello World!</title></head>\n' \
          '<body>\n' \
          '<p>Hello World!</p>\n' \
          '<table border="1">'
    # sorted()/repr() replace the Python-2-only ``keys().sort()`` and backticks.
    for name in sorted(environ):
        yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
            name, escape(repr(environ[name]), quote=False))

    form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
                            keep_blank_values=1)
    if form.list:
        yield '<tr><th colspan="2">Form data</th></tr>'

    # ``form.list`` may be None, so fall back to an empty tuple.
    for field in form.list or ():
        # Form data is client-controlled: escape it so it cannot inject markup.
        yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
            escape('%s' % (field.name,), quote=False),
            escape('%s' % (field.value,), quote=False))

    yield '</table>\n' \
          '</body></html>\n'