python类abort()的实例源码

handler.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def login_post_handler():
    """Handles submission of the login form.

    Responds with 400 when either form field is missing, redirects back to
    the login page with ``msg=fail`` when the credentials are rejected, and
    otherwise stores the username as the current user and redirects to '/'.
    """
    forms = bottle.request.forms
    try:
        username, password = forms['username'], forms['password']
    except KeyError:
        bottle.abort(400, 'Invalid form.')
    try:
        account = model.get_user(username)
    except KeyError:
        bottle.redirect('/login?msg=fail')
    stored_hash = account['password_hash']
    if stored_hash == model.PASSWORDLESS_HASH:
        # Accounts carrying the PASSWORDLESS_HASH marker are only let
        # through when is_admin() holds; the password is not consulted.
        accepted = handler_util.is_admin()
    else:
        accepted = sha256_crypt.verify(password, stored_hash)
    if not accepted:
        bottle.redirect('/login?msg=fail')
    handler_util.set_current_username(username)
    bottle.redirect('/')
handler.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def problem_view_handler(problem_id):
    """Renders the public page of a single problem.

    Args:
        problem_id: Problem ID as received from the request.

    Returns:
        The rendered 'problem_view.html' template.

    Raises:
        bottle.HTTPError: 404 if the ID is not an integer or no public
            problem is found for it.
    """
    try:
        numeric_id = int(problem_id)
    except ValueError:
        bottle.abort(404, 'Problem not found.')
    try:
        problem = model.get_public_problem(problem_id=numeric_id)
    except KeyError:
        bottle.abort(404, 'Problem not found.')
    owner = model.get_user(problem['owner'])
    problem_spec = model.load_blob(problem['problem_spec_hash'])
    snapshot = model.get_last_problem_ranking_snapshot(
        problem_id=numeric_id, public_only=True)
    snapshot_time, ranked_solutions = snapshot
    handler_util.inject_ranks_to_ranked_solutions(ranked_solutions)
    return handler_util.render('problem_view.html', dict(
        problem=problem,
        problem_spec=problem_spec,
        owner=owner,
        ranked_solutions=ranked_solutions,
        snapshot_time=snapshot_time,
    ))
handler.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def admin_problem_view_handler(problem_id):
    """Renders the admin page of a single problem.

    Args:
        problem_id: Problem ID as received from the request.

    Returns:
        The rendered 'admin/problem_view.html' template.

    Raises:
        bottle.HTTPError: 404 if the ID is not an integer or the problem
            is not found.
    """
    try:
        problem_id = int(problem_id)
    except ValueError:
        # Same treatment as problem_view_handler: a malformed ID is a
        # missing problem, not an internal server error.
        bottle.abort(404, 'Problem not found.')
    try:
        problem = model.get_problem_for_admin(problem_id=problem_id)
    except KeyError:
        bottle.abort(404, 'Problem not found.')
    owner = model.get_user(problem['owner'])
    problem_spec = model.load_blob(problem['problem_spec_hash'])
    snapshot_time, ranked_solutions = model.get_last_problem_ranking_snapshot(
        problem_id=problem_id, public_only=False)
    handler_util.inject_ranks_to_ranked_solutions(ranked_solutions)
    # Display names are computed for the owners of the ranked solutions.
    team_display_name_map = handler_util.compute_team_display_name_map(
        solution['owner'] for solution in ranked_solutions)
    template_dict = {
        'problem': problem,
        'problem_spec': problem_spec,
        'owner': owner,
        'ranked_solutions': ranked_solutions,
        'team_display_name_map': team_display_name_map,
        'snapshot_time': snapshot_time,
    }
    return handler_util.render('admin/problem_view.html', template_dict)
handler.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def admin_solution_view_handler(solution_id):
    """Renders the admin page of a single solution.

    Args:
        solution_id: Solution ID as received from the request.

    Returns:
        The rendered 'admin/solution_view.html' template.

    Raises:
        bottle.HTTPError: 404 if the ID is not an integer, or if the
            solution or its problem is not found.
    """
    try:
        solution_id = int(solution_id)
    except ValueError:
        # A malformed ID is reported as a missing solution rather than
        # surfacing as an internal server error.
        bottle.abort(404, 'Solution not found.')
    try:
        solution = model.get_solution_for_admin(solution_id=solution_id)
        problem = model.get_problem_for_admin(problem_id=solution['problem_id'])
    except KeyError:
        bottle.abort(404, 'Solution not found.')
    problem_owner = model.get_user(problem['owner'])
    solution_owner = model.get_user(solution['owner'])
    solution_spec = model.load_blob(solution['solution_spec_hash'])
    template_dict = {
        'solution': solution,
        'solution_spec': solution_spec,
        'solution_owner': solution_owner,
        'problem_owner': problem_owner,
    }
    return handler_util.render('admin/solution_view.html', template_dict)
handler_util.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def enforce_api_rate_limit(action, limit_in_window):
    """Rejects the current API request when its caller exceeds a rate limit.

    Requests flagged with the 'X-Load-Test: yes' header (only while the
    load test hacks flag is on) and requests from organizers are exempt.

    Args:
        action: Action name.
        limit_in_window: Maximum number of requests of this action in a window.

    Raises:
        bottle.HTTPError: If the rate limit is exceeded.
    """
    if FLAGS.enable_load_test_hacks:
        if bottle.request.headers.get('X-Load-Test', '') == 'yes':
            return
    if get_current_user()['organizer']:
        return
    username = get_current_username()
    interval = model.record_last_api_access_time(username)
    if interval < FLAGS.api_rate_limit_request_interval:
        bottle.abort(429, 'Rate limit exceeded (per-second limit).')
    # The counter is only incremented once the interval check has passed.
    if model.increment_api_rate_limit_counter(username, action) > limit_in_window:
        bottle.abort(429, 'Rate limit exceeded (per-hour limit).')


# http://flask.pocoo.org/snippets/44/
crud.py 文件源码 项目:python-server-typescript-client 作者: marianc 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def post_batch_entityset(entitySetName):
    """Insert a batch of entities into the given entity set.

    The request's JSON payload is passed to handleInsertEntityBatch and the
    result is returned JSON-encoded with CustomEncoder.

    Args:
        entitySetName: Name of the entity set the batch is inserted into.

    Returns:
        The JSON-encoded result on success; None when a StatusCodeError was
        mapped onto the response status.
    """
    try:
        result = dataProviderDto.apiProvider.handleInsertEntityBatch(entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        # The error's value becomes the response status; no body is returned.
        response.status = err.value
    except Exception:
        # Any other failure is reported as a bad request. Unlike a bare
        # `except:`, this lets KeyboardInterrupt and SystemExit propagate.
        abort(400, 'Bad Request')


## DELETE: api/datasource/crud/batch/:entitySetName
#@delete('/api/datasource/crud/batch/<entitySetName>')
#def delete_batch_entityset(entitySetName):
#    try:
#        result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.json, dataService)
#        response.content_type = "application/json; charset=utf-8"
#        return json.dumps(result, cls=CustomEncoder)
#    except dalUtils.StatusCodeError as err:
#        response.status = err.value
#    except:
#        abort(400, 'Bad Request')


# DELETE: api/datasource/crud/batch/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
routes.py 文件源码 项目:memex-dossier-open 作者: dossier 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def v1_random_fc_get(response, dbid_to_visid, store):
    '''Returns one randomly sampled feature collection from the store.

    The route for this endpoint is:
    ``GET /dossier/v1/random/feature-collection``.

    When the store holds at least one feature collection, the result
    is a two-element array: the content id first, then the feature
    collection (in the same format returned by
    :func:`dossier.web.routes.v1_fc_get`).

    An empty store produces a 404 error.

    Note that currently, this may not be a uniformly random sample.
    '''
    # Only ids are scanned: `store.scan()` would be obscenely slow here.
    picked = streaming_sample(store.scan_ids(), 1, 1000)
    if len(picked) == 0:
        bottle.abort(404, 'The feature collection store is empty.')
    content_id = picked[0]
    feature_collection = store.get(content_id)
    return [dbid_to_visid(content_id), util.fc_to_json(feature_collection)]
webapp.py 文件源码 项目:modernpython 作者: rhettinger 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def verify_user_exists(user):
    """Abort the request with a 404 unless *user* is in pubsub.user_info."""
    if user not in pubsub.user_info:
        abort(404, f'Unknown user: {user!r}')
handler.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def register_handler():
    """Renders the registration form with every profile field blank.

    Raises:
        bottle.HTTPError: 403 once the contest has finished.
    """
    if settings.has_contest_finished():
        bottle.abort(403, 'The contest is over!')
    profile_fields = (
        'display_name',
        'contact_email',
        'member_names',
        'nationalities',
        'languages',
        'source_url',
    )
    # The template receives a user whose fields are all empty strings.
    blank_user = dict.fromkeys(profile_fields, '')
    return handler_util.render('register.html', {'user': blank_user})
handler.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def register_post_handler():
    """Handles submission of the registration form.

    Returns:
        The rendered 'registered.html' template, which is given the
        username and password returned by model.register_user.

    Raises:
        bottle.HTTPError: 403 once the contest has finished, 400 if the
            profile form cannot be parsed.
    """
    if settings.has_contest_finished():
        bottle.abort(403, 'The contest is over!')
    try:
        profile_fields = handler_util.parse_basic_profile_forms()
    except ValueError:
        bottle.abort(400, 'Invalid form.')
    credentials = model.register_user(
        remote_host=bottle.request.remote_addr,
        **profile_fields)
    username, password = credentials
    return handler_util.render(
        'registered.html', {'username': username, 'password': password})
handler.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def profile_post_handler():
    """Handles submission of the profile form for the current user.

    Redirects to '/profile?msg=updated' after the update.

    Raises:
        bottle.HTTPError: 400 if the profile form cannot be parsed.
    """
    try:
        profile_fields = handler_util.parse_basic_profile_forms()
    except ValueError:
        bottle.abort(400, 'Invalid form.')
    model.update_user(handler_util.get_current_username(), **profile_fields)
    bottle.redirect('/profile?msg=updated')
handler.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def solution_submit_handler():
    """Renders the solution submission form.

    The 'problem_id' query parameter (empty string when absent) is passed
    through to the template.

    Raises:
        bottle.HTTPError: 400 if the current user is an organizer.
    """
    current_user = handler_util.get_current_user()
    if current_user['organizer']:
        bottle.abort(400, 'You are organizer, not allowed to submit solutions.')
    template_dict = {'problem_id': bottle.request.query.get('problem_id', '')}
    return handler_util.render('solution_submit.html', template_dict)
handler.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def admin_visualize_problem_handler(problem_id):
    """Serves a PNG visualization of a problem for admins.

    A non-empty 'thumbnail' query parameter is forwarded to the visualizer
    as a true flag.

    Args:
        problem_id: Problem ID as received from the request.

    Returns:
        The image binary produced by visualize.visualize_problem; the
        response content type is set to 'image/png'.

    Raises:
        bottle.HTTPError: 404 if the ID is not an integer or the problem
            is not found.
    """
    thumbnail = bool(bottle.request.query.get('thumbnail'))
    try:
        problem_id = int(problem_id)
    except ValueError:
        # A malformed ID is reported as a missing problem rather than
        # surfacing as an internal server error.
        bottle.abort(404, 'Problem not found.')
    try:
        problem = model.get_problem_for_admin(problem_id=problem_id)
    except KeyError:
        bottle.abort(404, 'Problem not found.')
    problem_spec = model.load_blob(problem['problem_spec_hash'])
    image_binary = visualize.visualize_problem(problem_spec, thumbnail)
    bottle.response.content_type = 'image/png'
    return image_binary
handler.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def admin_visualize_solution_handler(solution_id):
    """Serves a PNG visualization of a solution for admins.

    A non-empty 'thumbnail' query parameter is forwarded to the visualizer
    as a true flag.

    Args:
        solution_id: Solution ID as received from the request.

    Returns:
        The image binary produced by visualize.visualize_solution; the
        response content type is set to 'image/png'.

    Raises:
        bottle.HTTPError: 404 if the ID is not an integer or the solution
            is not found.
    """
    thumbnail = bool(bottle.request.query.get('thumbnail'))
    try:
        solution_id = int(solution_id)
    except ValueError:
        # A malformed ID is reported as a missing solution rather than
        # surfacing as an internal server error.
        bottle.abort(404, 'Solution not found.')
    try:
        solution = model.get_solution_for_admin(solution_id=solution_id)
    except KeyError:
        bottle.abort(404, 'Solution not found.')
    solution_spec = model.load_blob(solution['solution_spec_hash'])
    problem_spec = model.load_blob(solution['problem_spec_hash'])
    image_binary = visualize.visualize_solution(
        problem_spec, solution_spec, thumbnail)
    bottle.response.content_type = 'image/png'
    return image_binary
handler.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def testing_cron_snapshot_job_handler():
    """Runs the snapshot cron job on demand and returns 'done'.

    Raises:
        bottle.HTTPError: 403 unless the testing handlers flag is enabled.
    """
    if FLAGS.enable_testing_handlers:
        cron_jobs.snapshot_job()
        return 'done'
    bottle.abort(403, 'Disabled')
handler_util.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _protect_before_contest_hook():
    """Before-request hook that blocks the website until the contest starts.

    Health, ping, API and admin paths are never blocked, and admins are
    always let through.
    """
    exempt_prefixes = ('/health', '/ping', '/api/', '/admin/')
    if bottle.request.path.startswith(exempt_prefixes):
        return
    if is_admin() or settings.has_contest_started():
        return
    bottle.abort(403, 'The contest has not started yet.')
handler_util.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _enforce_web_rate_limit_hook():
    """Before-request hook that applies the web rate limit.

    Skipped for load-test requests (while the load test hacks flag is on),
    for health/ping/static/API/admin paths, when there is no current user,
    and for organizers.
    """
    if FLAGS.enable_load_test_hacks:
        if bottle.request.headers.get('X-Load-Test', '') == 'yes':
            return
    unmetered_prefixes = ('/health', '/ping', '/static/', '/api/', '/admin/')
    if bottle.request.path.startswith(unmetered_prefixes):
        return
    user = get_current_user()
    if not user or user['organizer']:
        return
    # A falsy result from the decrement is treated as "limit exceeded".
    if model.decrement_web_rate_limit_counter(user['_id']):
        return
    bottle.abort(429, 'Rate limit exceeded.')
handler_util.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _protect_xsrf_hook():
    """Before-request hook that rejects requests lacking a valid XSRF token.

    API paths and GET/HEAD requests are not checked; every other request
    must carry an 'xsrf_token' form field equal to get_xsrf_token().
    """
    request = bottle.request
    # API calls are not protected by this hook.
    if request.path.startswith('/api/'):
        return
    if request.method in ('GET', 'HEAD'):
        return
    submitted_token = request.forms.get('xsrf_token', 'N/A')
    if submitted_token != get_xsrf_token():
        bottle.abort(400, 'XSRF token is incorrect or not set.')
handler_util.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _require_gzip_hook():
    """Before-request hook that makes gzip mandatory for API requests.

    The 'X-Accept-Encoding' header is consulted first, falling back to
    'Accept-Encoding'. Load-test requests (while the load test hacks flag
    is on) and non-API paths are not checked.
    """
    if FLAGS.enable_load_test_hacks:
        if bottle.request.headers.get('X-Load-Test', '') == 'yes':
            return
    if not bottle.request.path.startswith('/api/'):
        return
    headers = bottle.request.headers
    fallback = headers.get('Accept-Encoding', '')
    accept_encoding = headers.get('X-Accept-Encoding', fallback)
    if 'gzip' in accept_encoding:
        return
    bottle.abort(
        400, 'Accept-Encoding: gzip is required for API requests')
web.py 文件源码 项目:driveboardapp 作者: nortd 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def checkserial(func):
    """Decorator to call function only when machine connected.

    The wrapper keeps the wrapped function's name and docstring. When
    driveboard.connected() is false the request is aborted with a 400
    instead of calling the function.
    """
    # Imported locally so the decorator does not rely on a module-level import.
    import functools

    @functools.wraps(func)
    def _decorator(*args, **kwargs):
        if driveboard.connected():
            return func(*args, **kwargs)
        bottle.abort(400, "No machine.")
    return _decorator




### STATIC FILES
web.py 文件源码 项目:driveboardapp 作者: nortd 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def offset(x, y, z):
    """Define the custom offset as (x, y, z) and select it.

    Aborts with a 400 when the machine status is not ready; returns an
    empty JSON object otherwise.
    """
    machine_ready = driveboard.status()['ready']
    if not machine_ready:
        bottle.abort(400, "Machine not ready.")
    driveboard.def_offset_custom(x, y, z)
    driveboard.sel_offset_custom()
    return '{}'
web.py 文件源码 项目:driveboardapp 作者: nortd 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def clear_offset():
    """Reset the custom offset to (0, 0, 0) and select the table offset.

    Aborts with a 400 when the machine status is not ready; returns an
    empty JSON object otherwise.
    """
    machine_ready = driveboard.status()['ready']
    if not machine_ready:
        bottle.abort(400, "Machine not ready.")
    driveboard.def_offset_custom(0, 0, 0)
    driveboard.sel_offset_table()
    return '{}'




### JOBS QUEUE
web.py 文件源码 项目:driveboardapp 作者: nortd 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get(jobname, library=False):
    """Return the contents of a stored job file as a string.

    The job is looked up under the 'library' folder of conf['rootdir'] when
    `library` is true, otherwise under conf['stordir']. A '.dba' file is
    preferred over a '.dba.starred' one; if neither exists the request is
    aborted with a 400.
    """
    relname = jobname.strip('/\\')
    if library:
        basepath = os.path.join(conf['rootdir'], 'library', relname)
    else:
        basepath = os.path.join(conf['stordir'], relname)
    for extension in ('.dba', '.dba.starred'):
        candidate = basepath + extension
        if os.path.exists(candidate):
            with open(candidate) as fp:
                return fp.read()
    bottle.abort(400, "No such file.")
web.py 文件源码 项目:driveboardapp 作者: nortd 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _get_path(jobname, library=False):
    """Return the on-disk path of a stored job file.

    The job is looked up under the 'library' folder of conf['rootdir'] when
    `library` is true, otherwise under conf['stordir']. A '.dba' file is
    preferred over a '.dba.starred' one; if neither exists the request is
    aborted with a 400.
    """
    relname = jobname.strip('/\\')
    if library:
        basepath = os.path.join(conf['rootdir'], 'library', relname)
    else:
        basepath = os.path.join(conf['stordir'], relname)
    for extension in ('.dba', '.dba.starred'):
        if os.path.exists(basepath + extension):
            return basepath + extension
    bottle.abort(400, "No such file.")
web.py 文件源码 项目:driveboardapp 作者: nortd 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def load():
    """Load a dba, svg, dxf, or gcode job.

    Args:
        (Args come in through the POST request, JSON-encoded in the
        'load_request' form field.)
        job: Parsed dba or job string (dba, svg, dxf, or ngc).
        name: name of the job (string)
        optimize: flag whether to optimize (bool), defaults to True
        overwrite: flag whether to overwrite file if present (bool),
            defaults to False

    Returns:
        The JSON-encoded name the job was added under.

    The request is aborted with a 400 when 'load_request' is missing, is
    not a JSON object, lacks 'job' or 'name', or when the job cannot be
    converted.
    """
    try:
        load_request = json.loads(bottle.request.forms.get('load_request'))
    except (TypeError, ValueError):
        # TypeError: the form field is absent (None was passed to loads).
        # ValueError: the field is present but is not valid JSON.
        bottle.abort(400, "Invalid request data.")
    if not isinstance(load_request, dict):
        bottle.abort(400, "Invalid request data.")
    job = load_request.get('job')  # always a string
    name = load_request.get('name')
    optimize = load_request.get('optimize', True)
    overwrite = load_request.get('overwrite', False)
    # sanity check
    if job is None or name is None:
        bottle.abort(400, "Invalid request data.")
    # convert
    try:
        job = jobimport.convert(job, optimize=optimize)
    except TypeError:
        if DEBUG:
            traceback.print_exc()
        bottle.abort(400, "Invalid file type.")

    if not overwrite:
        # Pick a non-clashing name instead of replacing an existing job.
        name = _unique_name(name)
    _add(json.dumps(job), name)
    return json.dumps(name)
web.py 文件源码 项目:driveboardapp 作者: nortd 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def listing(kind=None):
    """List all queue jobs by name.

    Args:
        kind: None to list every job, 'starred' or 'unstarred' to list
            only those; any other value aborts the request with a 400.

    Returns:
        The JSON-encoded result of _get_sorted for the matching pattern.
    """
    if kind is None:
        files = _get_sorted('*.dba*', stripext=True)
    elif kind == 'starred':
        files = _get_sorted('*.dba.starred', stripext=True)
    elif kind == 'unstarred':
        files = _get_sorted('*.dba', stripext=True)
    else:
        bottle.abort(400, "Invalid kind.")
    return json.dumps(files)
web.py 文件源码 项目:driveboardapp 作者: nortd 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def star(jobname):
    """Star a job by renaming its '.dba' file to '.dba.starred'.

    Aborts with a 400 when the resolved path does not end in '.dba'.
    """
    jobpath = _get_path(jobname)
    if not jobpath.endswith('.dba'):
        bottle.abort(400, "No such file.")
    starred_path = jobpath + '.starred'
    os.rename(jobpath, starred_path)
    return '{}'
web.py 文件源码 项目:driveboardapp 作者: nortd 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def unstar(jobname):
    """Unstar a job by dropping the '.starred' suffix from its file name.

    Aborts with a 400 when the resolved path does not end in '.starred'.
    """
    suffix = '.starred'
    jobpath = _get_path(jobname)
    if not jobpath.endswith(suffix):
        bottle.abort(400, "No such file.")
    os.rename(jobpath, jobpath[:-len(suffix)])
    return '{}'
web.py 文件源码 项目:driveboardapp 作者: nortd 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def run(jobname):
    """Send a queued job to the machine.

    The job file is read first; the request is then aborted with a 400 if
    the machine status is not ready.
    """
    job_text = _get(jobname)
    machine_ready = driveboard.status()['ready']
    if not machine_ready:
        bottle.abort(400, "Machine not ready.")
    parsed_job = json.loads(job_text)
    driveboard.job(parsed_job)
    return '{}'
web.py 文件源码 项目:driveboardapp 作者: nortd 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def build():
    """Build firmware from firmware/src files (for all config files).

    Aborts with a 400 when driveboard.build() returns a non-zero code;
    returns an empty JSON object otherwise.
    """
    if driveboard.build() != 0:
        bottle.abort(400, "Build failed.")
    return '{}'


问题


面经


文章

微信
公众号

扫码关注公众号