def login_post():
user = request.forms.get("username")
password = request.forms.get("password")
info = PYLOAD.checkAuth(user, password)
if not info:
return render_to_response("login.html", {"errors": True}, [pre_processor])
set_session(request, info)
return redirect("/")
python类forms()的实例源码
def test_multipart(self):
""" Environ: POST (multipart files and multible values per key) """
fields = [('field1','value1'), ('field2','value2'), ('field2','value3')]
files = [('file1','filename1.txt','content1'), ('??','??foo.py', 'ä\nö\rü')]
e = tools.multipart_environ(fields=fields, files=files)
request = BaseRequest(e)
# File content
self.assertTrue('file1' in request.POST)
self.assertTrue('file1' in request.files)
self.assertTrue('file1' not in request.forms)
cmp = tob('content1') if sys.version_info >= (3,2,0) else 'content1'
self.assertEqual(cmp, request.POST['file1'].file.read())
# File name and meta data
self.assertTrue('??' in request.POST)
self.assertTrue('??' in request.files)
self.assertTrue('??' not in request.forms)
self.assertEqual('foo.py', request.POST['??'].filename)
self.assertTrue(request.files['??'])
self.assertFalse(request.files.file77)
# UTF-8 files
x = request.POST['??'].file.read()
if (3,2,0) > sys.version_info >= (3,0,0):
x = x.encode('utf8')
self.assertEqual(tob('ä\nö\rü'), x)
# No file
self.assertTrue('file3' not in request.POST)
self.assertTrue('file3' not in request.files)
self.assertTrue('file3' not in request.forms)
# Field (single)
self.assertEqual('value1', request.POST['field1'])
self.assertTrue('field1' not in request.files)
self.assertEqual('value1', request.forms['field1'])
# Field (multi)
self.assertEqual(2, len(request.POST.getall('field2')))
self.assertEqual(['value2', 'value3'], request.POST.getall('field2'))
self.assertEqual(['value2', 'value3'], request.forms.getall('field2'))
self.assertTrue('field2' not in request.files)
def parse(self, body):
return request.forms
############################################################
# Base objects
############################################################
def post_new(tgtbox, boxes_rpc):
box = _get_box_rpc(tgtbox, boxes_rpc)
if box.lock_exists():
return template('error', errormsg="It looks like an experiment is already running on this box. Please wait for it to finish before starting another.")
# validate form data
form = NewExperimentForm(request.forms)
if not form.validate():
return template('new', dict(form=form, box=box))
expname = form.expname.data
notes = form.notes.data
inifile = form.inifile.data
def get_phase(phase):
length = phase.length.data
stimulus = phase.stimulus.data
background = phase.background.data
return (length, stimulus, background)
phases = [get_phase(p) for p in form.phases if p.enabled.data == 'True']
box.start_experiment(expname, notes, inifile, phases)
redirect("/")
def jsonp(view):
def wrapped(*posargs, **kwargs):
args = {}
# if we access the args via get(),
# we can get encoding errors...
for k in request.forms:
args[k] = getattr(request.forms, k)
for k in request.query:
args[k] = getattr(request.query, k)
callback = args.get('callback')
status_code = 200
try:
result = view(args, *posargs, **kwargs)
except (KeyError) as e:#ValueError, AttributeError, KeyError) as e:
import traceback, sys
traceback.print_exc(file=sys.stdout)
result = {'status':'error',
'message':'invalid query',
'details': str(e)}
status_code = 403
if callback:
result = '%s(%s);' % (callback, json.dumps(result))
if status_code == 200:
return result
else:
abort(status_code, result)
return wrapped
def verify_recaptcha_form():
if config['DEBUG']:
return True
captcha_rs = request.forms.get('g-recaptcha-response')
req = requests.get('https://www.google.com/recaptcha/api/siteverify',
params = {
'secret': config['RECAPTCHA_SECRET_KEY'],
'response': request.forms.get('g-recaptcha-response'),
'remoteip': request.remote_addr
}, verify=True).json()
return req.get("success", False)
def account(user=None):
if not user: return users.login_required()
subscribe = request.forms.get('subscribe')
if subscribe:
user['subscribed'] = True
else:
user['subscribed'] = False
r.table('users').get(user['identity']).replace(user).run(conn())
users.flash('info', 'Settings updated')
return redirect('/account')
def account_delete(user=None):
if not user: return users.login_required()
if request.forms.get('delete'):
# TODO Delete all answers
r.table('users').get(user['identity']).delete().run(conn())
for page in config['PAGES']:
for question in page.questions:
r.table('%s_answers' % question.name).get(user['identity']).delete().run(conn())
users.delete_session()
users.flash('info', 'Account deleted')
return redirect('/')
else:
users.flash('info', 'You must check the box')
return redirect('/account')
def register_email():
verify = verify_recaptcha_form()
if not verify:
return template("login.htm", email_login_error = "ReCAPTCHA verification failed")
email = request.forms.get('email')
msg = 'An email has been sent to the address provided with login information'
# validate email
# check that user does not already exist
user = None
for user2 in r.table('users').get_all(["email", email], index = 'identity_check').run(conn()):
user = user2
if user:
user['authlink'] = str(uuid.uuid4())
r.table('users').replace(user).run(conn())
link = "%s/login/email/%s/%s" % (config['SITE_URL'], email, user['authlink'])
users.mail(user, 'login information', REGISTER_EMAIL_HTML.substitute(link=link),
REGISTER_EMAIL_TEXT.substitute(link=link))
users.flash('info', msg)
else:
authlink = uuid.uuid4()
user = {
"identity_type": "email",
"identity": email,
"authlink": str(uuid.uuid4()),
"subscribed": True,
"last_update": r.now(),
}
r.table('users').insert(user).run(conn())
link = "%s/login/email/%s/%s" % (config['SITE_URL'], email, user['authlink'])
users.mail(user, 'confirmation', REGISTER_EMAIL_HTML.substitute(link=link),
REGISTER_EMAIL_TEXT.substitute(link=link))
users.flash('info', msg)
return redirect('/login')
def update(name, user=None):
if not user: return users.login_required()
question = census.questions[name]
form = question.form(user, request.forms)
# TODO: WTForm should automatically figure out request.forms, but no combination of arguments has actually worked
if len(request.forms.getall("answer")) > 1:
# TODO: malformed input possible here because this is unaware of the question's actual type
form.answer.data = request.forms.getall("answer")
elif len(request.forms.getall("answer")) == 0:
# Check to see if question was answered; if yes, this is an attempt to delete the answer, if not, leave it be
if question.answered(user):
form.answer.data = []
else:
# If it is not just don't do anything
return {}
else:
form.answer.data = request.forms.get("answer")
# If only one answer is submitted for checkboxed, this will be returned as a string rather than a list with a
# single element
if question.CHART_TYPE == 'check' and type(form.answer.data) == str:
form.answer.data = [form.answer.data]
if form.validate():
res = question.save_answer(user, form.answer.data)
return {}
return form.errors
def get_salted_password():
password = request.forms.get('password')
if not (request.forms.get('md5ed') == '1' and re.match(r'^[a-f\d]{32}$', password)):
password = user.salt_password(password)
return password
# home panel Generator
def admin_cli():
argv = request.forms.get('cmd').split(' ')
out, rtn = utils.get_stdout([sys.executable, './cli.py'] + argv)
return {
"retval": rtn,
"output": out
}
def admin_tx_query():
tfrom, tto, tsum = [request.forms.get(n) or None for n in ('from', 'to', 'group')]
# Make a UID->Username table
un = { u.id: u.username for u in user.get_all() }
# Make result
tresult = {}
for uid, pkg, tx, time in traffic.query(min_time=tfrom, max_time=tto, sum=int(tsum)):
if not time in tresult: tresult[time] = []
tresult[time].append({ "title": un[uid], "amount": tx })
tdays = tresult.keys()
tdays.sort()
return [ {"title": tday, "data": tresult[tday]} for tday in tdays ]
def admin_user_add():
u = user.User()
u.username, u.salted_password, u.sskey = [request.forms.get(n) for n in ('username', 'password', 'sskey')]
u.create()
u.write()
return { "id": u.id }
def admin_user_passwd():
username, password = [request.forms.get(n) for n in ('username', 'password')]
u = user.get_by_username(username)
u.salted_password = password
u.write()
return { "status": "ok" }
def admin_user_sskey():
username, sskey = [request.forms.get(n) for n in ('username', 'value')]
u = user.get_by_username(username)
u.sskey = sskey
u.write()
return { "status": "ok" }
def admin_user_port():
username, port = [request.forms.get(n) for n in ('username', 'value')]
u = user.get_by_username(username)
u.set_port(port)
u.write()
return { "status": "ok" }
def admin_user_limit():
username, limit = [request.forms.get(n) for n in ('username', 'limit')]
limit = json.loads(limit)
u = user.get_by_username(username)
u.set_meta("limit", limit)
u.write()
return { "status": "ok" }
def admin_user_suspend():
username, suspend, reason = [request.forms.get(n) for n in ('username', 'suspend', 'reason')]
u = user.get_by_username(username)
u.suspended = suspend == "1"
if reason:
datestr = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
u.set_meta("limiter_log", "%s: %s"%(datestr, reason))
u.write()
return { "username": username, "suspended": u.suspended }
# Login and Logout
def slack_verification_preprocessor(view):
"""
Run some preliminary authentication - by Slack policy (and, given team
privacy considerations), we need to ensure that requests legitimately
come from slack. There is an assigned (secret) shared token that we need to
compare from our side to the incoming request.
we need to:
1) ensure it is present (first assertion)
2) ensure it matches (second assertion)
if these two succeed, then we can allow the view to be called.
"""
def wrapper(db, *args, **kwargs):
try:
assert('token' in request.forms)
except AssertionError:
abort(400, "No token provided to authenticate")
try:
assert(request.forms['token'] == settings.SLACK_VERIFICATION_TOKEN)
except AssertionError:
abort(401, "Do you even authenticate, bro?")
body = view(db, *args, **kwargs)
return body
return wrapper