def _request(request, request_fallback=None):
''' Extract request fields wherever they may come from: GET, POST, forms, fallback '''
# Use lambdas to avoid evaluating bottle.request.* which may throw an Error
all_dicts = [
lambda: request.json,
lambda: request.forms,
lambda: request.query,
lambda: request.files,
#lambda: request.POST,
lambda: request_fallback
]
request_dict = dict()
for req_dict_ in all_dicts:
try:
req_dict = req_dict_()
except KeyError:
continue
if req_dict is not None and hasattr(req_dict, 'items'):
for req_key, req_val in req_dict.items():
request_dict[req_key] = req_val
return request_dict
python类POST的实例源码
def test_post(self):
""" Environ: POST data """
sq = tob('a=a&a=1&b=b&c=&d&cn=%e7%93%b6')
e = {}
wsgiref.util.setup_testing_defaults(e)
e['wsgi.input'].write(sq)
e['wsgi.input'].seek(0)
e['CONTENT_LENGTH'] = str(len(sq))
e['REQUEST_METHOD'] = "POST"
request = BaseRequest(e)
self.assertTrue('a' in request.POST)
self.assertTrue('b' in request.POST)
self.assertEqual(['a','1'], request.POST.getall('a'))
self.assertEqual(['b'], request.POST.getall('b'))
self.assertEqual('1', request.POST['a'])
self.assertEqual('b', request.POST['b'])
self.assertEqual('', request.POST['c'])
self.assertEqual('', request.POST['d'])
self.assertEqual(tonat(tob('?'), 'latin1'), request.POST['cn'])
self.assertEqual(touni('?'), request.POST.cn)
def build_preview():
if request.content_type == 'application/json':
rst = request.json['rst']
else:
rst = request.POST['rst']
key = md5(rst)
if key in _CACHE:
res, warnings = _CACHE[key]
else:
res, warnings = _CACHE[key] = rst2html(rst, theme='acr')
return {'result': res, 'warnings': warnings,
'valid': len(warnings) == 0}
def call_api(func, args=""):
response.headers.replace("Content-type", "application/json")
response.headers.append("Cache-Control", "no-cache, must-revalidate")
s = request.environ.get('beaker.session')
if 'session' in request.POST:
s = s.get_by_id(request.POST['session'])
if not s or not s.get("authenticated", False):
return HTTPError(403, json.dumps("Forbidden"))
if not PYLOAD.isAuthorized(func, {"role": s["role"], "permission": s["perms"]}):
return HTTPError(401, json.dumps("Unauthorized"))
args = args.split("/")[1:]
kwargs = {}
for x, y in chain(request.GET.iteritems(), request.POST.iteritems()):
if x == "session": continue
kwargs[x] = unquote(y)
try:
return callApi(func, *args, **kwargs)
except Exception, e:
print_exc()
return HTTPError(500, json.dumps({"error": e.message, "traceback": format_exc()}))
def add(request):
package = request.POST.get('referer', None)
urls = filter(lambda x: x != "", request.POST['urls'].split("\n"))
if package:
PYLOAD.addPackage(package, urls, 0)
else:
PYLOAD.generateAndAddPackages(urls, 0)
return ""
def set_captcha():
if request.environ.get('REQUEST_METHOD', "GET") == "POST":
try:
PYLOAD.setCaptchaResult(request.forms["cap_id"], request.forms["cap_result"])
except:
pass
task = PYLOAD.getCaptchaTask()
if task.tid >= 0:
src = "data:image/%s;base64,%s" % (task.type, task.data)
return {'captcha': True, 'id': task.tid, 'src': src, 'result_type' : task.resultType}
else:
return {'captcha': False}
def save_config(category):
for key, value in request.POST.iteritems():
try:
section, option = key.split("|")
except:
continue
if category == "general": category = "core"
PYLOAD.setConfigValue(section, option, decode(value), category)
def add_account():
login = request.POST["account_login"]
password = request.POST["account_password"]
type = request.POST["account_type"]
PYLOAD.updateAccount(type, login, password)
def change_password():
user = request.POST["user_login"]
oldpw = request.POST["login_current_password"]
newpw = request.POST["login_new_password"]
if not PYLOAD.changePassword(user, oldpw, newpw):
print "Wrong password"
return HTTPError()
def test_method(self):
self.assertEqual(BaseRequest({}).method, 'GET')
self.assertEqual(BaseRequest({'REQUEST_METHOD':'GET'}).method, 'GET')
self.assertEqual(BaseRequest({'REQUEST_METHOD':'GeT'}).method, 'GET')
self.assertEqual(BaseRequest({'REQUEST_METHOD':'get'}).method, 'GET')
self.assertEqual(BaseRequest({'REQUEST_METHOD':'POst'}).method, 'POST')
self.assertEqual(BaseRequest({'REQUEST_METHOD':'FanTASY'}).method, 'FANTASY')
def test_bodypost(self):
sq = tob('foobar')
e = {}
wsgiref.util.setup_testing_defaults(e)
e['wsgi.input'].write(sq)
e['wsgi.input'].seek(0)
e['CONTENT_LENGTH'] = str(len(sq))
e['REQUEST_METHOD'] = "POST"
request = BaseRequest(e)
self.assertEqual('', request.POST['foobar'])
def test_body_noclose(self):
""" Test that the body file handler is not closed after request.POST """
sq = tob('a=a&a=1&b=b&c=&d')
e = {}
wsgiref.util.setup_testing_defaults(e)
e['wsgi.input'].write(sq)
e['wsgi.input'].seek(0)
e['CONTENT_LENGTH'] = str(len(sq))
e['REQUEST_METHOD'] = "POST"
request = BaseRequest(e)
self.assertEqual(sq, request.body.read())
request.POST # This caused a body.close() with Python 3.x
self.assertEqual(sq, request.body.read())
def test_params(self):
""" Environ: GET and POST are combined in request.param """
e = {}
wsgiref.util.setup_testing_defaults(e)
e['wsgi.input'].write(tob('b=b&c=p'))
e['wsgi.input'].seek(0)
e['CONTENT_LENGTH'] = '7'
e['QUERY_STRING'] = 'a=a&c=g'
e['REQUEST_METHOD'] = "POST"
request = BaseRequest(e)
self.assertEqual(['a','b','c'], sorted(request.params.keys()))
self.assertEqual('p', request.params['c'])
def test_multipart(self):
""" Environ: POST (multipart files and multible values per key) """
fields = [('field1','value1'), ('field2','value2'), ('field2','value3')]
files = [('file1','filename1.txt','content1'), ('??','??foo.py', 'ä\nö\rü')]
e = tools.multipart_environ(fields=fields, files=files)
request = BaseRequest(e)
# File content
self.assertTrue('file1' in request.POST)
self.assertTrue('file1' in request.files)
self.assertTrue('file1' not in request.forms)
cmp = tob('content1') if sys.version_info >= (3,2,0) else 'content1'
self.assertEqual(cmp, request.POST['file1'].file.read())
# File name and meta data
self.assertTrue('??' in request.POST)
self.assertTrue('??' in request.files)
self.assertTrue('??' not in request.forms)
self.assertEqual('foo.py', request.POST['??'].filename)
self.assertTrue(request.files['??'])
self.assertFalse(request.files.file77)
# UTF-8 files
x = request.POST['??'].file.read()
if (3,2,0) > sys.version_info >= (3,0,0):
x = x.encode('utf8')
self.assertEqual(tob('ä\nö\rü'), x)
# No file
self.assertTrue('file3' not in request.POST)
self.assertTrue('file3' not in request.files)
self.assertTrue('file3' not in request.forms)
# Field (single)
self.assertEqual('value1', request.POST['field1'])
self.assertTrue('field1' not in request.files)
self.assertEqual('value1', request.forms['field1'])
# Field (multi)
self.assertEqual(2, len(request.POST.getall('field2')))
self.assertEqual(['value2', 'value3'], request.POST.getall('field2'))
self.assertEqual(['value2', 'value3'], request.forms.getall('field2'))
self.assertTrue('field2' not in request.files)
def contact(db):
"""
Our contact-us form, basically, present a form if it's a GET request,
validate and process the form if it's a POST request. Filthy but works.
"""
form = ContactForm(request.POST, nocaptcha={'ip_address': '127.0.0.1'})
if request.method == 'POST' and form.validate():
# process the form, captcha is valid.
message_text = "Contact Email: {email}\n\n {contact_text}".format(
email=form.email.data,
contact_text=form.contact_text.data
)
# put together a gmail client for sending messages
gmail_client = GMail(settings.ADMIN_EMAIL,
settings.ADMIN_EMAIL_PASSWORD)
message = Message('[xqzes] Contact Form',
to=settings.ADMIN_EMAIL,
text=message_text)
gmail_client.send(message)
return redirect("/contact/?contacted=True")
return template(
'contact',
form=form,
contacted=strtobool(request.GET.get('contacted', 'false'))
)
def submit(db):
# TODO: break this out along with others in to an excuses package.
class SubmissionForm(Form):
attribution_name = StringField(
'Your Name (for future attribution purposes)',
[
validators.InputRequired(),
validators.Length(
min=3,
max=50,
message="Srsly, give us a decent username "
"(%(min)d - %(max)d chars),"
" doesn't even have to be real."
)
]
)
excuse = TextAreaField(
'What\'s YOUR excuse !?!?',
[
validators.Length(
min=5,
max=140,
message="Please provide %(min)d - %(max)d "
"characters"),
]
)
nocaptcha = NoCaptchaField(
public_key=settings.RECAPTCHA_SITE_KEY,
private_key=settings.RECAPTCHA_SECRET_KEY,
secure=True,
)
form = SubmissionForm(request.POST, nocaptcha={'ip_address': '127.0.0.1'})
submitted = False
if request.method == 'POST' and form.validate():
excuse_record = Excuse(form.attribution_name.data,
form.excuse.data)
db.add(excuse_record)
submitted = True
return template('submit', form=form, submitted=submitted)