def index():
code = request.args.get("code", "")
app.logger.debug("code: %s" %code)
#app.logger.debug(request.args)
if g.signin:
return "logged_in"
elif code:
_data = Get_Access_Token(code)
access_token = _data.get('access_token')
userData = Get_User_Info(access_token)
app.logger.debug(userData)
#resp = render_template('info.html', userData=userData)
#resp.set_cookie(key="logged_in", value='true', expires=None)
resp = jsonify(userData)
resp.set_cookie(key="logged_in", value='true', expires=None)
return resp
else:
return redirect(url_for("login"))
python类url_for()的实例源码
def twittercallback():
verification = request.args["oauth_verifier"]
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
try:
auth.request_token = session["request_token"]
except KeyError:
flash("Please login again", "danger")
return redirect(url_for("bp.home"))
try:
auth.get_access_token(verification)
except tweepy.TweepError:
flash("Failed to get access token", "danger")
return redirect(url_for("bp.home"))
session["access_token"] = auth.access_token
session["access_token_secret"] = auth.access_token_secret
return render_template("twittercallback.html", form=HashtagForm())
def test_inject_blueprint_url_defaults(self):
app = flask.Flask(__name__)
bp = flask.Blueprint('foo.bar.baz', __name__,
template_folder='template')
@bp.url_defaults
def bp_defaults(endpoint, values):
values['page'] = 'login'
@bp.route('/<page>')
def view(page): pass
app.register_blueprint(bp)
values = dict()
app.inject_url_defaults('foo.bar.baz.view', values)
expected = dict(page='login')
self.assert_equal(values, expected)
with app.test_request_context('/somepage'):
url = flask.url_for('foo.bar.baz.view')
expected = '/login'
self.assert_equal(url, expected)
def get_chapter(book_id):
# chapters = Chapter.query.filter_by(book_id=book_id).all()
page = request.args.get('page', 1, type=int)
pagination = Chapter.query.filter_by(book_id=book_id).paginate(
page, per_page=current_app.config['CHAPTER_PER_PAGE'],
error_out=False
)
chapters = pagination.items
prev = None
if pagination.has_prev:
prev = url_for('api.get_chapter', book_id=book_id, page=page-1, _external=True)
next = None
if pagination.has_next:
next = url_for('api.get_chapter', book_id=book_id, page=page+1, _external=True)
return jsonify({
'chapters': [chapter.to_json() for chapter in chapters],
'prev': prev,
'next': next
})
def get(self):
if request.cookies.get('save_id'):
resp = make_response(redirect(url_for('.exit')))
resp.set_cookie('user_name', expires=0)
resp.set_cookie('login_time', expires=0)
resp.set_cookie('save_id', expires=0)
return resp
if session.get('name'):
session.pop('name')
if session.get('show_name'):
session.pop('show_name')
if session.get('user_id'):
session.pop('user_id')
return redirect(url_for('.login'))
# ?config.json ???? is_register ?false??????? ??????????????
def get_zip(self, project, ty):
"""Get a ZIP file directly from uploaded directory
or generate one on the fly and upload it if not existing."""
filename = self.download_name(project, ty)
if not self.zip_existing(project, ty):
print "Warning: Generating %s on the fly now!" % filename
self._make_zip(project, ty)
if isinstance(uploader, local.LocalUploader):
filepath = self._download_path(project)
res = send_file(filename_or_fp=safe_join(filepath, filename),
mimetype='application/octet-stream',
as_attachment=True,
attachment_filename=filename)
# fail safe mode for more encoded filenames.
# It seems Flask and Werkzeug do not support RFC 5987 http://greenbytes.de/tech/tc2231/#encoding-2231-char
# res.headers['Content-Disposition'] = 'attachment; filename*=%s' % filename
return res
else:
return redirect(url_for('rackspace', filename=filename,
container=self._container(project),
_external=True))
def build_sitemap():
from redberry.models import RedPost, RedCategory
from apesmit import Sitemap
sm = Sitemap(changefreq='weekly')
for post in RedPost.all_published():
sm.add(url_for('redberry.show_post', slug=post.slug, _external=True), lastmod=post.updated_at.date())
for category in RedCategory.query.all():
sm.add(url_for('redberry.show_category', category_slug=category.slug, _external=True), lastmod=category.updated_at.date())
with open(os.path.join(REDBERRY_ROOT, 'static', 'redberry', 'sitemap.xml'), 'w') as f:
sm.write(f)
flash("Sitemap created.", 'success')
return redirect(url_for('redberry.home'))
##############
# ADMIN ROUTES
##############
def index():
form = NameForm()
if form.validate_on_submit():
user = User.query.filter_by(username=form.name.data).first()
if user is None:
user = User(username=form.name.data)
db.session.add(user)
session['known'] = False
if app.config['FLASKY_ADMIN']:
send_email(app.config['FLASKY_ADMIN'], 'New User',
'mail/new_user', user=user)
else:
session['known'] = True
session['name'] = form.name.data
from.name.data = ''
return redirect(url_for('index'))
return render_template('index.html', form=form, name=session.get('name'),
known=session.get('known', False))
def login():
""" This login function checks if the username & password
match the admin.db; if the authentication is successful,
it passes the id of the user into login_user() """
if request.method == "POST" and \
"username" in request.form and \
"password" in request.form:
username = request.form["username"]
password = request.form["password"]
user = User.get(username)
# If we found a user based on username then compare that the submitted
# password matches the password in the database. The password is stored
# is a slated hash format, so you must hash the password before comparing it.
if user and hash_pass(password) == user.password:
login_user(user, remember=True)
# FIXME! Get this to work properly...
# return redirect(request.args.get("next") or url_for("index"))
return redirect(url_for("index"))
else:
flash(u"Invalid username, please try again.")
return render_template("login.html")
def changepass():
if request.method == 'POST':
# process password change
if request.form['pass1'] == request.form['pass2']:
change_password(session['username'], request.form['pass1'])
log_action(session['uid'], 8)
session.pop('logged_in', None)
session.pop('uid', None)
session.pop('priv', None)
session.pop('username', None)
flash('Your password has been changed. Please login using your new password.')
return redirect(url_for('home'))
else:
flash('The passwords you entered do not match. Please try again.')
return render_template('changepass.html')
return render_template('changepass.html')
#
# EDIT USER PAGE
#
def save_config(self):
if not self.is_authenticated():
return redirect(url_for('login'))
if (config['CONFIG_PATH'] is not None and
os.path.isfile(config['CONFIG_PATH'])):
config_path = config['CONFIG_PATH']
else:
config_path = os.path.join(config['ROOT_PATH'], 'config.json')
with open(config_path, 'w') as f:
data = {'GOOGLEMAPS_KEY': config['GOOGLEMAPS_KEY'],
'LOCALE': config['LOCALE'],
'CONFIG_PASSWORD': config['CONFIG_PASSWORD'],
'SCAN_LOCATIONS': self.scan_config.SCAN_LOCATIONS.values(),
'ACCOUNTS': config['ACCOUNTS']}
f.write(json.dumps(data))
def article():
site_info = site_get()
article_id = request.args.get('article_id',0)
if article_id != 0:
article = Article.query.filter_by(id = article_id).first()
if article is not None:
article = article.__dict__
article_id = article['id']
title = article['title']
packet_id = article['packet_id']
show = article['show']
timestamp = article['timestamp']
body = article['body'][:-1]
else:
return redirect(url_for('main.index'))
return render_template('article.html', **locals())
def test_url_with_method(self):
from flask.views import MethodView
app = flask.Flask(__name__)
class MyView(MethodView):
def get(self, id=None):
if id is None:
return 'List'
return 'Get %d' % id
def post(self):
return 'Create'
myview = MyView.as_view('myview')
app.add_url_rule('/myview/', methods=['GET'],
view_func=myview)
app.add_url_rule('/myview/<int:id>', methods=['GET'],
view_func=myview)
app.add_url_rule('/myview/create', methods=['POST'],
view_func=myview)
with app.test_request_context():
self.assert_equal(flask.url_for('myview', _method='GET'),
'/myview/')
self.assert_equal(flask.url_for('myview', id=42, _method='GET'),
'/myview/42')
self.assert_equal(flask.url_for('myview', _method='POST'),
'/myview/create')
def test_dotted_names(self):
frontend = flask.Blueprint('myapp.frontend', __name__)
backend = flask.Blueprint('myapp.backend', __name__)
@frontend.route('/fe')
def frontend_index():
return flask.url_for('myapp.backend.backend_index')
@frontend.route('/fe2')
def frontend_page2():
return flask.url_for('.frontend_index')
@backend.route('/be')
def backend_index():
return flask.url_for('myapp.frontend.frontend_index')
app = flask.Flask(__name__)
app.register_blueprint(frontend)
app.register_blueprint(backend)
c = app.test_client()
self.assert_equal(c.get('/fe').data.strip(), b'/be')
self.assert_equal(c.get('/fe2').data.strip(), b'/fe')
self.assert_equal(c.get('/be').data.strip(), b'/fe')
def test_build_error_handler(self):
app = flask.Flask(__name__)
# Test base case, a URL which results in a BuildError.
with app.test_request_context():
self.assertRaises(BuildError, flask.url_for, 'spam')
# Verify the error is re-raised if not the current exception.
try:
with app.test_request_context():
flask.url_for('spam')
except BuildError as err:
error = err
try:
raise RuntimeError('Test case where BuildError is not current.')
except RuntimeError:
self.assertRaises(BuildError, app.handle_url_build_error, error, 'spam', {})
# Test a custom handler.
def handler(error, endpoint, values):
# Just a test.
return '/test_handler/'
app.url_build_error_handlers.append(handler)
with app.test_request_context():
self.assert_equal(flask.url_for('spam'), '/test_handler/')
def test_aborting(self):
class Foo(Exception):
whatever = 42
app = flask.Flask(__name__)
app.testing = True
@app.errorhandler(Foo)
def handle_foo(e):
return str(e.whatever)
@app.route('/')
def index():
raise flask.abort(flask.redirect(flask.url_for('test')))
@app.route('/test')
def test():
raise Foo()
with app.test_client() as c:
rv = c.get('/')
self.assertEqual(rv.headers['Location'], 'http://localhost/test')
rv = c.get('/test')
self.assertEqual(rv.data, b'42')
def signup():
from forms import SignupForm
form = SignupForm()
if form.validate_on_submit():
user = User.query.filter_by(email=form.email.data.lower()).first()
if user is not None:
form.email.errors.append("The Email address is already taken.")
return render_template('signup.html', form=form)
newuser = User(form.firstname.data,form.lastname.data,form.email.data,form.password.data)
db.session.add(newuser)
db.session.commit()
session['email'] = newuser.email
return redirect(url_for('login'))
return render_template('signup.html', form=form)
def login():
if g.user is not None and g.user.is_authenticated:
return redirect(url_for('index'))
from app.forms import LoginForm
form = LoginForm()
if form.validate_on_submit():
session['remember_me'] = form.remember_me.data
user = User.query.filter_by(email=form.email.data.lower()).first()
if user and user.check_password(form.password.data):
session['email'] = form.email.data
login_user(user,remember=session['remember_me'])
return redirect(url_for('index'))
else:
return render_template('login.html',form=form,failed_auth=True)
return render_template('login.html',form=form)
def _lookup_url(self, endpoint, values):
"""Return Rackspace URL for object."""
try:
# Create failover urls for avatars
if '_avatar' in values['filename']:
failover_url = url_for('static',
filename='img/placeholder.user.png')
else:
failover_url = url_for('static',
filename='img/placeholder.project.png')
cont = self.get_container(values['container'])
if cont.cdn_enabled:
return "%s/%s" % (cont.cdn_ssl_uri, values['filename'])
else:
msg = ("Rackspace Container %s was not public"
% values['container'])
current_app.logger.warning(msg)
cont.make_public()
return "%s/%s" % (cont.cdn_ssl_uri, values['filename'])
except:
current_app.logger.error(traceback.print_exc())
return failover_url
def confirm_email():
"""Send email to confirm user email."""
acc_conf_dis = current_app.config.get('ACCOUNT_CONFIRMATION_DISABLED')
if acc_conf_dis:
return abort(404)
if current_user.valid_email is False:
user = user_repo.get(current_user.id)
account = dict(fullname=current_user.fullname, name=current_user.name,
email_addr=current_user.email_addr)
confirm_url = get_email_confirmation_url(account)
subject = ('Verify your email in %s' % current_app.config.get('BRAND'))
msg = dict(subject=subject,
recipients=[current_user.email_addr],
body=render_template('/account/email/validate_email.md',
user=account, confirm_url=confirm_url))
msg['html'] = render_template('/account/email/validate_email.html',
user=account, confirm_url=confirm_url)
mail_queue.enqueue(send_mail, msg)
msg = gettext("An e-mail has been sent to \
validate your e-mail address.")
flash(msg, 'info')
user.confirmation_email_sent = True
user_repo.update(user)
return redirect_content_type(url_for('.profile', name=current_user.name))
def reset_api_key(name):
"""
Reset API-KEY for user.
Returns a Jinja2 template.
"""
if request.method == 'POST':
user = user_repo.get_by_name(name)
if not user:
return abort(404)
ensure_authorized_to('update', user)
user.api_key = model.make_uuid()
user_repo.update(user)
cached_users.delete_user_summary(user.name)
msg = gettext('New API-KEY generated')
flash(msg, 'success')
return redirect_content_type(url_for('account.profile', name=name))
else:
csrf = dict(form=dict(csrf=generate_csrf()))
return jsonify(csrf)
def manage_user_login(user, user_data, next_url):
"""Manage user login."""
if user is None:
user = user_repo.get_by_name(user_data['screen_name'])
msg, method = get_user_signup_method(user)
flash(msg, 'info')
if method == 'local':
return redirect(url_for('account.forgot_password'))
else:
return redirect(url_for('account.signin'))
login_user(user, remember=True)
flash("Welcome back %s" % user.fullname, 'success')
if ((user.email_addr != user.name) and user.newsletter_prompted is False
and newsletter.is_initialized()):
return redirect(url_for('account.newsletter_subscribe',
next=next_url))
if user.email_addr != user.name:
return redirect(next_url)
else:
flash("Please update your e-mail address in your profile page")
return redirect(url_for('account.update_profile', name=user.name))
def manage_user_login(user, user_data, next_url):
"""Manage user login."""
if user is None:
# Give a hint for the user
user = user_repo.get_by(email_addr=user_data.get('email'))
if user is not None:
msg, method = get_user_signup_method(user)
flash(msg, 'info')
if method == 'local':
return redirect(url_for('account.forgot_password'))
else:
return redirect(url_for('account.signin'))
else:
return redirect(url_for('account.signin'))
else:
login_user(user, remember=True)
flash("Welcome back %s" % user.fullname, 'success')
request_email = (user.email_addr == user.name)
if request_email:
flash("Please update your e-mail address in your profile page")
return redirect(url_for('account.update_profile', name=user.name))
if (not request_email and user.newsletter_prompted is False
and newsletter.is_initialized()):
return redirect(url_for('account.newsletter_subscribe', next=next_url))
return redirect(next_url)
def manage_user_login(user, user_data, next_url):
"""Manage user login."""
if user is None:
# Give a hint for the user
user = user_repo.get_by(email_addr=user_data['email'])
if user is None:
name = username_from_full_name(user_data['name'])
user = user_repo.get_by_name(name)
msg, method = get_user_signup_method(user)
flash(msg, 'info')
if method == 'local':
return redirect(url_for('account.forgot_password'))
else:
return redirect(url_for('account.signin'))
else:
login_user(user, remember=True)
flash("Welcome back %s" % user.fullname, 'success')
if user.newsletter_prompted is False and newsletter.is_initialized():
return redirect(url_for('account.newsletter_subscribe',
next=next_url))
return redirect(next_url)
def delete_autoimporter(short_name):
pro = pro_features()
if not pro['autoimporter_enabled']:
raise abort(403)
project = project_by_shortname(short_name)[0]
ensure_authorized_to('read', project)
ensure_authorized_to('update', project)
if project.has_autoimporter():
autoimporter = project.get_autoimporter()
project.delete_autoimporter()
project_repo.save(project)
auditlogger.log_event(project, current_user, 'delete', 'autoimporter',
json.dumps(autoimporter), 'Nothing')
return redirect(url_for('.tasks', short_name=project.short_name))
def publish(short_name):
(project, owner, n_tasks, n_task_runs,
overall_progress, last_activity,
n_results) = project_by_shortname(short_name)
#### shruthi
if("sched" in project.info.keys() and project.info["sched"]=="FRG"):
if(project.owner_id==current_user.id and not cached_users.is_quiz_created(current_user.id, project)):
flash("You did not created quiz.Please create the quiz","danger")
return redirect(url_for('quiz.create_quiz', short_name=project.short_name))
#### end
pro = pro_features()
ensure_authorized_to('publish', project)
if request.method == 'GET':
return render_template('projects/publish.html',
project=project,
pro_features=pro)
project.published = True
project_repo.save(project)
task_repo.delete_taskruns_from_project(project)
result_repo.delete_results_from_project(project)
webhook_repo.delete_entries_from_project(project)
auditlogger.log_event(project, current_user, 'update', 'published', False, True)
flash(gettext('Project published! Volunteers will now be able to help you!'))
return redirect(url_for('.details', short_name=project.short_name))
def reset_secret_key(short_name):
"""
Reset Project key.
"""
(project, owner, n_tasks, n_task_runs,
overall_progress, last_activity,
n_results) = project_by_shortname(short_name)
title = project_title(project, "Results")
ensure_authorized_to('update', project)
project.secret_key = make_uuid()
project_repo.update(project)
cached_projects.delete_project(short_name)
msg = gettext('New secret key generated')
flash(msg, 'success')
return redirect_content_type(url_for('.update', short_name=short_name))
def add_admin(user_id=None):
"""Add admin flag for user_id."""
try:
if user_id:
user = user_repo.get(user_id)
if user:
ensure_authorized_to('update', user)
user.admin = True
user_repo.update(user)
return redirect_content_type(url_for(".users"))
else:
msg = "User not found"
return format_error(msg, 404)
except Exception as e: # pragma: no cover
current_app.logger.error(e)
return abort(500)
def create_post():
post_data = {
'title': request.form.get('title'),
'content': request.form.get('content'),
}
post = Post()
post.set(post_data)
post = markdown(post)
upload_image = request.files.get('featured_image')
if upload_image.filename != '' and allowed_file(upload_image.filename):
f = Attachment(upload_image.filename, data=upload_image.stream)
post.set('featured_image', f)
post.save()
tag_names = request.form.get('tags').lower().strip()
tags = [get_tag_by_name(x) for x in split_tag_names(tag_names)]
map_tags_to_post(tags, post)
return redirect(url_for('show_post', post_id=post.id))
def index():
code = request.args.get("code", "")
#app.logger.debug("code:%s" %code)
#app.logger.debug(request.args)
if g.signin:
return "logged_in"
elif code:
_data = Get_Access_Token(code)
access_token = _data['access_token']
userData = Get_User_Info(access_token)
app.logger.debug(userData)
#resp = render_template('info.html', userData=userData)
#resp.set_cookie(key="logged_in", value='true', expires=None)
resp = jsonify(userData)
resp.set_cookie(key="logged_in", value='true', expires=None)
return resp
else:
return redirect(url_for("login"))