def parse_multipart(fp, ctype, clength, encoding):
""" Parse multipart/form-data request. Returns
a tuple (form, files).
"""
fs = FieldStorage(
fp=fp,
environ=MULTIPART_ENVIRON,
headers={
'content-type': ctype,
'content-length': clength
},
keep_blank_values=True
)
form = {}
files = {}
for f in fs.list:
if f.filename:
files.setdefault(f.name, []).append(f)
else:
form.setdefault(f.name, []).append(f.value)
return form, files
python类FieldStorage()的实例源码
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
def test_cgi_field_storage(self):
# encode a multipart form
content_type, body, content_length = encode_multipart_data(files=dict(cat=self.cat_jpeg))
environ = {
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': content_type,
'CONTENT_LENGTH': content_length
}
storage = cgi.FieldStorage(body, environ=environ)
descriptor = AttachableDescriptor(storage['cat'])
self.assertIsInstance(descriptor, CgiFieldStorageDescriptor)
self.assertEqual(descriptor.content_type, 'image/jpeg')
self.assertEqual(descriptor.original_filename, split(self.cat_jpeg)[1])
buffer = io.BytesIO()
copy_stream(descriptor, buffer)
buffer.seek(0)
self.assertEqual(md5sum(buffer), md5sum(self.cat_jpeg))
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
def main():
print HTML_HEADER
print HEAD
data = cgi.FieldStorage()
fileds = data['file']
if fileds.filename.endswith('.jpg') or fileds.filename.endswith('.png') or fileds.filename.endswith('.jpeg') or fileds.filename.endswith('.tiff') and fileds.filename.count('/') == -1:
os.chdir('files')
with open(fileds.filename, 'wb') as fout:
shutil.copyfileobj(fileds.file, fout, 100000)
os.chdir('../')
# do NOT touch above code
if fileds.filename.endswith('.png'):
print lsb.reveal("files/"+fileds.filename)
if fileds.filename.endswith('.jpg') or fileds.filename.endswith('.jpeg'):
print exifHeader.reveal("files/"+fileds.filename)
print "<p>Attempted to decode.</p>"
print END
def main():
print HTML_HEADER
print HEAD
data = cgi.FieldStorage()
fileds = data['file']
if fileds.filename.endswith('.jpg') or fileds.filename.endswith('.png') or fileds.filename.endswith('.jpeg') or fileds.filename.endswith('.tiff') and fileds.filename.count('/') == -1:
os.chdir('files')
with open(fileds.filename, 'wb') as fout:
shutil.copyfileobj(fileds.file, fout, 100000)
os.chdir('../')
# do NOT touch above code
if fileds.filename.endswith('.png'):
sec = lsb.hide('files/'+fileds.filename, data['message'].value)
sec.save('files/'+fileds.filename)
if fileds.filename.endswith('.jpg') or fileds.filename.endswith('.jpeg'):
secret = exifHeader.hide('files/'+fileds.filename, 'files/'+fileds.filename, secret_message=data['message'].value)
print "Successfully generated."
print '<a href="http://jonathanwong.koding.io/bstego/files/'+fileds.filename+'">Link here</a>'
print END
def transcode_fs(self, fs, content_type):
# transcode FieldStorage
if PY3: # pragma: no cover
decode = lambda b: b
else:
decode = lambda b: b.decode(self.charset, self.errors)
data = []
for field in fs.list or ():
field.name = decode(field.name)
if field.filename:
field.filename = decode(field.filename)
data.append((field.name, field))
else:
data.append((field.name, decode(field.value)))
# TODO: transcode big requests to temp file
content_type, fout = _encode_multipart(
data,
content_type,
fout=io.BytesIO()
)
return fout
# TODO: remove in 1.4
def transcode_fs(self, fs, content_type):
# transcode FieldStorage
if PY3: # pragma: no cover
decode = lambda b: b
else:
decode = lambda b: b.decode(self.charset, self.errors)
data = []
for field in fs.list or ():
field.name = decode(field.name)
if field.filename:
field.filename = decode(field.filename)
data.append((field.name, field))
else:
data.append((field.name, decode(field.value)))
# TODO: transcode big requests to temp file
content_type, fout = _encode_multipart(
data,
content_type,
fout=io.BytesIO()
)
return fout
# TODO: remove in 1.4
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
def do_test(buf, method):
env = {}
if method == "GET":
fp = None
env['REQUEST_METHOD'] = 'GET'
env['QUERY_STRING'] = buf
elif method == "POST":
fp = BytesIO(buf.encode('latin-1')) # FieldStorage expects bytes
env['REQUEST_METHOD'] = 'POST'
env['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
env['CONTENT_LENGTH'] = str(len(buf))
else:
raise ValueError("unknown method: %s" % method)
try:
return cgi.parse(fp, env, strict_parsing=1)
except Exception as err:
return ComparableException(err)
def test_strict(self):
for orig, expect in parse_strict_test_cases:
# Test basic parsing
d = do_test(orig, "GET")
self.assertEqual(d, expect, "Error parsing %s method GET" % repr(orig))
d = do_test(orig, "POST")
self.assertEqual(d, expect, "Error parsing %s method POST" % repr(orig))
env = {'QUERY_STRING': orig}
fs = cgi.FieldStorage(environ=env)
if isinstance(expect, dict):
# test dict interface
self.assertEqual(len(expect), len(fs))
self.assertCountEqual(expect.keys(), fs.keys())
##self.assertEqual(norm(expect.values()), norm(fs.values()))
##self.assertEqual(norm(expect.items()), norm(fs.items()))
self.assertEqual(fs.getvalue("nonexistent field", "default"), "default")
# test individual fields
for key in expect.keys():
expect_val = expect[key]
self.assertIn(key, fs)
if len(expect_val) > 1:
self.assertEqual(fs.getvalue(key), expect_val)
else:
self.assertEqual(fs.getvalue(key), expect_val[0])
def test_fieldstorage_multipart(self):
#Test basic FieldStorage multipart parsing
env = {
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
'CONTENT_LENGTH': '558'}
fp = BytesIO(POSTDATA.encode('latin-1'))
fs = cgi.FieldStorage(fp, environ=env, encoding="latin-1")
self.assertEqual(len(fs.list), 4)
expect = [{'name':'id', 'filename':None, 'value':'1234'},
{'name':'title', 'filename':None, 'value':''},
{'name':'file', 'filename':'test.txt', 'value':b'Testing 123.\n'},
{'name':'submit', 'filename':None, 'value':' Add '}]
for x in range(len(fs.list)):
for k, exp in expect[x].items():
got = getattr(fs.list[x], k)
self.assertEqual(got, exp)
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
def files(self):
""" File uploads parsed from an `url-encoded` or `multipart/form-data`
encoded POST or PUT request body. The values are instances of
:class:`cgi.FieldStorage`. The most important attributes are:
filename
The filename, if specified; otherwise None; this is the client
side filename, *not* the file name on which it is stored (that's
a temporary file you don't deal with)
file
The file(-like) object from which you can read the data.
value
The value as a *string*; for file uploads, this transparently
reads the file every time you request the value. Do not do this
on big files.
"""
files = FormsDict()
for name, item in self.POST.iterallitems():
if hasattr(item, 'filename'):
files[name] = item
return files
def POST(self):
""" The values of :attr:`forms` and :attr:`files` combined into a single
:class:`FormsDict`. Values are either strings (form values) or
instances of :class:`cgi.FieldStorage` (file uploads).
"""
post = FormsDict()
safe_env = {'QUERY_STRING':''} # Build a safe environment for cgi
for key in ('REQUEST_METHOD', 'CONTENT_TYPE', 'CONTENT_LENGTH'):
if key in self.environ: safe_env[key] = self.environ[key]
if NCTextIOWrapper:
fb = NCTextIOWrapper(self.body, encoding='ISO-8859-1', newline='\n')
else:
fb = self.body
data = cgi.FieldStorage(fp=fb, environ=safe_env, keep_blank_values=True)
for item in data.list or []:
post[item.name] = item if item.filename else item.value
return post
def do_test(buf, method):
env = {}
if method == "GET":
fp = None
env['REQUEST_METHOD'] = 'GET'
env['QUERY_STRING'] = buf
elif method == "POST":
fp = BytesIO(buf.encode('latin-1')) # FieldStorage expects bytes
env['REQUEST_METHOD'] = 'POST'
env['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
env['CONTENT_LENGTH'] = str(len(buf))
else:
raise ValueError("unknown method: %s" % method)
try:
return cgi.parse(fp, env, strict_parsing=1)
except Exception as err:
return ComparableException(err)
def test_strict(self):
for orig, expect in parse_strict_test_cases:
# Test basic parsing
d = do_test(orig, "GET")
self.assertEqual(d, expect, "Error parsing %s method GET" % repr(orig))
d = do_test(orig, "POST")
self.assertEqual(d, expect, "Error parsing %s method POST" % repr(orig))
env = {'QUERY_STRING': orig}
fs = cgi.FieldStorage(environ=env)
if isinstance(expect, dict):
# test dict interface
self.assertEqual(len(expect), len(fs))
self.assertCountEqual(expect.keys(), fs.keys())
##self.assertEqual(norm(expect.values()), norm(fs.values()))
##self.assertEqual(norm(expect.items()), norm(fs.items()))
self.assertEqual(fs.getvalue("nonexistent field", "default"), "default")
# test individual fields
for key in expect.keys():
expect_val = expect[key]
self.assertIn(key, fs)
if len(expect_val) > 1:
self.assertEqual(fs.getvalue(key), expect_val)
else:
self.assertEqual(fs.getvalue(key), expect_val[0])
def test_fieldstorage_multipart_w3c(self):
# Test basic FieldStorage multipart parsing (W3C sample)
env = {
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY_W3),
'CONTENT_LENGTH': str(len(POSTDATA_W3))}
fp = BytesIO(POSTDATA_W3.encode('latin-1'))
fs = cgi.FieldStorage(fp, environ=env, encoding="latin-1")
self.assertEqual(len(fs.list), 2)
self.assertEqual(fs.list[0].name, 'submit-name')
self.assertEqual(fs.list[0].value, 'Larry')
self.assertEqual(fs.list[1].name, 'files')
files = fs.list[1].value
self.assertEqual(len(files), 2)
expect = [{'name': None, 'filename': 'file1.txt', 'value': b'... contents of file1.txt ...'},
{'name': None, 'filename': 'file2.gif', 'value': b'...contents of file2.gif...'}]
for x in range(len(files)):
for k, exp in expect[x].items():
got = getattr(files[x], k)
self.assertEqual(got, exp)
def test_app(environ, start_response):
"""Probably not the most efficient example."""
import cgi
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<html><head><title>Hello World!</title></head>\n' \
'<body>\n' \
'<p>Hello World!</p>\n' \
'<table border="1">'
names = environ.keys()
names.sort()
for name in names:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
name, cgi.escape(`environ[name]`))
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
keep_blank_values=1)
if form.list:
yield '<tr><th colspan="2">Form data</th></tr>'
for field in form.list:
yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
field.name, field.value)
yield '</table>\n' \
'</body></html>\n'
def main():
print("Content-Type: text/html")
print()
print('''<meta http-equiv="Content-type" content="text/html;charset=UTF-8">''')
form = cgi.FieldStorage()
reqid = form.getfirst('reqid', '').strip()
if not check(reqid):
return
if not sendmail(reqid):
return
print('???????????????????????????????')
def main():
print("Content-Type: text/html")
print()
print('''<meta http-equiv="Content-type" content="text/html;charset=UTF-8">''')
form = cgi.FieldStorage()
reqid = form.getfirst('reqid', '').strip()
if not check(reqid):
return
if not sendmail(reqid):
return
print('???????????????????????????????')
def main():
print("Content-Type: text/html")
print()
print('''<meta http-equiv="Content-type" content="text/html;charset=UTF-8">''')
if os.environ['REQUEST_METHOD'] != 'POST':
return
form = cgi.FieldStorage()
ret = check(form)
if not ret:
return
mailaddr, title, desc = ret
filename = save(mailaddr, title, desc)
url = 'http://www.python.jp/cgi-bin/confirm-news.py?reqid=%s' % filename
if not sendmail(mailaddr, title, desc, url):
return
print('????????????????????????????????')
def main():
print("Content-Type: text/html")
print()
print('''<meta http-equiv="Content-type" content="text/html;charset=UTF-8">''')
if os.environ['REQUEST_METHOD'] != 'POST':
return
form = cgi.FieldStorage()
ret = check(form)
if not ret:
return
mailaddr, title, url, date, desc = ret
filename = save(mailaddr, title, url, date, desc)
conf_url = 'http://www.python.jp/cgi-bin/confirm-connpass-event.py?reqid=%s' % filename
if not sendmail(mailaddr, title, url, date, desc, conf_url):
return
print('????????????????????????????????')
def main():
print("Content-Type: text/html")
print()
print('''<meta http-equiv="Content-type" content="text/html;charset=UTF-8">''')
if os.environ['REQUEST_METHOD'] != 'POST':
return
form = cgi.FieldStorage()
ret = check(form)
if not ret:
return
mailaddr, name, url, date, desc = ret
filename = save(mailaddr, name, url, date, desc)
conf_url = 'http://www.python.jp/cgi-bin/confirm-jobboard.py?reqid=%s' % filename
if not sendmail(mailaddr, name, url, date, desc, conf_url):
return
print('????????????????????????????????')