def login_post_handler():
try:
username = bottle.request.forms['username']
password = bottle.request.forms['password']
except KeyError:
bottle.abort(400, 'Invalid form.')
try:
user = model.get_user(username)
except KeyError:
bottle.redirect('/login?msg=fail')
if user['password_hash'] == model.PASSWORDLESS_HASH:
if not handler_util.is_admin():
bottle.redirect('/login?msg=fail')
else:
if not sha256_crypt.verify(password, user['password_hash']):
bottle.redirect('/login?msg=fail')
handler_util.set_current_username(username)
bottle.redirect('/')
python类abort()的实例源码
def problem_view_handler(problem_id):
try:
problem_id = int(problem_id)
except ValueError:
bottle.abort(404, 'Problem not found.')
try:
problem = model.get_public_problem(problem_id=problem_id)
except KeyError:
bottle.abort(404, 'Problem not found.')
owner = model.get_user(problem['owner'])
problem_spec = model.load_blob(problem['problem_spec_hash'])
snapshot_time, ranked_solutions = model.get_last_problem_ranking_snapshot(
problem_id=problem_id, public_only=True)
handler_util.inject_ranks_to_ranked_solutions(ranked_solutions)
template_dict = {
'problem': problem,
'problem_spec': problem_spec,
'owner': owner,
'ranked_solutions': ranked_solutions,
'snapshot_time': snapshot_time,
}
return handler_util.render('problem_view.html', template_dict)
def admin_problem_view_handler(problem_id):
problem_id = int(problem_id)
try:
problem = model.get_problem_for_admin(problem_id=problem_id)
except KeyError:
bottle.abort(404, 'Problem not found.')
owner = model.get_user(problem['owner'])
problem_spec = model.load_blob(problem['problem_spec_hash'])
snapshot_time, ranked_solutions = model.get_last_problem_ranking_snapshot(
problem_id=problem_id, public_only=False)
handler_util.inject_ranks_to_ranked_solutions(ranked_solutions)
team_display_name_map = handler_util.compute_team_display_name_map(
solution['owner'] for solution in ranked_solutions)
template_dict = {
'problem': problem,
'problem_spec': problem_spec,
'owner': owner,
'ranked_solutions': ranked_solutions,
'team_display_name_map': team_display_name_map,
'snapshot_time': snapshot_time,
}
return handler_util.render('admin/problem_view.html', template_dict)
def admin_solution_view_handler(solution_id):
solution_id = int(solution_id)
try:
solution = model.get_solution_for_admin(solution_id=solution_id)
problem = model.get_problem_for_admin(problem_id=solution['problem_id'])
except KeyError:
bottle.abort(404, 'Solution not found.')
problem_owner = model.get_user(problem['owner'])
solution_owner = model.get_user(solution['owner'])
solution_spec = model.load_blob(solution['solution_spec_hash'])
template_dict = {
'solution': solution,
'solution_spec': solution_spec,
'solution_owner': solution_owner,
'problem_owner': problem_owner,
}
return handler_util.render('admin/solution_view.html', template_dict)
def enforce_api_rate_limit(action, limit_in_window):
"""Enforces API rate limit.
Args:
action: Action name.
limit_in_window: Maximum number of requests of this action in a window.
Raises:
bottle.HTTPError: If the rate limit is exceeded.
"""
if (FLAGS.enable_load_test_hacks and
bottle.request.headers.get('X-Load-Test', '') == 'yes'):
return
if get_current_user()['organizer']:
return
username = get_current_username()
if (model.record_last_api_access_time(username) <
FLAGS.api_rate_limit_request_interval):
bottle.abort(429, 'Rate limit exceeded (per-second limit).')
count = model.increment_api_rate_limit_counter(username, action)
if count > limit_in_window:
bottle.abort(429, 'Rate limit exceeded (per-hour limit).')
# http://flask.pocoo.org/snippets/44/
def post_batch_entityset(entitySetName):
try:
result = dataProviderDto.apiProvider.handleInsertEntityBatch(entitySetName, request.json, dataService)
response.content_type = "application/json; charset=utf-8"
return json.dumps(result, cls=CustomEncoder)
except dalUtils.StatusCodeError as err:
response.status = err.value
except:
abort(400, 'Bad Request')
## DELETE: api/datasource/crud/batch/:entitySetName
#@delete('/api/datasource/crud/batch/<entitySetName>')
#def delete_batch_entityset(entitySetName):
# try:
# result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.json, dataService)
# response.content_type = "application/json; charset=utf-8"
# return json.dumps(result, cls=CustomEncoder)
# except dalUtils.StatusCodeError as err:
# response.status = err.value
# except:
# abort(400, 'Bad Request')
# DELETE: api/datasource/crud/batch/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
def v1_random_fc_get(response, dbid_to_visid, store):
'''Retrieves a random feature collection from the database.
The route for this endpoint is:
``GET /dossier/v1/random/feature-collection``.
Assuming the database has at least one feature collection,
this end point returns an array of two elements. The first
element is the content id and the second element is a
feature collection (in the same format returned by
:func:`dossier.web.routes.v1_fc_get`).
If the database is empty, then a 404 error is returned.
Note that currently, this may not be a uniformly random sample.
'''
# Careful, `store.scan()` would be obscenely slow here...
sample = streaming_sample(store.scan_ids(), 1, 1000)
if len(sample) == 0:
bottle.abort(404, 'The feature collection store is empty.')
return [dbid_to_visid(sample[0]), util.fc_to_json(store.get(sample[0]))]
def verify_user_exists(user):
user in pubsub.user_info or abort(404, f'Unknown user: {user!r}')
def register_handler():
if settings.has_contest_finished():
bottle.abort(403, 'The contest is over!')
empty_user = {
'display_name': '',
'contact_email': '',
'member_names': '',
'nationalities': '',
'languages': '',
'source_url': '',
}
return handler_util.render('register.html', {'user': empty_user})
def register_post_handler():
if settings.has_contest_finished():
bottle.abort(403, 'The contest is over!')
try:
basic_profile = handler_util.parse_basic_profile_forms()
except ValueError:
bottle.abort(400, 'Invalid form.')
username, password = model.register_user(
remote_host=bottle.request.remote_addr,
**basic_profile)
template_dict = {
'username': username,
'password': password,
}
return handler_util.render('registered.html', template_dict)
def profile_post_handler():
try:
basic_profile = handler_util.parse_basic_profile_forms()
except ValueError:
bottle.abort(400, 'Invalid form.')
username = handler_util.get_current_username()
model.update_user(username, **basic_profile)
bottle.redirect('/profile?msg=updated')
def solution_submit_handler():
if handler_util.get_current_user()['organizer']:
bottle.abort(400, 'You are organizer, not allowed to submit solutions.')
problem_id = bottle.request.query.get('problem_id', '')
return handler_util.render('solution_submit.html', {'problem_id': problem_id})
def admin_visualize_problem_handler(problem_id):
thumbnail = bool(bottle.request.query.get('thumbnail'))
problem_id = int(problem_id)
try:
problem = model.get_problem_for_admin(problem_id=problem_id)
except KeyError:
bottle.abort(404, 'Problem not found.')
problem_spec = model.load_blob(problem['problem_spec_hash'])
image_binary = visualize.visualize_problem(problem_spec, thumbnail)
bottle.response.content_type = 'image/png'
return image_binary
def admin_visualize_solution_handler(solution_id):
thumbnail = bool(bottle.request.query.get('thumbnail'))
solution_id = int(solution_id)
try:
solution = model.get_solution_for_admin(solution_id=solution_id)
except KeyError:
bottle.abort(404, 'Solution not found.')
solution_spec = model.load_blob(solution['solution_spec_hash'])
problem_spec = model.load_blob(solution['problem_spec_hash'])
image_binary = visualize.visualize_solution(
problem_spec, solution_spec, thumbnail)
bottle.response.content_type = 'image/png'
return image_binary
def testing_cron_snapshot_job_handler():
if not FLAGS.enable_testing_handlers:
bottle.abort(403, 'Disabled')
cron_jobs.snapshot_job()
return 'done'
def _protect_before_contest_hook():
"""Before-request hook to protect the whole website before the contest."""
if bottle.request.path.startswith(('/health', '/ping', '/api/', '/admin/')):
return
if not is_admin() and not settings.has_contest_started():
bottle.abort(403, 'The contest has not started yet.')
def _enforce_web_rate_limit_hook():
"""Before-request hook to enforce web rate limit."""
if (FLAGS.enable_load_test_hacks and
bottle.request.headers.get('X-Load-Test', '') == 'yes'):
return
if bottle.request.path.startswith(('/health', '/ping', '/static/', '/api/', '/admin/')):
return
user = get_current_user()
if not user:
return
if user['organizer']:
return
if not model.decrement_web_rate_limit_counter(user['_id']):
bottle.abort(429, 'Rate limit exceeded.')
def _protect_xsrf_hook():
"""Before-request hook to protect from XSRF attacks."""
# No need to protect API calls.
if bottle.request.path.startswith('/api/'):
return
if bottle.request.method not in ('GET', 'HEAD'):
xsrf_token = bottle.request.forms.get('xsrf_token', 'N/A')
if xsrf_token != get_xsrf_token():
bottle.abort(400, 'XSRF token is incorrect or not set.')
def _require_gzip_hook():
"""Before-request hook to require gzip for API requests."""
if (FLAGS.enable_load_test_hacks and
bottle.request.headers.get('X-Load-Test', '') == 'yes'):
return
if bottle.request.path.startswith('/api/'):
accept_encoding = bottle.request.headers.get(
'X-Accept-Encoding',
bottle.request.headers.get('Accept-Encoding', ''))
if 'gzip' not in accept_encoding:
bottle.abort(
400, 'Accept-Encoding: gzip is required for API requests')
def checkserial(func):
"""Decorator to call function only when machine connected."""
def _decorator(*args, **kwargs):
if driveboard.connected():
return func(*args, **kwargs)
else:
bottle.abort(400, "No machine.")
return _decorator
### STATIC FILES
def offset(x, y, z):
if not driveboard.status()['ready']:
bottle.abort(400, "Machine not ready.")
driveboard.def_offset_custom(x, y, z)
driveboard.sel_offset_custom()
return '{}'
def clear_offset():
if not driveboard.status()['ready']:
bottle.abort(400, "Machine not ready.")
driveboard.def_offset_custom(0,0,0)
driveboard.sel_offset_table()
return '{}'
### JOBS QUEUE
def _get(jobname, library=False):
# get job as sting
if library:
jobpath = os.path.join(conf['rootdir'], 'library', jobname.strip('/\\'))
else:
jobpath = os.path.join(conf['stordir'], jobname.strip('/\\'))
if os.path.exists(jobpath+'.dba'):
jobpath = jobpath+'.dba'
elif os.path.exists(jobpath + '.dba.starred'):
jobpath = jobpath + '.dba.starred'
else:
bottle.abort(400, "No such file.")
with open(jobpath) as fp:
job = fp.read()
return job
def _get_path(jobname, library=False):
if library:
jobpath = os.path.join(conf['rootdir'], 'library', jobname.strip('/\\'))
else:
jobpath = os.path.join(conf['stordir'], jobname.strip('/\\'))
if os.path.exists(jobpath+'.dba'):
return jobpath+'.dba'
elif os.path.exists(jobpath+'.dba.starred'):
return jobpath+'.dba.starred'
else:
bottle.abort(400, "No such file.")
def load():
"""Load a dba, svg, dxf, or gcode job.
Args:
(Args come in through the POST request.)
job: Parsed dba or job string (dba, svg, dxf, or ngc).
name: name of the job (string)
optimize: flag whether to optimize (bool)
overwrite: flag whether to overwite file if present (bool)
"""
load_request = json.loads(bottle.request.forms.get('load_request'))
job = load_request.get('job') # always a string
name = load_request.get('name')
# optimize defaults
if 'optimize' in load_request:
optimize = load_request['optimize']
else:
optimize = True
# overwrite defaults
if 'overwrite' in load_request:
overwrite = load_request['overwrite']
else:
overwrite = False
# sanity check
if job is None or name is None:
bottle.abort(400, "Invalid request data.")
# convert
try:
job = jobimport.convert(job, optimize=optimize)
except TypeError:
if DEBUG: traceback.print_exc()
bottle.abort(400, "Invalid file type.")
if not overwrite:
name = _unique_name(name)
_add(json.dumps(job), name)
return json.dumps(name)
def listing(kind=None):
"""List all queue jobs by name."""
if kind is None:
files = _get_sorted('*.dba*', stripext=True)
elif kind == 'starred':
files = _get_sorted('*.dba.starred', stripext=True)
print files
elif kind == 'unstarred':
files = _get_sorted('*.dba', stripext=True)
else:
bottle.abort(400, "Invalid kind.")
return json.dumps(files)
def star(jobname):
"""Star a job."""
jobpath = _get_path(jobname)
if jobpath.endswith('.dba'):
os.rename(jobpath, jobpath + '.starred')
else:
bottle.abort(400, "No such file.")
return '{}'
def unstar(jobname):
"""Unstar a job."""
jobpath = _get_path(jobname)
if jobpath.endswith('.starred'):
os.rename(jobpath, jobpath[:-8])
else:
bottle.abort(400, "No such file.")
return '{}'
def run(jobname):
"""Send job from queue to the machine."""
job = _get(jobname)
if not driveboard.status()['ready']:
bottle.abort(400, "Machine not ready.")
driveboard.job(json.loads(job))
return '{}'
def build():
"""Build firmware from firmware/src files (for all config files)."""
return_code = driveboard.build()
if return_code != 0:
bottle.abort(400, "Build failed.")
else:
return '{}'