def auth_jwt_project(short_name):
"""Create a JWT for a project via its secret KEY."""
project_secret_key = None
if 'Authorization' in request.headers:
project_secret_key = request.headers.get('Authorization')
if project_secret_key:
project = project_repo.get_by_shortname(short_name)
if project and project.secret_key == project_secret_key:
token = jwt.encode({'short_name': short_name,
'project_id': project.id},
project.secret_key, algorithm='HS256')
return token
else:
return abort(404)
else:
return abort(403)
python类abort()的实例源码
def init_login_manager(db):
"""Init security extensions (login manager and principal)
:param db: Database which stores user accounts and roles
:type db: ``flask_sqlalchemy.SQLAlchemy``
:return: Login manager and principal extensions
:rtype: (``flask_login.LoginManager``, ``flask_principal.Principal``
"""
login_manager = flask_login.LoginManager()
principals = flask_principal.Principal()
login_manager.anonymous_user = Anonymous
@login_manager.unauthorized_handler
def unauthorized():
flask.abort(403)
@login_manager.user_loader
def load_user(user_id):
return db.session.query(UserAccount).get(int(user_id))
return login_manager, principals
def lookup():
icao_identifier = request.form['airport']
if request.form['date']:
try:
date = dateparser.parse(request.form['date']).date()
except ValueError:
return "Unable to understand date %s" % request.form['date'], 400
else:
date = datetime.date.today()
try:
result = do_lookup(icao_identifier, date)
except Exception as e:
return str(e), 400
except:
flask.abort(500)
return json.dumps(result)
def star():
"""Starring/Highlighting handler.
Attempts to toggle a star/highlight on a particular show. The show ID must
be passed in the ``id`` query string. If the user is unauthenticated, the
function is aborted with a ``404`` message to hide the page.
Returns:
JSON formatted output describing success and the ID of the show starred.
"""
log.debug("Entering star, trying to toggle star.")
if fe.check_login_id(escape(session['logged_in'])):
log.debug("Sending show ID {0} to function".format(request.args['id']))
fe.star_show(request.args['id'])
log.debug("Returning to user.")
return jsonify({ "star": "success", "id": request.args['id'] })
log.debug("User cannot be authenticated, send 404 to hide page.")
abort(404)
def drop_show():
"""Show removal handler.
Attempts to remove a show from the backend system. The show ID must
be passed in the ``id`` query string. If the user if unauthenticated, the
function is aborted with a ``404`` message to hide the page.
Returns:
An HTTP redirect to the home page, to refresh.
"""
log.debug("Entering drop_show, trying to remove show from list.")
if fe.check_login_id(escape(session['logged_in'])):
log.debug("Sending show ID {0} to function".format(request.args['id']))
fe.remove_show(request.args['id'])
log.debug("Refreshing user's page.")
return redirect('/')
log.debug("User cannot be authenticated, send 404 to hide page.")
abort(404)
def scan_scrapers():
"""On demand scrapper scanning handler.
For some reason the scheduler doesn't always work, this endpoint allows for
instant scanning, assuming it's not already occurring. The function is aborted
with a ``404`` message to hide the page if the user is not authenticated.
Scanning can take a long time - 20 to 30 minutes - so it's recommended this
endpoint be called asynchronously.
Returns:
JSON formatted output to identify that scanning has completed or is already
ongoing.
"""
log.debug("Entering scan_scrapers.")
if fe.check_login_id(escape(session['logged_in'])):
log.debug("User is logged in, attempting to begin scan.")
if not fe.scrape_shows():
log.debug("scrape_shows returned false, either the lockfile exists incorrectly or scraping is ongoing.")
return jsonify({"scan":"failure", "reason":"A scan is ongoing"})
log.debug("scrape_shows just returned. Returning success.")
return jsonify({"scan":"success"})
log.debug("User cannot be authenticated, send 404 to hide page.")
abort(404)
def test_error_handling(self):
app = flask.Flask(__name__)
admin = flask.Module(__name__, 'admin')
@admin.app_errorhandler(404)
def not_found(e):
return 'not found', 404
@admin.app_errorhandler(500)
def internal_server_error(e):
return 'internal server error', 500
@admin.route('/')
def index():
flask.abort(404)
@admin.route('/error')
def error():
1 // 0
app.register_module(admin)
c = app.test_client()
rv = c.get('/')
self.assert_equal(rv.status_code, 404)
self.assert_equal(rv.data, b'not found')
rv = c.get('/error')
self.assert_equal(rv.status_code, 500)
self.assert_equal(b'internal server error', rv.data)
def test_aborting(self):
class Foo(Exception):
whatever = 42
app = flask.Flask(__name__)
app.testing = True
@app.errorhandler(Foo)
def handle_foo(e):
return str(e.whatever)
@app.route('/')
def index():
raise flask.abort(flask.redirect(flask.url_for('test')))
@app.route('/test')
def test():
raise Foo()
with app.test_client() as c:
rv = c.get('/')
self.assertEqual(rv.headers['Location'], 'http://localhost/test')
rv = c.get('/test')
self.assertEqual(rv.data, b'42')
def index(page=1):
"""Index page for all PYBOSSA registered users."""
update_feed = get_update_feed()
per_page = 24
count = cached_users.get_total_users()
accounts = cached_users.get_users_page(page, per_page)
if not accounts and page != 1:
abort(404)
pagination = Pagination(page, per_page, count)
if current_user.is_authenticated():
user_id = current_user.id
else:
user_id = None
top_users = cached_users.get_leaderboard(current_app.config['LEADERBOARD'],
user_id)
tmp = dict(template='account/index.html', accounts=accounts,
total=count,
top_users=top_users,
title="Community", pagination=pagination,
update_feed=update_feed)
return handle_content_type(tmp)
def confirm_email():
"""Send email to confirm user email."""
acc_conf_dis = current_app.config.get('ACCOUNT_CONFIRMATION_DISABLED')
if acc_conf_dis:
return abort(404)
if current_user.valid_email is False:
user = user_repo.get(current_user.id)
account = dict(fullname=current_user.fullname, name=current_user.name,
email_addr=current_user.email_addr)
confirm_url = get_email_confirmation_url(account)
subject = ('Verify your email in %s' % current_app.config.get('BRAND'))
msg = dict(subject=subject,
recipients=[current_user.email_addr],
body=render_template('/account/email/validate_email.md',
user=account, confirm_url=confirm_url))
msg['html'] = render_template('/account/email/validate_email.html',
user=account, confirm_url=confirm_url)
mail_queue.enqueue(send_mail, msg)
msg = gettext("An e-mail has been sent to \
validate your e-mail address.")
flash(msg, 'info')
user.confirmation_email_sent = True
user_repo.update(user)
return redirect_content_type(url_for('.profile', name=current_user.name))
def projects(name):
"""
List user's project list.
Returns a Jinja2 template with the list of projects of the user.
"""
user = user_repo.get_by_name(name)
if not user:
return abort(404)
if current_user.name != name:
return abort(403)
user = user_repo.get(current_user.id)
projects_published, projects_draft = _get_user_projects(user.id)
response = dict(template='account/projects.html',
title=gettext("Projects"),
projects_published=projects_published,
projects_draft=projects_draft)
return handle_content_type(response)
def reset_api_key(name):
"""
Reset API-KEY for user.
Returns a Jinja2 template.
"""
if request.method == 'POST':
user = user_repo.get_by_name(name)
if not user:
return abort(404)
ensure_authorized_to('update', user)
user.api_key = model.make_uuid()
user_repo.update(user)
cached_users.delete_user_summary(user.name)
msg = gettext('New API-KEY generated')
flash(msg, 'success')
return redirect_content_type(url_for('account.profile', name=name))
else:
csrf = dict(form=dict(csrf=generate_csrf()))
return jsonify(csrf)
def project_by_shortname(short_name):
project = cached_projects.get_project(short_name)
if project:
# Get owner
owner = user_repo.get(project.owner_id)
# Populate CACHE with the data of the project
return (project,
owner,
cached_projects.n_tasks(project.id),
cached_projects.n_task_runs(project.id),
cached_projects.overall_progress(project.id),
cached_projects.last_activity(project.id),
cached_projects.n_results(project.id))
else:
cached_projects.delete_project(short_name)
return abort(404)
def delete_autoimporter(short_name):
pro = pro_features()
if not pro['autoimporter_enabled']:
raise abort(403)
project = project_by_shortname(short_name)[0]
ensure_authorized_to('read', project)
ensure_authorized_to('update', project)
if project.has_autoimporter():
autoimporter = project.get_autoimporter()
project.delete_autoimporter()
project_repo.save(project)
auditlogger.log_event(project, current_user, 'delete', 'autoimporter',
json.dumps(autoimporter), 'Nothing')
return redirect(url_for('.tasks', short_name=project.short_name))
def auditlog(short_name):
pro = pro_features()
if not pro['auditlog_enabled']:
raise abort(403)
(project, owner, n_tasks, n_task_runs,
overall_progress, last_activity,
n_results) = project_by_shortname(short_name)
ensure_authorized_to('read', Auditlog, project_id=project.id)
logs = auditlogger.get_project_logs(project.id)
project = add_custom_contrib_button_to(project, get_user_id_or_ip())
return render_template('projects/auditlog.html', project=project,
owner=owner, logs=logs,
overall_progress=overall_progress,
n_tasks=n_tasks,
n_task_runs=n_task_runs,
n_completed_tasks=cached_projects.n_completed_tasks(project.get('id')),
n_volunteers=cached_projects.n_volunteers(project.get('id')),
pro_features=pro)
def add_admin(user_id=None):
"""Add admin flag for user_id."""
try:
if user_id:
user = user_repo.get(user_id)
if user:
ensure_authorized_to('update', user)
user.admin = True
user_repo.update(user)
return redirect_content_type(url_for(".users"))
else:
msg = "User not found"
return format_error(msg, 404)
except Exception as e: # pragma: no cover
current_app.logger.error(e)
return abort(500)
def del_admin(user_id=None):
"""Del admin flag for user_id."""
try:
if user_id:
user = user_repo.get(user_id)
if user:
ensure_authorized_to('update', user)
user.admin = False
user_repo.update(user)
return redirect_content_type(url_for('.users'))
else:
msg = "User.id not found"
return format_error(msg, 404)
else: # pragma: no cover
msg = "User.id is missing for method del_admin"
return format_error(msg, 415)
except Exception as e: # pragma: no cover
current_app.logger.error(e)
return abort(500)
def run_cmd_host(host_id):
"""Execute host specific command."""
#Select host on the server.
res = RSPET_API.select([host_id])
#Check if host selected correctly (if not probably host_id is invalid).
if res["code"] != rspet_server.ReturnCodes.OK:
abort(404)
#Read 'command' argument from query string.
comm = request.args.get('command')
if not comm or comm in EXCLUDED_FUNCTIONS:
abort(400)
try:
#Read 'args' argument from query string.
args = request.args.getlist('args')
#Cast arguments to string.
for i, val in enumerate(args):
args[i] = str(val)
except KeyError:
args = []
#Execute command.
res = RSPET_API.call_plugin(comm, args)
#Unselect host. Maintain statelessness of RESTful.
RSPET_API.select()
return jsonify(res)
def run_cmd():
"""Execute general (non-host specific) command."""
#Read 'command' argument from query string.
comm = request.args.get('command')
if not comm or comm in EXCLUDED_FUNCTIONS:
abort(400)
try:
#Read 'args' argument from query string.
args = request.args.getlist('args')
#Cast arguments to string.
for i, val in enumerate(args):
args[i] = str(val)
except KeyError:
args = []
#Execute command.
ret = RSPET_API.call_plugin(comm, args)
if ret['code'] == rspet_server.ReturnCodes.OK:
http_code = 200
else:
http_code = 404
return make_response(jsonify(ret), http_code)
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("schoolID", type=str, required=True, location="json")
parser.add_argument("userID", type=str, required=True, location="json")
args = parser.parse_args()
user = db.tempUser.find_one({"_id": args["userID"]})
if user is None: abort(400)
school = db.school.find_one({"_id": args["schoolID"]})
if school is None: abort(400)
user["schoolID"] = args["schoolID"]
db.user.insert_one(user)
session['userID'] = user["_id"]
return jsonify(user, status=201)
def test_error_handling(self):
app = flask.Flask(__name__)
admin = flask.Module(__name__, 'admin')
@admin.app_errorhandler(404)
def not_found(e):
return 'not found', 404
@admin.app_errorhandler(500)
def internal_server_error(e):
return 'internal server error', 500
@admin.route('/')
def index():
flask.abort(404)
@admin.route('/error')
def error():
1 // 0
app.register_module(admin)
c = app.test_client()
rv = c.get('/')
self.assert_equal(rv.status_code, 404)
self.assert_equal(rv.data, b'not found')
rv = c.get('/error')
self.assert_equal(rv.status_code, 500)
self.assert_equal(b'internal server error', rv.data)
def test_error_handling(self):
app = flask.Flask(__name__)
@app.errorhandler(404)
def not_found(e):
return 'not found', 404
@app.errorhandler(500)
def internal_server_error(e):
return 'internal server error', 500
@app.route('/')
def index():
flask.abort(404)
@app.route('/error')
def error():
1 // 0
c = app.test_client()
rv = c.get('/')
self.assert_equal(rv.status_code, 404)
self.assert_equal(rv.data, b'not found')
rv = c.get('/error')
self.assert_equal(rv.status_code, 500)
self.assert_equal(b'internal server error', rv.data)
def test_aborting(self):
class Foo(Exception):
whatever = 42
app = flask.Flask(__name__)
app.testing = True
@app.errorhandler(Foo)
def handle_foo(e):
return str(e.whatever)
@app.route('/')
def index():
raise flask.abort(flask.redirect(flask.url_for('test')))
@app.route('/test')
def test():
raise Foo()
with app.test_client() as c:
rv = c.get('/')
self.assertEqual(rv.headers['Location'], 'http://localhost/test')
rv = c.get('/test')
self.assertEqual(rv.data, b'42')
def post_recipe():
payload = request.get_json()
topology = payload.get("topology")
scenarios = payload.get("scenarios")
headers = payload.get("headers")
#pattern = payload.get("header_pattern")
if not topology:
abort(400, "Topology required")
if not scenarios:
abort(400, "Failure scenarios required")
if headers and type(headers)!=dict:
abort(400, "Headers must be a dictionary")
# if not pattern:
# abort(400, "Header_pattern required")
appgraph = ApplicationGraph(topology)
fg = A8FailureGenerator(appgraph, a8_controller_url='{0}/v1/rules'.format(a8_controller_url), a8_controller_token=a8_controller_token, headers=headers, debug=debug)
fg.setup_failures(scenarios)
return make_response(jsonify(recipe_id=fg.get_id()), 201, {'location': url_for('get_recipe_results', recipe_id=fg.get_id())})
def delete_recipe(recipe_id):
# clear fault injection rules
url = '{0}/v1/rules?tag={1}'.format(a8_controller_url, recipe_id)
headers = {}
if a8_controller_token != "" :
headers['Authorization'] = "Bearer " + token
try:
r = requests.delete(url, headers=headers)
except Exception, e:
sys.stderr.write("Could not DELETE {0}".format(url))
sys.stderr.write("\n")
sys.stderr.write(str(e))
sys.stderr.write("\n")
abort(500, "Could not DELETE {0}".format(url))
if r.status_code != 200 and r.status_code != 204:
abort(r.status_code)
return ""
def serve_static_files(p, index_on_error=True):
"""Securely serve static files for the given path using send_file."""
# Determine the canonical path of the file
full_path = os.path.realpath(os.path.join(app.static_folder, p))
# We have a problem if either:
# - the path is not a sub-path of app.static_folder; or
# - the path does not refer to a real file.
if (os.path.commonprefix([app.static_folder, full_path]) != app.static_folder or
not os.path.isfile(full_path)):
file_to_return = app.config.get('STATIC_FILE_ON_404', None)
if file_to_return is not None:
full_path = os.path.realpath(os.path.join(app.static_folder, file_to_return))
else:
return abort(404)
return send_file(full_path)
def post(self):
if current_user() is None:
redirect(url_for('authorized'))
user = current_user()
if not user.faculty:
return abort(403)
key = request.form.get('key', None)
value = request.form.get('value', None)
try:
try:
config.set(key, config.from_frontend_value(key, json.loads(value)))
return jsonify({'status': 'OK'})
except ValueError:
return abort(404)
except:
return abort(400)
def configure_views(app):
@app.route('/')
def home():
return render_template('ok.html')
@app.route('/mistake')
def mistake():
# this causes Internal Server Error
return abort(500)
@app.route('/secret')
def secret():
# this causes Foridden (user not authorized)
return abort(403)
def test_error_handling(self):
app = flask.Flask(__name__)
admin = flask.Module(__name__, 'admin')
@admin.app_errorhandler(404)
def not_found(e):
return 'not found', 404
@admin.app_errorhandler(500)
def internal_server_error(e):
return 'internal server error', 500
@admin.route('/')
def index():
flask.abort(404)
@admin.route('/error')
def error():
1 // 0
app.register_module(admin)
c = app.test_client()
rv = c.get('/')
self.assert_equal(rv.status_code, 404)
self.assert_equal(rv.data, b'not found')
rv = c.get('/error')
self.assert_equal(rv.status_code, 500)
self.assert_equal(b'internal server error', rv.data)
def test_error_handling(self):
app = flask.Flask(__name__)
@app.errorhandler(404)
def not_found(e):
return 'not found', 404
@app.errorhandler(500)
def internal_server_error(e):
return 'internal server error', 500
@app.route('/')
def index():
flask.abort(404)
@app.route('/error')
def error():
1 // 0
c = app.test_client()
rv = c.get('/')
self.assert_equal(rv.status_code, 404)
self.assert_equal(rv.data, b'not found')
rv = c.get('/error')
self.assert_equal(rv.status_code, 500)
self.assert_equal(b'internal server error', rv.data)