python类HTTPBadRequest()的实例源码

interstitial.py 文件源码 项目:websauna 作者: websauna 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def process_interstitial(request: Request, choices: t.List[Choice], *args, **kwargs):
    """Dispatch an interstitial form submission to the callback of the pressed button.

    For example use case see :py:class:`websauna.system.crud.views.Delete`.

    :param request: Current HTTP request; its method must be POST
    :param choices: Candidate choices; the first one whose ``id`` is a key of ``request.POST`` is used
    :param args: Passed to choice callback
    :param kwargs: Passed to choice callback
    :return: HTTP response given by a choice callback
    :raise HTTPBadRequest: If no choice id is present in the POST data
    """
    assert request.method == "POST"

    # The CSRF token is verified unconditionally, before any choice is examined
    check_csrf_token(request)

    selected = next((choice for choice in choices if choice.id in request.POST), None)
    if selected is None:
        raise HTTPBadRequest("Unknown choice made")

    return selected.callback(*args, **kwargs)
views.py 文件源码 项目:de-visualization-wizard 作者: deleidos 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def process_model(request):
    """Load a domain's model from GridFS and return the result of processing the requested terms.

    The model name is the ``domain`` route argument with spaces replaced by
    underscores, followed by ``_mdl``. The ``terms`` route argument falls
    back to ``-1`` when absent.

    :raises HTTPBadRequest: when GridFS holds no model under that name
    """
    mongo_host = request.registry.settings['MONGOHOST']
    domain = request.matchdict.get('domain').replace(' ', '_')
    model_name = '{}_mdl'.format(domain)

    store = GridFSModel(modelName=model_name, host=mongo_host, port=27017, logger=logger)
    model = store.getModelFromGridFS()
    if model is None:
        raise exc.HTTPBadRequest(explanation="The domain {} does not have a domain model loaded.".format(domain))

    w2v = M(modelName=model_name, model=model, logger=logger, verbose=True)
    log.info("Created W2VModel:" + model_name)

    w2v.setTerms(request.matchdict.get('terms', -1))
    similar_terms = w2v.process()
    log.info("Processing Terms :" + str(w2v.getTerms()))
    return similar_terms
views_test.py 文件源码 项目:bouncer 作者: hypothesis 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_it_rejects_invalid_or_missing_urls(self):
        """goto_url must raise HTTPBadRequest for a missing, unsupported or malformed ``url``."""
        invalid_urls = [None,

                        # Unsupported protocols.
                        'ftp://foo.bar',
                        'doi:10.1.2/345',
                        'file://foo.bar',

                        # Malformed URLs. The backslash is escaped explicitly
                        # because backslash-bracket is not a valid escape
                        # sequence; the string value is unchanged.
                        'http://goo\\[g']

        for url in invalid_urls:
            request = mock_request()
            request.GET['url'] = url

            with pytest.raises(httpexceptions.HTTPBadRequest):
                views.goto_url(request)
query.py 文件源码 项目:hel 作者: hel-repo 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __call__(self):
        """Build one searcher per entry of ``self.params``.

        Each parameter name must be an attribute of ``PackagesSearchParams``.
        Unless that attribute is marked ``_no_param``, the parameter needs a
        non-empty first value. Attributes marked ``_only_one`` accept at most
        one value and receive it unwrapped from its list.

        The resulting list is stored on ``self.searchers`` and returned.

        Raises HTTPBadRequest for an unknown parameter, a missing value, or
        too many values.
        """
        built = []
        for param_name, param in self.params.items():
            if not hasattr(PackagesSearchParams, param_name):
                raise HTTPBadRequest(
                    detail=Messages.bad_search_param % param_name)

            factory = getattr(PackagesSearchParams, param_name)
            if (not hasattr(factory, '_no_param') and
                    (len(param) == 0 or param[0] == '')):
                raise HTTPBadRequest(
                    detail=Messages.no_values % param_name)

            if hasattr(factory, '_only_one'):
                if len(param) > 1:
                    raise HTTPBadRequest(
                        detail=Messages.too_many_values % (1, len(param),))
                param = param[0]

            built.append(factory(param))

        self.searchers = built
        return built
views.py 文件源码 项目:TonsleyLEDManager 作者: JonnoFTW 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def delete_plugin(request):
    """
    Delete a plugin along with its schedule entries and proposed updates.

    Raises HTTPBadRequest when no plugin has the requested id, and
    HTTPForbidden when the requester is neither the plugin's user nor an
    admin. Returns a redirect to ``/plugin``.
    """
    plugin_id = request.matchdict['plugin_id']
    user = request.user
    session = request.db_session

    plugin_query = session.query(LedPlugin).filter(LedPlugin.id == plugin_id)
    plugin = plugin_query.first()
    if plugin is None:
        raise exc.HTTPBadRequest('No such plugin')
    if user != plugin.user and not user.admin:
        raise exc.HTTPForbidden("You don't have access to do that")

    # Rows referring to the plugin are removed before the plugin itself
    for dependent in (LedSchedule, LedPluginProposed):
        session.query(dependent).filter(dependent.led_plugin_id == plugin_id).delete()
    plugin_query.delete()

    log(request, 'Deleted plugin ' + plugin.name)
    return exc.HTTPFound(location='/plugin')
views.py 文件源码 项目:TonsleyLEDManager 作者: JonnoFTW 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def add_group_users(request):
    """Add users to a group and redirect back to the group page.

    ``can_modify_group`` is called first with the request and the group id
    (the intent stated by the original author: only a site admin or group
    admin can do this).

    :raises HTTPBadRequest: when the POST data has no ``users`` field
    :return: HTTPFound redirect to ``/group/<group_id>``
    """
    gid = request.matchdict['group_id']
    can_modify_group(request, gid)
    if request.POST.get('users', None) is None:
        raise exc.HTTPBadRequest('Please specify users to add to the group')

    new_users = []
    # NOTE(review): this iterates over *every* POST value, not only the
    # ``users`` field checked above -- confirm the form posts nothing else.
    for user in request.POST.values():
        group_user = LedGroupUser(led_group_id=gid, led_user_id=user)
        try:
            request.db_session.add(group_user)
            new_users.append(get_user_by_id(request, user).email)
        except sql_exc.DatabaseError:
            # Formatted as a single string so the call is valid, and prints
            # the same text, on both Python 2 and Python 3.
            print("%s already in group" % (group_user,))
    log(request, 'Added users to <a href="/group/{0}">group {0}</a>: {1}'.format(gid, ', '.join(new_users)))
    return exc.HTTPFound(location='/group/' + gid)
views.py 文件源码 项目:TonsleyLEDManager 作者: JonnoFTW 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def user_delete(request):
    """
    Deletes a user and their group memberships.
    Plugins of the deleted user are reassigned to the user making the request.

    :param request: request providing ``matchdict['user_id']``, ``db_session`` and ``user``
    :return: an HTTPBadRequest response when the user does not exist,
             otherwise a redirect to ``/users``
    """
    session = request.db_session
    target_id = request.matchdict['user_id']
    user_query = session.query(LedUser).filter(LedUser.id == target_id)
    user = user_query.first()
    # make sure the user actually exists
    if user is None:
        return exc.HTTPBadRequest("No such user exists")
    new_owner = request.user

    owned_plugins = session.query(LedPlugin).filter(LedPlugin.user == user).all()
    for plugin in owned_plugins:
        plugin.user_id = new_owner.id
    session.query(LedGroupUser).filter(LedGroupUser.led_user == user).delete()
    user_query.delete()

    log(request, 'Deleted user ' + user.email)
    return exc.HTTPFound(location='/users')
discussion.py 文件源码 项目:idealoom 作者: conversence 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def check_url_or_none(self, url):
        """Validate a homepage URL, treating the empty string as ``None``.

        Returns ``None`` when ``url`` is ``''`` or ``None``. Otherwise returns
        ``url`` unchanged after checking that its scheme is http or https.

        Raises HTTPBadRequest when the URL has no scheme or a scheme other
        than http/https.
        """
        if url == '' or url is None:
            return None

        from urllib.parse import urlparse
        scheme = urlparse(url).scheme
        from pyramid.httpexceptions import HTTPBadRequest
        if not scheme:
            raise HTTPBadRequest(
                "The homepage url does not have a scheme. Must be either http or https"
            )
        if scheme not in (u'http', u'https'):
            raise HTTPBadRequest(
                "The url has an incorrect scheme. Only http and https are accepted for homepage url"
            )
        return url
idea.py 文件源码 项目:idealoom 作者: conversence 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def delete_idea(request):
    """Tombstone an idea and its source links.

    The idea is looked up by the ``id`` route argument. Deletion is refused
    for a missing idea (HTTPNotFound) and, with HTTPBadRequest, for a root
    idea or an idea that still has children or extracts.

    On success the response status is set to the HTTPNoContent code and an
    HTTPNoContent instance is returned.
    """
    idea_id = request.matchdict['id']
    idea = Idea.get_instance(idea_id)

    if not idea:
        raise HTTPNotFound("Idea with id '%s' not found." % idea_id)
    if isinstance(idea, RootIdea):
        raise HTTPBadRequest("Cannot delete root idea.")

    # Each related collection must be empty; checked in this order
    blockers = (
        ('children', "Idea cannot be deleted because it still has %d child ideas."),
        ('extracts', "Idea cannot be deleted because it still has %d extracts."),
    )
    for attribute, template in blockers:
        count = len(getattr(idea, attribute))
        if count > 0:
            raise HTTPBadRequest(template % count)

    for link in idea.source_links:
        link.is_tombstone = True
    idea.is_tombstone = True
    # Maybe return tombstone() ?
    request.response.status = HTTPNoContent.code
    return HTTPNoContent()
auth.py 文件源码 项目:idealoom 作者: conversence 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def put_roles(request):
    """Synchronise stored roles with the JSON list of role names in the request body.

    Names in the list that are not stored yet are added. Stored roles that
    are absent from the list are deleted, except those named in
    ``SYSTEM_ROLES``.

    Returns a dict with the ``added`` and ``removed`` role names.

    Raises HTTPBadRequest when the body cannot be parsed as JSON, is not a
    list, or contains items that are not ``str``.
    """
    session = Role.default_db
    try:
        payload = json.loads(request.body)
    except Exception:
        raise HTTPBadRequest("Malformed Json")
    if not isinstance(payload, list):
        raise HTTPBadRequest("Not a list")
    # Exact type match: every item must be a plain str
    if not all(type(entry) is str for entry in payload):
        raise HTTPBadRequest("not strings")

    requested = set(payload)
    existing = {role.name: role for role in session.query(Role).all()}
    existing_names = set(existing)

    # new roles
    to_add = requested - existing_names
    for name in to_add:
        session.add(Role(name=name))

    # delete non-system roles.
    to_remove = existing_names - requested - SYSTEM_ROLES
    for name in to_remove:
        session.delete(existing[name])

    return {"added": list(to_add),
            "removed": list(to_remove)}
synthesis.py 文件源码 项目:idealoom 作者: conversence 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def save_synthesis(request):
    """Update a synthesis' subject, introduction and conclusion from a JSON body.

    The ``id`` route argument selects the synthesis; the special value
    ``next_synthesis`` selects ``request.context.get_next_synthesis()``.
    Keys missing from the body set the corresponding field to ``None``.

    Returns ``{'ok': True, 'id': <synthesis uri>}``.

    Raises HTTPBadRequest when the synthesis is not found, or when the body
    is not valid JSON or not a JSON object.
    """
    synthesis_id = request.matchdict['id']
    discussion = request.context
    if synthesis_id == 'next_synthesis':
        synthesis = discussion.get_next_synthesis()
    else:
        synthesis = Synthesis.get_instance(synthesis_id)
    if not synthesis:
        raise HTTPBadRequest("Synthesis with id '%s' not found." % synthesis_id)

    # A malformed body is a client error, not a server failure
    try:
        synthesis_data = json.loads(request.body)
    except ValueError:
        raise HTTPBadRequest("Malformed Json")
    # ``.get`` below requires a mapping; reject lists, strings, numbers, ...
    if not isinstance(synthesis_data, dict):
        raise HTTPBadRequest("Not a JSON object")

    synthesis.subject = synthesis_data.get('subject')
    synthesis.introduction = synthesis_data.get('introduction')
    synthesis.conclusion = synthesis_data.get('conclusion')

    Synthesis.default_db.add(synthesis)
    Synthesis.default_db.flush()

    return {'ok': True, 'id': synthesis.uri()}
oauth2.py 文件源码 项目:weasyl 作者: Weasyl 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def authorize_post_(request):
    """Handle submission of the OAuth2 authorization form.

    The ``credentials`` form field must hold JSON; its ``scopes`` entry is
    removed and passed on separately. The acting user is either
    authenticated from the submitted username/password (when ``not_me`` and
    ``username`` are both set) or taken from ``request.userid``.

    On an authentication problem the form is rendered again with an error
    message. Otherwise the authorization response produced by the OAuth
    server is returned.

    Raises HTTPBadRequest when ``credentials`` is not valid JSON.
    """
    form = request.web_input(credentials='', username='', password='', remember_me='', mobile='', not_me='')
    try:
        credentials = json.loads(form.credentials)
    except ValueError:
        raise HTTPBadRequest()
    scopes = credentials.pop('scopes')

    error = None
    if form.not_me and form.username:
        userid, error = login.authenticate_bcrypt(form.username, form.password, bool(form.remember_me))
        if error:
            # Translate the error code into a displayable message
            error = errorcode.login_errors.get(error, 'Unknown error.')
    elif request.userid:
        userid = request.userid
    else:
        error = "You must specify a username and password."

    if error:
        page = render_form(request, scopes, credentials, bool(form.mobile), error,
                           form.username, form.password, bool(form.remember_me),
                           bool(form.not_me))
        return Response(page)

    credentials['userid'] = userid
    server_args = extract_params(request) + (scopes, credentials)
    response_attrs = server.create_authorization_response(*server_args)
    return OAuthResponse(*response_attrs)
search.py 文件源码 项目:snovault 作者: ENCODE-DCC 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def prepare_search_term(request):
    """Turn the ``searchTerm`` request parameter into a query string.

    A missing or blank term becomes ``'*'`` and is returned as is. Any
    other term has its slashes escaped and, unless it contains ``@type``,
    its colons escaped as well. It is then passed through ``prefixfields``
    with the ``'embedded.'`` prefix and the elasticsearch dialect.

    Raises HTTPBadRequest when ``prefixfields`` raises
    ``IllegalStateException``.
    """
    from antlr4 import IllegalStateException
    from lucenequery.prefixfields import prefixfields
    from lucenequery import dialects

    search_term = request.params.get('searchTerm', '').strip() or '*'
    if search_term == '*':
        return search_term

    # avoid interpreting slashes as regular expressions
    search_term = search_term.replace('/', r'\/')
    # elasticsearch uses : as field delimiter, but we use it as namespace designator
    # if you need to search fields you have to use @type:field
    # if you need to search fields where the field contains ":", you will have to escape it
    # yourself
    if "@type" not in search_term:
        # Raw string: backslash-colon is not a valid escape sequence
        search_term = search_term.replace(':', r'\:')
    try:
        query = prefixfields('embedded.', search_term, dialects.elasticsearch)
    except IllegalStateException:
        msg = "Invalid query: {}".format(search_term)
        raise HTTPBadRequest(explanation=msg)
    else:
        return query.getText()
subscriptions.py 文件源码 项目:webpush-channels 作者: webpush-channels 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def process_record(self, new, old=None):
        """Run the parent's record processing, then reject records WebPusher refuses.

        Returns the record produced by the parent class. Raises the error
        built by ``http_error`` around an HTTPBadRequest when constructing
        ``WebPusher`` from that record raises ``WebPushException``.
        """
        processed = super(Subscription, self).process_record(new, old)
        try:
            # Constructed only for its validation; the instance is discarded
            WebPusher(processed)
        except WebPushException as push_error:
            details = 'Invalid subscription: %s' % push_error
            raise http_error(HTTPBadRequest(),
                             errno=ERRORS.INVALID_PARAMETERS,
                             message=details)
        return processed
views.py 文件源码 项目:kinto-portier 作者: Kinto 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def portier_verify(request):
    """Verify a Portier ``id_token`` and redirect with a freshly generated user token.

    On success the verified email is encrypted with a random token and
    cached under ``'portier:' + <HMAC digest of the token>`` for
    ``session_ttl_seconds``. The client is then redirected to the stored
    redirect URL with the token appended.

    Returns an HTTPFound redirect, or the error response built by
    ``http_error`` around an HTTPBadRequest when token verification raises
    ``ValueError``.
    """
    broker_uri = portier_conf(request, 'broker_uri')
    token = request.validated['body']['id_token']
    # Get the data from the config because the request might only
    # have local network information and not the public facing ones.
    settings = request.registry.settings
    audience = '{scheme}://{host}'.format(scheme=settings['http_scheme'],
                                          host=settings['http_host'])

    try:
        email, stored_redirect = get_verified_email(
            broker_url=broker_uri,
            token=token,
            audience=audience,
            issuer=broker_uri,
            cache=request.registry.cache)
    except ValueError as error:
        return http_error(httpexceptions.HTTPBadRequest(),
                          errno=ERRORS.INVALID_AUTH_TOKEN, error='Invalid Auth Token',
                          message='Portier token validation failed: %s' % error)

    # Random token: 32 random bytes, hex-encoded
    random_bytes = os.urandom(32)
    user_token = codecs.encode(random_bytes, 'hex').decode('utf-8')

    # The email is encrypted with the token
    encrypted_email = encrypt(email, user_token)

    # The user ID is derived from the token
    userID = utils.hmac_digest(request.registry.settings['userid_hmac_secret'], user_token)

    # Cache the encrypted email under the user ID for the session lifetime
    session_ttl = portier_conf(request, 'session_ttl_seconds')
    request.registry.cache.set('portier:' + userID, encrypted_email, session_ttl)

    return httpexceptions.HTTPFound(location='%s%s' % (stored_redirect, user_token))
views_test.py 文件源码 项目:annotran 作者: BirkbeckCTP 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_add_existing_report_to_db():
    """
        This should not add a new report to the database session as the one already exists.
    """
    with mock.patch('annotran.languages.models.Language') as language, \
            mock.patch('annotran.pages.models.Page') as page, \
            mock.patch('h.groups.models.Group') as group, \
            mock.patch('h.models.User') as user, \
            mock.patch('annotran.translations.models.Translation') as translation, \
            mock.patch('annotran.reports.models.Report') as report:
        # Every lookup returns the patched class itself, so each entity
        # (including the report) appears to exist already.
        language.get_by_public_language_id = MagicMock(return_value=language)
        page.get_by_uri = MagicMock(return_value=page)
        group.get_by_pubid = MagicMock(return_value=group)
        user.get_by_username = MagicMock(return_value=user)
        translation.get_translation = MagicMock(return_value=translation)
        report.get_report = MagicMock(return_value=report)

        route_args = {'public_language_id': '12345',
                      'public_group_id': '12345',
                      'user_id': '12345',
                      'page_uri': 'http://www.annotran_test.com/'}
        request = _mock_request(authenticated_user=mock.Mock(username="test"),
                                matchdict=route_args)
        result = views.add_report(request)
        assert not request.db.add.called
        assert isinstance(result, exc.HTTPBadRequest)
views.py 文件源码 项目:de-visualization-wizard 作者: deleidos 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def gettaxonomy_for_domain(request):
    """Return the taxonomy record stored for the ``domain`` route argument.

    The record is fetched from the ``KnowledgeModelTaxonomy`` database, in
    the collection named after the domain with spaces replaced by
    underscores.

    :raises HTTPBadRequest: when no record matches the domain
    """
    mongo_host = request.registry.settings['MONGOHOST']
    domain = request.matchdict.get('domain')
    log.info('Getting Taxonomy for Domain : domain:' + domain )

    collection_name = domain.replace(' ', '_')
    accessor = MA(host=mongo_host, port=27017, logger=logger, db='KnowledgeModelTaxonomy', collection=collection_name)
    record = accessor.getRecord({"domain" : domain})
    if record is not None:
        return record

    raise exc.HTTPBadRequest(explanation="The domain {} does not have a taxonomy in the database.".format(domain))
setup_devices.py 文件源码 项目:senic-hub 作者: getsenic 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def devices_details_view(request):
    """Return the lights of a Philips Hue device, or ``{}`` for any other device type.

    The bridge username is read from ``<device id>.conf`` in the
    ``homeassistant_data_path`` directory, keyed by the device's IP.

    Raises HTTPBadRequest on ``UnauthenticatedDeviceError`` and
    HTTPBadGateway on ``UpstreamError``.
    """
    device_id = request.matchdict["device_id"]
    logger.debug("Getting details for device with ID=%s", device_id)

    settings = request.registry.settings
    device = get_device(settings['devices_path'], device_id)

    # Only Philips Hue has device details at the moment
    # TODO: In the future, each single Philips Hue light should be returned as a regular
    #       device. Philips Hue bridges should be flagged with `virtual: True` since they
    #       are not controlled by the user. However, all its lights should be returned as
    #       well when requesting `/devices`.
    if device['type'] != 'philips_hue':
        return {}

    config_name = '{}.conf'.format(device["id"])
    config_path = os.path.join(settings["homeassistant_data_path"], config_name)
    bridge_settings = read_json(config_path, {}).get(device["ip"], {})
    username = bridge_settings.get("username")

    try:
        return PhilipsHueBridgeApiClient(device["ip"], username).get_lights()
    # TODO create a tween to handle exceptions for all views
    except UnauthenticatedDeviceError as e:
        raise HTTPBadRequest(e.message)
    except UpstreamError as e:
        raise HTTPBadGateway(e.message)
test_unit.py 文件源码 项目:hel 作者: hel-repo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_check_list_of_strs(self):
        """A list with a non-str item raises HTTPBadRequest; a list of strs does not."""
        from hel.utils.query import check_list_of_strs
        # assertRaises fails with a descriptive message if nothing is raised
        with self.assertRaises(HTTPBadRequest) as caught:
            check_list_of_strs(['test', 555], 'test')
        self.assertEqual(caught.exception.detail, 'test')
        check_list_of_strs(['test', 'test2'])
test_views.py 文件源码 项目:hel 作者: hel-repo 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def param(name):
    """Decorator factory for tests of the search parameter ``name``.

    The decorated method must return an iterable of ``(values, expected)``
    pairs. The wrapper first checks that an empty value list for ``name``
    is rejected with the ``Messages.no_values`` detail. It then runs each
    pair as a sub-test, comparing the search result (with ``_id`` keys
    removed) against ``expected``.
    """

    def wrap(func):

        def f(self):
            tests = func(self)

            # Zero values: should fail
            with self.subTest(test='0 values'):
                try:
                    PackagesSearcher({name: []})()
                except HTTPBadRequest as e:
                    # Only the "no values" error is expected here
                    if e.detail != Messages.no_values % name:
                        raise
                else:
                    raise AssertionError()

            for test_case in tests:
                with self.subTest(test=test_case):
                    values, expected = test_case
                    searcher = PackagesSearcher({name: values})
                    searcher()
                    packages = list(self.db['packages'].find({}))
                    search_result = searcher.search(packages)
                    # Strip the ``_id`` key before comparing
                    for doc in search_result:
                        if '_id' in doc:
                            del doc['_id']
                    self.assertTrue(are_equal(search_result, expected))

        return f

    return wrap
test_views.py 文件源码 项目:hel 作者: hel-repo 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_bad_search_param(self):
        """An unknown search parameter raises HTTPBadRequest with the bad_search_param detail."""
        # assertRaises fails with a descriptive message if nothing is raised
        with self.assertRaises(HTTPBadRequest) as caught:
            PackagesSearcher({'hi': ['test']})()
        self.assertEqual(caught.exception.detail,
                         Messages.bad_search_param % 'hi')
query.py 文件源码 项目:hel 作者: hel-repo 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def check_list_of_strs(value, message=None):
    """Return ``value`` after checking that every item's type is exactly ``str``.

    ``value`` is first passed to ``check(value, list, message)``.

    Raises HTTPBadRequest with ``message`` as detail for the first item
    whose type is not ``str``.
    """
    check(value, list, message)
    if any(type(item) is not str for item in value):
        raise HTTPBadRequest(detail=message)
    return value
query.py 文件源码 项目:hel 作者: hel-repo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def parse_url(url):
    """Validate ``url`` as an http(s) URI and return it recomposed without its fragment.

    An empty path is replaced by ``'/'``.

    Raises HTTPBadRequest when ``rfc3987.parse`` rejects the URL or the
    scheme is neither http nor https.
    """
    try:
        parts = rfc3987.parse(url, rule='URI')
    except ValueError:
        raise HTTPBadRequest(detail=Messages.invalid_uri)

    if parts['scheme'] not in ('http', 'https'):
        raise HTTPBadRequest(detail=Messages.invalid_uri)

    if not parts['path']:
        parts['path'] = '/'
    parts['fragment'] = None
    return rfc3987.compose(**parts)
query.py 文件源码 项目:hel 作者: hel-repo 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def check_path(path):
    """Return ``True`` unless the last character of ``path`` is a slash.

    Raises HTTPBadRequest when ``path`` ends with ``'/'``.
    """
    last_char = path[-1]
    if last_char != '/':
        return True
    raise HTTPBadRequest(detail=Messages.bad_path)
query.py 文件源码 项目:hel 作者: hel-repo 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def check_filename(name):
    """Return ``True`` when ``name`` contains no slash.

    Raises HTTPBadRequest when a ``'/'`` appears anywhere in ``name``.
    """
    if '/' in name:
        raise HTTPBadRequest(detail=Messages.bad_path)
    return True
views.py 文件源码 项目:data-hub-backend 作者: uktrade-attic 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def update(request):
    """Update an OData entity.

    The request is converted with ``common.django_to_odata``. The value of
    the table's primary key is removed from the resulting dict and used as
    the entity identifier for ``cdms_client.update``.

    Raises HTTPBadRequest when the primary key value is missing or ``None``.
    """
    odata_tablename, etag, odata_dict = common.django_to_odata(request)
    settings = request.registry.settings
    odata_table = settings['odata_metadata'].tables[odata_tablename]

    key_name = etl_utils.primary_key(odata_table)
    ident = odata_dict.pop(key_name, None)
    if ident is None:
        raise http_exc.HTTPBadRequest('No identifier provided; pass `id` key')

    response = settings['cdms_client'].update(
        odata_tablename, etag, fmt_guid(ident), odata_dict
    )
    return common.odata_to_django(odata_tablename, response)
common.py 文件源码 项目:data-hub-backend 作者: uktrade-attic 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def django_to_odata(request):
    """Transform request into spec for “onwards” request to OData service.

    Returns the tuple ``(odata_tablename, etag, odata_dict)``.

    Raises HTTPBadRequest when a ``json.JSONDecodeError`` is raised while
    reading or transforming the request's JSON body.
    """
    django_tablename, odata_tablename = request_tablenames(request)
    try:
        payload = request.json_body
        etag, odata_dict = transform.django_to_odata(django_tablename, payload)
    except json.JSONDecodeError:
        raise http_exc.HTTPBadRequest('Invalid JSON')
    else:
        return odata_tablename, etag, odata_dict
views.py 文件源码 项目:TonsleyLEDManager 作者: JonnoFTW 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def approve_plugin_update(request):
    """Copy a proposed update's code onto its plugin and delete the proposal.

    Raises HTTPBadRequest when no proposal exists for the ``plugin_id``
    route argument. Returns a redirect to ``/plugin_approve``.
    """
    plugin_id = request.matchdict['plugin_id']
    session = request.db_session

    proposals = session.query(LedPluginProposed).filter(LedPluginProposed.led_plugin_id == plugin_id)
    proposal = proposals.first()
    if proposal is None:
        raise exc.HTTPBadRequest("No such plugin to update")

    plugin = session.query(LedPlugin).filter(LedPlugin.id == plugin_id).first()
    plugin.code = proposal.code
    proposals.delete()

    message = 'Approved updates to plugin <a href="/plugin/{}">{}</a>'.format(plugin.id, plugin.name)
    log(request, message)
    return exc.HTTPFound(location='/plugin_approve')
views.py 文件源码 项目:TonsleyLEDManager 作者: JonnoFTW 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def reject_plugin_update(request):
    """Delete the proposed update of a plugin without applying it.

    Raises HTTPBadRequest when no proposal exists for the ``plugin_id``
    route argument. Returns a redirect to ``/plugin_approve``.
    """
    plugin_id = request.matchdict['plugin_id']
    proposals = request.db_session.query(LedPluginProposed).filter(LedPluginProposed.led_plugin_id == plugin_id)
    proposal = proposals.first()
    if proposal is None:
        raise exc.HTTPBadRequest("No proposal for this plugin")

    # The plugin is fetched before the proposal is deleted; it is needed for the log entry
    plugin = proposal.led_plugin
    proposals.delete()

    message = 'Rejected updates to plugin <a href="/plugin/{}">{}</a>'.format(plugin.id, plugin.name)
    log(request, message)
    return exc.HTTPFound(location='/plugin_approve')
views.py 文件源码 项目:TonsleyLEDManager 作者: JonnoFTW 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def update_plugin(request):
    """Update a plugin, or record a proposed update when the requester is not an admin.

    Only the plugin's user or an admin may call this. A non-admin must
    supply ``code``, which is stored as a proposal (replacing the code of an
    existing proposal for the plugin). An admin changes the plugin directly
    and may supply ``code``, ``name``, both or neither; empty values are
    ignored.

    Returns a dict with a ``msg`` entry.

    Raises HTTPBadRequest when the plugin does not exist or a non-admin
    omits ``code``, and HTTPForbidden when the requester has no access.
    """
    # make sure the plugin id exists
    plugin_id = request.matchdict['plugin_id']
    plugin = request.db_session.query(LedPlugin).filter(LedPlugin.id == plugin_id).first()
    if not plugin:
        raise exc.HTTPBadRequest("No such plugin")

    # only a site admin or plugin owner can edit
    user = request.user
    if user != plugin.user and not user.admin:
        raise exc.HTTPForbidden("You don't have access to do that")
    POST = {k: to_null(v) for k, v in request.POST.items()}

    if not user.admin:
        # a non-admin plugin owner can suggest changes
        if 'code' not in POST:
            raise exc.HTTPBadRequest("Please provide the code")
        plugin_update = request.db_session.query(LedPluginProposed).filter(LedPluginProposed.led_plugin_id == plugin_id).first()
        if plugin_update is not None:
            plugin_update.code = POST['code']
        else:
            request.db_session.add(LedPluginProposed(led_plugin_id=plugin_id, code=POST['code']))
        log(request, "Proposed changes to plugin <a href='/plugin/{}'>{}</a>".format(plugin.id, plugin.name))
        return {
            'msg': 'Your update is now awaiting approval'
        }

    # ``.get`` so an admin request without ``code`` (e.g. a rename only)
    # does not fail with KeyError
    if POST.get('code'):
        plugin.code = POST['code']
    if POST.get('name'):
        plugin.name = POST['name']
    log(request, 'Updated <a href="/plugin/{0}">plugin {1}</a>: '.format(plugin_id, plugin.name, ))
    return {
        'msg': 'Updated code'
    }


问题


面经


文章

微信
公众号

扫码关注公众号