def login_required(perm=None):
def _dec(func):
def _view(*args, **kwargs):
s = request.environ.get('beaker.session')
if s.get("name", None) and s.get("authenticated", False):
if perm:
perms = parse_permissions(s)
if perm not in perms or not perms[perm]:
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return HTTPError(403, "Forbidden")
else:
return redirect("/nopermission")
return func(*args, **kwargs)
else:
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return HTTPError(403, "Forbidden")
else:
return redirect("/login")
return _view
return _dec
python类redirect()的实例源码
def chat_room():
"""
?????????
:return:
"""
# cookie??????request????
username = request.get_cookie("username")
# cookie???????????????????
if not username:
return redirect("/")
# ?????????????????????
talk_list = get_talk()
return template("chat_room", username=username, talk_list=talk_list)
def talk():
"""
????????????????????????
:return:
"""
# ????????????get????getunicode???
chat_data = request.POST.getunicode("chat")
# ????cookie????
username = request.get_cookie("username")
# ??????
talk_time = datetime.now()
# ????
save_talk(talk_time, username, chat_data)
return redirect("/chat_room")
def show_app(app):
# very similar to start_new_job() consider consolidating
user = root.authorized()
root.set_active(app)
# parameters for return template
if app not in root.myapps:
return template('error', err="app %s is not installed" % (app))
try:
params = {}
params.update(root.myapps[app].params)
params['cid'] = ''
params['app'] = app
params['user'] = user
params['apps'] = root.myapps
return template(os.path.join('apps', app), params)
except:
exc_type, exc_value, exc_traceback = sys.exc_info()
print traceback.print_exception(exc_type, exc_value, exc_traceback)
redirect('/app/'+app)
def app_save(appid):
root.authorized()
app = request.forms.app
lang = request.forms.language
info = request.forms.input_format
category = request.forms.category
preprocess = request.forms.preprocess
postprocess = request.forms.postprocess
assets = request.forms.assets
if assets == "None": assets = None
desc = request.forms.description
row = db(db.apps.id==appid).select().first()
row.update_record(language=lang, category=category, description=desc, input_format=info,
preprocess=preprocess, postprocess=postprocess, assets=assets)
db.commit()
redirect("/app/"+app)
# allow only admin or user to delete apps
def delete_app(appid):
user = root.authorized()
if user != 'admin':
return template('error', err="must be admin to edit app")
appname = request.forms.app
del_app_dir = request.forms.del_app_dir
try:
if user == 'admin':
# delete entry in DB
if del_app_dir == "on":
del_files = True
else:
del_files = False
root.myapps[appname].delete(appid, del_files)
else:
return template("error", err="must be admin")
except:
exc_type, exc_value, exc_traceback = sys.exc_info()
print traceback.print_exception(exc_type, exc_value, exc_traceback)
return template("error", err="failed to delete app... did the app load properly?")
redirect("/apps")
def view_app(app):
user = root.authorized()
if app: root.set_active(app)
else: redirect('/myapps')
if user != 'admin':
return template('error', err="must be admin to edit app")
cid = request.query.cid
result = db(apps.name==app).select().first()
params = {}
params['app'] = app
params['user'] = user
params['cid'] = cid
#if request.query.edit:
# return template('appedit', params, rows=result)
#else:
try:
return template('app', params, rows=result)
except:
exc_type, exc_value, exc_traceback = sys.exc_info()
print traceback.print_exception(exc_type, exc_value, exc_traceback)
return template('error', err="there was a problem showing the app template. Check traceback.")
def addapp():
user = root.authorized()
if user != 'admin':
return template('error', err="must be admin to add app")
appname = request.forms.appname
input_format = request.forms.input_format
# ask for app name
category = request.forms.category
language = request.forms.language
description = request.forms.description
command = request.forms.command
preprocess = request.forms.preprocess
postprocess = request.forms.postprocess
# put in db
a = apprw.App()
#print "user:",user
a.create(appname, description, category, language,
input_format, command, preprocess, postprocess)
# load_apps() needs to be called here in case a user wants to delete
# this app just after it has been created... it is called again after
# the user uploads a sample input file
root.load_apps()
redirect('/app/'+appname)
def delete_job(jid):
user = root.authorized()
app = request.forms.app
cid = request.forms.cid
state = jobs(jid).state
if re.search("/", cid):
return template("error", err="only possible to delete cases that you own")
if state != "R":
path = os.path.join(user_dir, user, app, cid)
if os.path.isdir(path): shutil.rmtree(path)
root.sched.stop(jid)
root.sched.qdel(jid)
else:
return template("error", err="cannot delete while job is still running")
redirect("/jobs")
def delete_f():
user = root.authorized()
app = request.forms.app
cid = request.forms.cid
selected_files = request.forms.selected_files
files = selected_files.rstrip(':').split(':')
for file in files:
path = os.path.join(user_dir, user, app, cid, file)
if cid is not None:
if os.path.isfile(path):
print "removing file:", path
os.remove(path)
elif os.path.isdir(path):
print "removing path:", path
shutil.rmtree(path)
else:
print "ERROR: not removing path:", path, "because cid missing"
redirect("/files?cid="+cid+"&app="+app)
def zip_case():
"""zip case on machine to prepare for download"""
user = root.authorized()
import zipfile
app = request.query.app
cid = request.query.cid
base_dir = os.path.join(user_dir, user, app)
path = os.path.join(base_dir, cid+".zip")
zf = zipfile.ZipFile(path, mode='w', compression=zipfile.ZIP_DEFLATED)
sim_dir = os.path.join(base_dir, cid)
for fn in os.listdir(sim_dir):
zf.write(os.path.join(sim_dir, fn))
zf.close()
return static_file(path, root="./")
# status = "case compressed"
# redirect(request.headers.get('Referer')+"&status="+status)
def login_post_handler():
try:
username = bottle.request.forms['username']
password = bottle.request.forms['password']
except KeyError:
bottle.abort(400, 'Invalid form.')
try:
user = model.get_user(username)
except KeyError:
bottle.redirect('/login?msg=fail')
if user['password_hash'] == model.PASSWORDLESS_HASH:
if not handler_util.is_admin():
bottle.redirect('/login?msg=fail')
else:
if not sha256_crypt.verify(password, user['password_hash']):
bottle.redirect('/login?msg=fail')
handler_util.set_current_username(username)
bottle.redirect('/')
def login():
"""Authenticate users"""
username = post_get('username')
password = post_get('password')
if not aaa.login(username, password):
return dict(ok=False, msg="Username or password invalid")
if aaa.current_user.role == 'admin':
return dict(
ok=True,
redirect="/admin"
)
else:
return dict(
ok=True,
redirect="/experimenter"
)
def assertRedirect(self, target, result, query=None, status=303, **args):
env = {'SERVER_PROTOCOL':'HTTP/1.1'}
for key in list(args):
if key.startswith('wsgi'):
args[key.replace('_', '.', 1)] = args[key]
del args[key]
env.update(args)
request.bind(env)
bottle.response.bind()
try:
bottle.redirect(target, **(query or {}))
except bottle.HTTPResponse:
r = _e()
self.assertEqual(status, r.status_code)
self.assertTrue(r.headers)
self.assertEqual(result, r.headers['Location'])
def require_login(func):
def func_wrapper(*args, **kwargs):
global current_user, is_admin
uid_str = request.get_cookie('ssl_uid')
password = request.get_cookie('ssl_pw')
logined = uid_str and password
if logined:
current_user = user.get_by_id(int(uid_str))
logined = current_user and current_user.salted_password == password
is_admin = logined and current_user.id == config.USER_ADMIN
if not logined:
response.set_cookie('ssl_uid', '', expires=0)
response.set_cookie('ssl_pw', '', expires=0)
return redirect('/login')
return func(*args, **kwargs)
return func_wrapper
# decorator that used for user json APIs. Decorated function shall return a dict.
def do_login():
username = request.forms.get('username')
password = get_salted_password()
current_user = user.get_by_username(username)
logined = current_user and current_user.salted_password == password
if logined:
response.set_cookie('ssl_uid', str(current_user.id))
response.set_cookie('ssl_pw', password)
return redirect('/')
return template('login',
username=username,
message='User not found.' if not current_user else 'Password is incorrect.',
salt=config.USER_SALT
)
def show_post(permalink="notfound"):
cookie = bottle.request.get_cookie("session")
username = sessions.get_username(cookie)
permalink = cgi.escape(permalink)
print "about to query on permalink = ", permalink
post = posts.get_post_by_permalink(permalink)
if post is None:
bottle.redirect("/post_not_found")
# init comment form fields for additional comment
comment = {'name': "", 'body': "", 'email': ""}
return bottle.template("entry_template", dict(post=post, username=username, errors="", comment=comment))
# used to process a comment on a blog post
def process_signup():
email = bottle.request.forms.get("email")
username = bottle.request.forms.get("username")
password = bottle.request.forms.get("password")
verify = bottle.request.forms.get("verify")
# set these up in case we have an error case
errors = {'username': cgi.escape(username), 'email': cgi.escape(email)}
if validate_signup(username, password, verify, email, errors):
if not users.add_user(username, password, email):
# this was a duplicate
errors['username_error'] = "Username already in use. Please choose another"
return bottle.template("signup", errors)
session_id = sessions.start_session(username)
print session_id
bottle.response.set_cookie("session", session_id)
bottle.redirect("/welcome")
else:
print "user did not validate"
return bottle.template("signup", errors)
def present_welcome():
# check for a cookie, if present, then extract value
cookie = bottle.request.get_cookie("session")
username = sessions.get_username(cookie) # see if user is logged in
if username is None:
print "welcome: can't identify user...redirecting to signup"
bottle.redirect("/signup")
return bottle.template("welcome", {'username': username})
# Helper Functions
#extracts the tag from the tags form element. an experience python programmer could do this in fewer lines, no doubt
def show_post(permalink="notfound"):
cookie = bottle.request.get_cookie("session")
username = sessions.get_username(cookie)
permalink = cgi.escape(permalink)
print "about to query on permalink = ", permalink
post = posts.get_post_by_permalink(permalink)
if post is None:
bottle.redirect("/post_not_found")
# init comment form fields for additional comment
comment = {'name': "", 'body': "", 'email': ""}
return bottle.template("entry_template",
{"post": post, "username": username, "errors": "",
"comment": comment})
# used to process a comment on a blog post
def post_comment_like():
permalink = bottle.request.forms.get("permalink")
permalink = cgi.escape(permalink)
comment_ordinal_str = bottle.request.forms.get("comment_ordinal")
comment_ordinal = int(comment_ordinal_str)
post = posts.get_post_by_permalink(permalink)
if post is None:
bottle.redirect("/post_not_found")
return
# it all looks good. increment the ordinal
posts.increment_likes(permalink, comment_ordinal)
bottle.redirect("/post/" + permalink)
def process_signup():
email = bottle.request.forms.get("email")
username = bottle.request.forms.get("username")
password = bottle.request.forms.get("password")
verify = bottle.request.forms.get("verify")
# set these up in case we have an error case
errors = {'username': cgi.escape(username), 'email': cgi.escape(email)}
if validate_signup(username, password, verify, email, errors):
if not users.add_user(username, password, email):
# this was a duplicate
errors['username_error'] = ("Username already in use. " +
"Please choose another")
return bottle.template("signup", errors)
session_id = sessions.start_session(username)
print session_id
bottle.response.set_cookie("session", session_id)
bottle.redirect("/welcome")
else:
print "user did not validate"
return bottle.template("signup", errors)
def present_welcome():
# check for a cookie, if present, then extract value
cookie = bottle.request.get_cookie("session")
username = sessions.get_username(cookie) # see if user is logged in
if username is None:
print "welcome: can't identify user...redirecting to signup"
bottle.redirect("/signup")
return bottle.template("welcome", {'username': username})
# Helper Functions
# extracts the tag from the tags form element. An experienced python
# programmer could do this in fewer lines, no doubt
def callback():
""" Step 3: Retrieving an access token.
The user has been redirected back from the provider to your registered
callback URL. With this redirection comes an authorization code included
in the redirect URL. We will use that to obtain an access token.
NOTE: your server name must be correctly configured in order for this to
work, do this by adding the headers at your http layer, in particular:
X_FORWARDED_HOST, X_FORWARDED_PROTO so that bottle can render the correct
url and links for you.
"""
oauth2session = OAuth2Session(settings.SLACK_OAUTH['client_id'],
state=request.GET['state'])
#PII - update privacy policy if oauth2 token is stored.
token = oauth2session.fetch_token(
settings.SLACK_OAUTH['token_url'],
client_secret=settings.SLACK_OAUTH['client_secret'],
authorization_response=request.url
)
# we don't need the token, we just need the user to have installed the app
# in the future, if we need tokens, we'll get them.
redirect('/?added_to_slack=true')
def welcome():
if request.params.get('req_thing') == "version_info":
return version_info()
username = request.get_cookie('username')
password = request.get_cookie('password')
password = urllib.unquote(password) if password else None
if verify_login(username, password):
redirect('/memorize')
else:
return dict(universal_ROUTE_dict)
def mmrz():
username = request.get_cookie('username')
password = request.get_cookie('password')
password = urllib.unquote(password) if password else None
if not verify_login(username, password):
redirect('/')
# need_https = "localhost" not in request.url
need_https = False
return_dict = dict(universal_ROUTE_dict)
return_dict.update(dict(need_https=need_https))
return return_dict
def setting():
username = request.get_cookie('username')
password = request.get_cookie('password')
password = urllib.unquote(password) if password else None
if not verify_login(username, password):
redirect('/')
dbMgr = MmrzSyncDBManager("USERS")
users = dbMgr.read_USERS_DB_DICT()
dbMgr.closeDB()
return_dict = dict(universal_ROUTE_dict)
return_dict.update({"username": username, "mailAddr": "" if users[username]["mailAddr"] == None else users[username]["mailAddr"]})
return return_dict
def dict_short():
query = request.urlparts.query
url = '/dictionary?{0}'.format(query) if query else '/dictionary'
redirect(url)
def show_favoritebook():
username = request.params.get('username', None)
# username not available means username exist, connect it
if not is_username_available(username):
dbMgr = MmrzSyncDBManager(username)
rows = dbMgr.read_FAVOURITE_DB()
dbMgr.closeDB()
# else user name not exist, redirect to /
else:
redirect('/')
rows_for_return = []
for row in rows:
row = list(row)
wordID = row[0]
word = row[1]
pronounce = row[2]
favourite = row[3]
rows_for_return.append(row)
return_dict = dict(universal_ROUTE_dict)
return_dict.update(dict(rows=rows_for_return))
return return_dict
def static(file):
return redirect("/static/" + file)