def index():
    """Report login state, finish the OAuth code exchange, or send to login.

    Returns the literal ``"logged_in"`` when ``g.signin`` is truthy.
    Otherwise, when a ``code`` query argument is present, it is passed to
    ``Get_Access_Token``, the ``access_token`` from that result is passed to
    ``Get_User_Info``, and the user data is returned as JSON together with
    a ``logged_in=true`` cookie. With neither, the client is redirected to
    the ``login`` endpoint.
    """
    auth_code = request.args.get("code", "")
    app.logger.debug("code: %s" % auth_code)

    if g.signin:
        return "logged_in"

    if not auth_code:
        return redirect(url_for("login"))

    token_payload = Get_Access_Token(auth_code)
    user_info = Get_User_Info(token_payload.get('access_token'))
    app.logger.debug(user_info)

    response = jsonify(user_info)
    response.set_cookie(key="logged_in", value='true', expires=None)
    return response
# Python: example source code for redirect()
def twittercallback():
    """Complete the Twitter OAuth handshake and show the hashtag form.

    Reads ``oauth_verifier`` from the query string and the request token
    from the session, exchanges them for an access token through tweepy,
    and stores the access token and its secret in the session. When the
    request token is missing or the exchange fails, an error is flashed and
    the client is redirected to ``bp.home``.
    """
    verifier = request.args["oauth_verifier"]
    handler = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)

    # Without a stored request token the handshake cannot be completed.
    if "request_token" not in session:
        flash("Please login again", "danger")
        return redirect(url_for("bp.home"))
    handler.request_token = session["request_token"]

    try:
        handler.get_access_token(verifier)
    except tweepy.TweepError:
        flash("Failed to get access token", "danger")
        return redirect(url_for("bp.home"))

    session["access_token"] = handler.access_token
    session["access_token_secret"] = handler.access_token_secret
    return render_template("twittercallback.html", form=HashtagForm())
def webpage():
    """Render the page for ``url``, or jump to the next unlabelled record.

    With a ``url`` query argument, the featured content comes from
    ``get_next(url)``. Without one, the next unlabelled record is fetched
    from ``service``; if it exists the client is redirected to ``/?url=...``,
    otherwise a "nothing left" message is shown.
    """
    target_url = request.args.get('url')
    if target_url:
        featured_content = get_next(target_url)
    else:
        pending = service.get_next_unlabelled()
        if pending:
            # Redirect with the url query param so the user can navigate back later.
            return redirect("/?url=%s" % (urllib.quote(pending['url'])))
        featured_content = "No Unlabelled Record Found."

    return render_template(
        'index.html',
        featured_content=featured_content,
        status=service.overall_status(),
    )
def edit_view(self, pk):
    """Edit view function.

    Renders the edit form; on a valid submission the form data is copied
    onto the object, the object is saved and the client is redirected to
    ``self.edit_redirect_url``.

    :param pk:
        the primary key of the model to be edited.
    """
    instance = self.query_object(pk)
    edit_form = self.edit_form_class(obj=instance)

    if not edit_form.validate_on_submit():
        context = self.edit_view_context({self.edit_form_name: edit_form})
        return render_template(self.edit_template, **context)

    edit_form.populate_obj(instance)
    instance.save()

    # None selects the default message; any other falsy value suppresses the flash.
    notice = self.edit_flash_message
    if notice is None:
        notice = self.object_name + ' updated'
    if notice:
        flash(notice)
    return redirect(self.edit_redirect_url)
def delete_view(self, pk):
    """Delete view function.

    Renders the confirmation form; on a valid submission the object is
    deleted and the client is redirected to ``self.delete_redirect_url``.

    :param pk:
        the primary key of the model to be deleted.
    """
    instance = self.query_object(pk)
    confirm_form = self.delete_form_class(obj=instance)

    if not confirm_form.validate_on_submit():
        context = self.delete_view_context({self.delete_form_name: confirm_form})
        return render_template(self.delete_template, **context)

    instance.delete()

    # None selects the default message; any other falsy value suppresses the flash.
    notice = self.delete_flash_message
    if notice is None:
        notice = self.object_name + ' deleted'
    if notice:
        flash(notice)
    return redirect(self.delete_redirect_url)
def get(self):
    """Log the user out.

    When a ``save_id`` cookie is present, the three login cookies are
    expired and the client is redirected to the ``.exit`` endpoint.
    Otherwise the truthy ``name``, ``show_name`` and ``user_id`` session
    entries are removed and the client is redirected to ``.login``.
    """
    if request.cookies.get('save_id'):
        response = make_response(redirect(url_for('.exit')))
        for cookie_name in ('user_name', 'login_time', 'save_id'):
            response.set_cookie(cookie_name, expires=0)
        return response

    for key in ('name', 'show_name', 'user_id'):
        if session.get(key):
            session.pop(key)
    return redirect(url_for('.login'))
# NOTE: the original non-ASCII comment here was lost to mis-encoding; the
# surviving fragments mention config.json and `is_register` being false.
def get_zip(self, project, ty):
    """Get a ZIP file directly from uploaded directory
    or generate one on the fly and upload it if not existing.

    :param project: the project whose export is requested.
    :param ty: passed through to ``download_name``, ``zip_existing`` and
        ``_make_zip`` (presumably the export type -- confirm against callers).
    :returns: a file-download response when the local uploader is in use,
        otherwise a redirect to the ``rackspace`` endpoint.
    """
    filename = self.download_name(project, ty)
    if not self.zip_existing(project, ty):
        # Parenthesised single-argument form: prints the same text on
        # Python 2 and is also valid syntax on Python 3.
        print("Warning: Generating %s on the fly now!" % filename)
        self._make_zip(project, ty)

    if isinstance(uploader, local.LocalUploader):
        filepath = self._download_path(project)
        res = send_file(filename_or_fp=safe_join(filepath, filename),
                        mimetype='application/octet-stream',
                        as_attachment=True,
                        attachment_filename=filename)
        # fail safe mode for more encoded filenames.
        # It seems Flask and Werkzeug do not support RFC 5987 http://greenbytes.de/tech/tc2231/#encoding-2231-char
        # res.headers['Content-Disposition'] = 'attachment; filename*=%s' % filename
        return res

    return redirect(url_for('rackspace', filename=filename,
                            container=self._container(project),
                            _external=True))
def build_sitemap():
    """Write sitemap.xml for all published posts and all categories.

    The file is written under ``REDBERRY_ROOT/static/redberry``; afterwards
    a success message is flashed and the client is redirected to
    ``redberry.home``.
    """
    from redberry.models import RedPost, RedCategory
    from apesmit import Sitemap

    sitemap = Sitemap(changefreq='weekly')

    for post in RedPost.all_published():
        post_url = url_for('redberry.show_post', slug=post.slug, _external=True)
        sitemap.add(post_url, lastmod=post.updated_at.date())

    for category in RedCategory.query.all():
        category_url = url_for('redberry.show_category',
                               category_slug=category.slug, _external=True)
        sitemap.add(category_url, lastmod=category.updated_at.date())

    output_path = os.path.join(REDBERRY_ROOT, 'static', 'redberry', 'sitemap.xml')
    with open(output_path, 'w') as handle:
        sitemap.write(handle)

    flash("Sitemap created.", 'success')
    return redirect(url_for('redberry.home'))
##############
# ADMIN ROUTES
##############
def index():
    """Greet a user by name, registering names seen for the first time.

    On a valid form submission the name is looked up in ``User``. An
    unknown name is added to the database session, ``session['known']`` is
    set to False and, when ``FLASKY_ADMIN`` is configured, a notification
    e-mail is sent; a known name sets ``session['known']`` to True. The
    name is stored in the session, the form field is cleared and the client
    is redirected to the ``index`` endpoint. Otherwise the page is rendered
    with the name and ``known`` flag taken from the session.
    """
    form = NameForm()
    if form.validate_on_submit():
        user = User.query.filter_by(username=form.name.data).first()
        if user is None:
            user = User(username=form.name.data)
            db.session.add(user)
            session['known'] = False
            if app.config['FLASKY_ADMIN']:
                send_email(app.config['FLASKY_ADMIN'], 'New User',
                           'mail/new_user', user=user)
        else:
            session['known'] = True
        session['name'] = form.name.data
        # Was ``from.name.data`` -- ``from`` is a keyword, so the module
        # could not even be parsed. The form field is what must be cleared.
        form.name.data = ''
        return redirect(url_for('index'))
    return render_template('index.html', form=form, name=session.get('name'),
                           known=session.get('known', False))
def post(self):
    """Authenticate against the backend and start a docklet session.

    Posts the submitted username and password to ``/login/``. When the
    backend answers ``success == "true"``, the jupyter cookie and the
    session entries are set and the client is redirected to the ``next``
    query argument (or ``/dashboard/``). In every other case the client is
    redirected back to ``/login/``.
    """
    if not request.form['username']:
        return redirect('/login/')

    credentials = {"user": request.form['username'], "key": request.form['password']}
    result = dockletRequest.unauthorizedpost('/login/', credentials)
    if not (result and result.get('success', None) == "true"):
        return redirect('/login/')

    # NOTE(review): ``next`` comes straight from the query string and is
    # not validated here before being used as the redirect target.
    destination = request.args.get('next', None) or '/dashboard/'
    response = make_response(redirect(destination))

    # set cookie:docklet-jupyter-cookie for jupyter notebook
    app_key = os.environ['APP_KEY']
    response.set_cookie(
        'docklet-jupyter-cookie',
        cookie_tool.generate_cookie(request.form['username'], app_key),
    )

    # set session for docklet
    session['username'] = request.form['username']
    profile = result['data']
    session['nickname'] = profile['nickname']
    session['description'] = profile['description']
    session['avatar'] = '/static/avatar/' + profile['avatar']
    session['usergroup'] = profile['group']
    session['status'] = profile['status']
    session['token'] = profile['token']
    return response
def get(self):
    """Log in through the external authentication backend (GET).

    Builds the external auth request, posts it to ``/external_login/`` and,
    when the backend answers ``success == "true"``, sets the jupyter cookie
    and the session entries before redirecting to the ``next`` query
    argument (or ``/dashboard/``). Otherwise redirects to ``/login/``.
    """
    auth_request = external_generate.external_auth_generate_request()
    result = dockletRequest.unauthorizedpost('/external_login/', auth_request)
    if not (result and result.get('success', None) == "true"):
        return redirect('/login/')

    # NOTE(review): ``next`` comes straight from the query string and is
    # not validated here before being used as the redirect target.
    destination = request.args.get('next', None) or '/dashboard/'
    response = make_response(redirect(destination))

    # set cookie:docklet-jupyter-cookie for jupyter notebook
    app_key = os.environ['APP_KEY']
    account = result['data']
    response.set_cookie(
        'docklet-jupyter-cookie',
        cookie_tool.generate_cookie(account['username'], app_key),
    )

    # set session for docklet
    session['username'] = account['username']
    session['nickname'] = account['nickname']
    session['description'] = account['description']
    session['avatar'] = '/static/avatar/' + account['avatar']
    session['usergroup'] = account['group']
    session['status'] = account['status']
    session['token'] = account['token']
    return response
def post(self):
    """Log in through the external authentication backend (POST).

    Builds the external auth request, posts it to ``/external_login/`` and,
    when the backend answers ``success == "true"``, sets the jupyter cookie
    and the session entries before redirecting to the ``next`` query
    argument (or ``/dashboard/``). Otherwise redirects to ``/login/``.
    """
    payload = external_generate.external_auth_generate_request()
    result = dockletRequest.unauthorizedpost('/external_login/', payload)
    authenticated = bool(result) and result.get('success', None) == "true"
    if not authenticated:
        return redirect('/login/')

    # NOTE(review): ``next`` comes straight from the query string and is
    # not validated here before being used as the redirect target.
    resp = make_response(redirect(request.args.get('next', None) or '/dashboard/'))

    # set cookie:docklet-jupyter-cookie for jupyter notebook
    app_key = os.environ['APP_KEY']
    user_data = result['data']
    jupyter_cookie = cookie_tool.generate_cookie(user_data['username'], app_key)
    resp.set_cookie('docklet-jupyter-cookie', jupyter_cookie)

    # set session for docklet
    session['username'] = user_data['username']
    session['nickname'] = user_data['nickname']
    session['description'] = user_data['description']
    session['avatar'] = '/static/avatar/' + user_data['avatar']
    session['usergroup'] = user_data['group']
    session['status'] = user_data['status']
    session['token'] = user_data['token']
    return resp
def post(self):
    """Create a cluster from the selected image.

    ``self.image`` is split at its last two underscores into image name,
    owner and type. These, the cluster name and the submitted form fields
    are posted to ``/cluster/create/``. Success redirects to
    ``/dashboard/``; anything else renders the error page with the backend
    message.
    """
    masterip = self.masterip
    # Split from the right so underscores inside the image name survive;
    # raises ValueError when fewer than two underscores are present.
    image_name, image_owner, image_type = self.image.rsplit("_", 2)
    checkname(self.clustername)

    base_fields = {
        "clustername": self.clustername,
        'imagename': image_name,
        'imageowner': image_owner,
        'imagetype': image_type,
    }
    # Submitted form fields are merged over the base fields.
    payload = dict(base_fields, **(request.form))
    result = dockletRequest.post("/cluster/create/", payload, masterip)

    if result.get('success', None) != "true":
        return self.render(self.error_path, message=result.get('message'))
    return redirect("/dashboard/")
def post(self):
    """Save a container as an image and route on the backend's answer.

    Posts the cluster, image, container, description and force flag to
    ``/cluster/save/``. On success the client is redirected to
    ``/config/``. When the backend reports ``reason == "exists"`` the save
    template is rendered again with the same values; any other failure
    renders the error page with the backend message. An empty result is
    handed to ``self.error()``.
    """
    masterip = self.masterip
    data = {
        "clustername": self.clustername,
        "image": self.imagename,
        "containername": self.containername,
        "description": self.description,
        "isforce": self.isforce
    }
    result = dockletRequest.post("/cluster/save/", data, masterip)

    if not result:
        # Previously the value of self.error() was discarded, so the view
        # returned None whenever error() produced a response.
        return self.error()

    if result.get('success') == 'true':
        return redirect("/config/")

    if result.get('reason') == "exists":
        return self.render(self.template_path,
                           containername=self.containername,
                           clustername=self.clustername,
                           image=self.imagename,
                           user=session['username'],
                           description=self.description,
                           masterip=masterip)

    return self.render(self.error_path, message=result.get('message'))
def internal_server_error(error):
    """Log the error and render the 500 page, or send anonymous users to login.

    When both ``500`` and ``500_title`` are present in the session they are
    consumed (removed) and used as the reason and title; otherwise a
    generic apology is shown.
    """
    logger.error(error)
    logger.error(traceback.format_exc())

    if "username" not in session:
        return redirect('/login/')

    if "500" in session and "500_title" in session:
        # One-shot values: read and remove in a single step.
        reason = session.pop('500', None)
        title = session.pop('500_title', None)
    else:
        reason = '''The server encountered something unexpected that didn't allow it to complete the request. We apologize.You can go back to
<a href="/dashboard/">dashboard</a> or <a href="/logout">log out</a>'''
        title = 'Internal Server Error'

    return render_template('error/500.html', mysession=session, reason=reason, title=title)
def login():
    """Check the submitted username and password and log the user in.

    On a POST carrying both fields, the user is looked up by username and
    the hashed submitted password is compared with the stored one. A match
    logs the user in and redirects to ``index``; otherwise an error is
    flashed. In all remaining cases the login page is rendered.
    """
    credentials_posted = (
        request.method == "POST"
        and "username" in request.form
        and "password" in request.form
    )
    if credentials_posted:
        username = request.form["username"]
        password = request.form["password"]
        user = User.get(username)

        # The stored password is compared against a hash of the submitted
        # one, so the submitted password is hashed first.
        if user and hash_pass(password) == user.password:
            login_user(user, remember=True)
            # FIXME! Get this to work properly...
            # return redirect(request.args.get("next") or url_for("index"))
            return redirect(url_for("index"))

        flash(u"Invalid username, please try again.")

    return render_template("login.html")
def changepass():
    """Change the logged-in user's password and force a fresh login.

    On POST with matching ``pass1``/``pass2`` the password is changed, the
    action is logged, the login-related session entries are cleared and the
    client is redirected to ``home``. On a mismatch an error is flashed.
    In every other case the change-password page is rendered.
    """
    if request.method == 'POST':
        if request.form['pass1'] != request.form['pass2']:
            flash('The passwords you entered do not match. Please try again.')
        else:
            # process password change
            change_password(session['username'], request.form['pass1'])
            log_action(session['uid'], 8)
            for key in ('logged_in', 'uid', 'priv', 'username'):
                session.pop(key, None)
            flash('Your password has been changed. Please login using your new password.')
            return redirect(url_for('home'))
    return render_template('changepass.html')
#
# EDIT USER PAGE
#
def save_config(self):
    """Persist the current configuration as JSON.

    Unauthenticated callers are redirected to ``login``. The target is
    ``config['CONFIG_PATH']`` when it names an existing file, otherwise
    ``config.json`` under ``config['ROOT_PATH']``.

    :returns: a redirect response when not authenticated, otherwise None.
    """
    if not self.is_authenticated():
        return redirect(url_for('login'))

    if (config['CONFIG_PATH'] is not None and
            os.path.isfile(config['CONFIG_PATH'])):
        config_path = config['CONFIG_PATH']
    else:
        config_path = os.path.join(config['ROOT_PATH'], 'config.json')

    # Build and serialise BEFORE opening the file: mode 'w' truncates on
    # open, so a failure here used to leave an empty config file behind.
    data = {'GOOGLEMAPS_KEY': config['GOOGLEMAPS_KEY'],
            'LOCALE': config['LOCALE'],
            'CONFIG_PASSWORD': config['CONFIG_PASSWORD'],
            # list(): a dict view is not JSON-serialisable on Python 3.
            'SCAN_LOCATIONS': list(self.scan_config.SCAN_LOCATIONS.values()),
            'ACCOUNTS': config['ACCOUNTS']}
    serialized = json.dumps(data)

    with open(config_path, 'w') as f:
        f.write(serialized)
def drop_show():
    """Show removal handler.

    Attempts to remove a show from the backend system. The show ID must
    be passed in the ``id`` query string. If the user is unauthenticated, the
    function is aborted with a ``404`` message to hide the page.

    Returns:
        An HTTP redirect to the home page, to refresh.
    """
    log.debug("Entering drop_show, trying to remove show from list.")
    # .get(): a visitor with no 'logged_in' session entry must reach the
    # 404 below instead of failing on a KeyError.
    login_id = session.get('logged_in')
    if login_id is not None and fe.check_login_id(escape(login_id)):
        log.debug("Sending show ID {0} to function".format(request.args['id']))
        fe.remove_show(request.args['id'])
        log.debug("Refreshing user's page.")
        return redirect('/')
    log.debug("User cannot be authenticated, send 404 to hide page.")
    abort(404)
def authorize_view(self):
    """Flask view that starts the authorization flow.

    Builds a flow from the query arguments and redirects the user to the
    OAuth2 provider's authorization URL. The return URL is the
    ``return_url`` argument when given, else the referrer, else ``/``.
    """
    params = request.args.to_dict()
    # Scopes arrive as multiple args and to_dict() keeps only one of them,
    # so getlist() is used to collect every scope.
    params['scopes'] = request.args.getlist('scopes')

    return_url = params.pop('return_url', None)
    if return_url is None:
        return_url = request.referrer or '/'

    flow = self._make_flow(return_url=return_url, **params)
    return redirect(flow.step1_get_authorize_url())
def article():
# Render 'article.html' for the article selected by the ``article_id``
# query argument.
#
# NOTE(review): the template context is ``**locals()``, so every local name
# assigned below (site_info, article_id, article, title, packet_id, show,
# timestamp, body) is handed to the template. Renaming or removing a local
# changes what the template receives.
site_info = site_get()
# Falls back to the int 0 when the argument is absent.
# NOTE(review): a supplied query value is presumably a string, so even
# "?article_id=0" would compare unequal to 0 below -- confirm.
article_id = request.args.get('article_id',0)
if article_id != 0:
article = Article.query.filter_by(id = article_id).first()
if article is not None:
# Swap the model instance for its attribute dict so fields are read by key.
article = article.__dict__
article_id = article['id']
title = article['title']
packet_id = article['packet_id']
show = article['show']
timestamp = article['timestamp']
# Drops the final character of the body (the reason is not visible here).
body = article['body'][:-1]
else:
# NOTE(review): indentation is lost in this listing; this ``else`` presumably
# pairs with ``if article is not None`` (article not found) -- confirm.
return redirect(url_for('main.index'))
return render_template('article.html', **locals())
def test_flash_signal(self):
    """Flashing a message emits ``message_flashed`` with message and category."""
    app = flask.Flask(__name__)
    app.config['SECRET_KEY'] = 'secret'

    @app.route('/')
    def index():
        flask.flash('This is a flash message', category='notice')
        return flask.redirect('/other')

    received = []

    def record(sender, message, category):
        # Keep only the payload; the sender is not asserted on.
        received.append((message, category))

    flask.message_flashed.connect(record, app)
    try:
        client = app.test_client()
        with client.session_transaction():
            client.get('/')
            self.assert_equal(len(received), 1)
            flashed_message, flashed_category = received[0]
            self.assert_equal(flashed_message, 'This is a flash message')
            self.assert_equal(flashed_category, 'notice')
    finally:
        # Always detach the receiver so it cannot leak into other tests.
        flask.message_flashed.disconnect(record, app)
# File source: admin_tools_plugin.py
# Project: airflow-admin-tools-plugin
# Author: rssanders3
# Project source
# File source
# Views: 26
# Favorites: 0
# Likes: 0
# Comments: 0
def email(self):
    """Send a test e-mail through the configured backend.

    The recipient is the ``to_email`` query argument. The backend callable
    is resolved from the ``EMAIL_BACKEND`` value of the ``email``
    configuration section, split at its last dot into module path and
    attribute. Any failure is flashed rather than raised, and the client is
    always redirected to ``/admin/admintools``.
    """
    try:
        recipient = request.args.get('to_email')
        backend_spec = configuration.get('email', 'EMAIL_BACKEND')
        module_path, attr_name = backend_spec.rsplit('.', 1)
        logging.info("path: " + str(module_path))
        logging.info("attr: " + str(attr_name))

        backend_module = importlib.import_module(module_path)
        logging.info("module: " + str(backend_module))

        send = getattr(backend_module, attr_name)
        send(recipient, "Test Email", "Test Email", files=None, dryrun=False)
        flash('Email Sent')
    except Exception as e:
        flash('Failed to Send Email: ' + str(e), 'error')
    return redirect("/admin/admintools", code=302)
def signup():
    """Register a new user from the signup form.

    On a valid submission with an unused e-mail address, the user is
    created and committed, the e-mail is stored in the session and the
    client is redirected to ``login``. A taken address adds a form error
    and renders the form again, as does an invalid or missing submission.
    """
    from forms import SignupForm
    form = SignupForm()

    if not form.validate_on_submit():
        return render_template('signup.html', form=form)

    existing = User.query.filter_by(email=form.email.data.lower()).first()
    if existing is not None:
        form.email.errors.append("The Email address is already taken.")
        return render_template('signup.html', form=form)

    newuser = User(form.firstname.data, form.lastname.data,
                   form.email.data, form.password.data)
    db.session.add(newuser)
    db.session.commit()

    session['email'] = newuser.email
    return redirect(url_for('login'))
def login():
    """Authenticate a user by e-mail and password.

    Already-authenticated users go straight to ``index``. On a valid
    submission the user is looked up by lower-cased e-mail and the password
    is checked; success logs the user in and redirects to ``index``,
    failure renders the form again with ``failed_auth=True``.
    """
    if g.user is not None and g.user.is_authenticated:
        return redirect(url_for('index'))

    from app.forms import LoginForm
    form = LoginForm()

    if not form.validate_on_submit():
        return render_template('login.html', form=form)

    session['remember_me'] = form.remember_me.data
    account = User.query.filter_by(email=form.email.data.lower()).first()

    if not (account and account.check_password(form.password.data)):
        return render_template('login.html', form=form, failed_auth=True)

    session['email'] = form.email.data
    login_user(account, remember=session['remember_me'])
    return redirect(url_for('index'))
def manage_user_login(user, user_data, next_url):
    """Manage user login.

    With no ``user``, the account is looked up by ``screen_name`` and the
    client is sent to password recovery (local signup) or to sign-in, with
    a hint flashed. Otherwise the user is logged in and redirected to the
    newsletter prompt, to ``next_url``, or to the profile page when the
    e-mail address still equals the user name.
    """
    if user is None:
        user = user_repo.get_by_name(user_data['screen_name'])
        msg, method = get_user_signup_method(user)
        flash(msg, 'info')
        endpoint = 'account.forgot_password' if method == 'local' else 'account.signin'
        return redirect(url_for(endpoint))

    login_user(user, remember=True)
    flash("Welcome back %s" % user.fullname, 'success')

    # An e-mail address equal to the user name is treated as "not yet set".
    has_own_email = user.email_addr != user.name
    if not has_own_email:
        flash("Please update your e-mail address in your profile page")
        return redirect(url_for('account.update_profile', name=user.name))

    if user.newsletter_prompted is False and newsletter.is_initialized():
        return redirect(url_for('account.newsletter_subscribe',
                                next=next_url))
    return redirect(next_url)
def manage_user_login(user, user_data, next_url):
    """Manage user login.

    With no ``user``, the account is looked up by the e-mail in
    ``user_data``; a match flashes a hint and sends the client to password
    recovery (local signup) or sign-in, no match goes to sign-in. Otherwise
    the user is logged in and redirected to the profile page (e-mail equals
    user name), the newsletter prompt, or ``next_url``.
    """
    if user is None:
        # Give a hint for the user
        user = user_repo.get_by(email_addr=user_data.get('email'))
        if user is None:
            return redirect(url_for('account.signin'))
        msg, method = get_user_signup_method(user)
        flash(msg, 'info')
        if method == 'local':
            return redirect(url_for('account.forgot_password'))
        return redirect(url_for('account.signin'))

    login_user(user, remember=True)
    flash("Welcome back %s" % user.fullname, 'success')

    if user.email_addr == user.name:
        flash("Please update your e-mail address in your profile page")
        return redirect(url_for('account.update_profile', name=user.name))

    if user.newsletter_prompted is False and newsletter.is_initialized():
        return redirect(url_for('account.newsletter_subscribe', next=next_url))
    return redirect(next_url)
def manage_user_login(user, user_data, next_url):
    """Manage user login.

    With no ``user``, the account is looked up by e-mail and then by a user
    name derived from the full name; a hint is flashed and the client is
    sent to password recovery (local signup) or sign-in. Otherwise the user
    is logged in and redirected to the newsletter prompt or ``next_url``.
    """
    if user is not None:
        login_user(user, remember=True)
        flash("Welcome back %s" % user.fullname, 'success')
        if user.newsletter_prompted is False and newsletter.is_initialized():
            return redirect(url_for('account.newsletter_subscribe',
                                    next=next_url))
        return redirect(next_url)

    # Give a hint for the user
    user = user_repo.get_by(email_addr=user_data['email'])
    if user is None:
        name = username_from_full_name(user_data['name'])
        user = user_repo.get_by_name(name)

    msg, method = get_user_signup_method(user)
    flash(msg, 'info')
    if method == 'local':
        return redirect(url_for('account.forgot_password'))
    return redirect(url_for('account.signin'))
def password_required(short_name):
    """Ask for a project's password and remember a correct answer.

    On a valid POST whose password validates for the project, the client is
    redirected to the ``next`` query argument, with the response updated by
    the password manager. A wrong password flashes an error. In every other
    case the password form is rendered.
    """
    (project, owner, n_tasks, n_task_runs,
     overall_progress, last_activity,
     n_results) = project_by_shortname(short_name)
    form = PasswordForm(request.form)

    if request.method == 'POST' and form.validate():
        submitted = request.form.get('password')
        timeout = current_app.config.get('PASSWD_COOKIE_TIMEOUT')
        cookie_handler = CookieHandler(request, signer, timeout)
        passwd_mngr = ProjectPasswdManager(cookie_handler)

        if passwd_mngr.validates(submitted, project):
            response = make_response(redirect(request.args.get('next')))
            return passwd_mngr.update_response(response, project,
                                               get_user_id_or_ip())
        flash(gettext('Sorry, incorrect password'))

    return render_template('projects/password.html',
                           project=project,
                           form=form,
                           short_name=short_name,
                           next=request.args.get('next'))
def publish(short_name):
    """Publish a project, or show the confirmation page on GET.

    For projects whose ``sched`` info is ``"FRG"``, an owner who has not
    created a quiz is redirected to quiz creation first. After the
    authorization check, a GET renders the publish page; any other method
    marks the project as published, deletes its task runs, results and
    webhook entries, logs the event and redirects to the project details.
    """
    (project, owner, n_tasks, n_task_runs,
     overall_progress, last_activity,
     n_results) = project_by_shortname(short_name)

    # FRG-scheduled projects require the owner to have created a quiz.
    quiz_missing = (
        project.info.get("sched") == "FRG"
        and project.owner_id == current_user.id
        and not cached_users.is_quiz_created(current_user.id, project)
    )
    if quiz_missing:
        flash("You did not created quiz.Please create the quiz", "danger")
        return redirect(url_for('quiz.create_quiz', short_name=project.short_name))

    pro = pro_features()
    ensure_authorized_to('publish', project)

    if request.method == 'GET':
        return render_template('projects/publish.html',
                               project=project,
                               pro_features=pro)

    project.published = True
    project_repo.save(project)
    # Data gathered before publishing is discarded.
    task_repo.delete_taskruns_from_project(project)
    result_repo.delete_results_from_project(project)
    webhook_repo.delete_entries_from_project(project)
    auditlogger.log_event(project, current_user, 'update', 'published', False, True)

    flash(gettext('Project published! Volunteers will now be able to help you!'))
    return redirect(url_for('.details', short_name=project.short_name))