python类HTTPBadRequest()的实例源码

views.py 文件源码 项目:TonsleyLEDManager 作者: JonnoFTW 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def create_group(request):
    """Create a new LED group owned by the requesting user.

    Reads ``name`` from the POST data, creates a non-default, disabled
    ``LedGroup``, links ``request.user`` to it with access level 2, records
    the creation through ``log`` and redirects to the new group's page.

    Returns:
        An ``exc.HTTPFound`` redirect to ``/group/<id>``.

    Raises:
        exc.HTTPBadRequest: if no name, or only a blank name, was submitted.
    """
    name = request.POST.get('name', None)
    # A blank field is just as unusable as a missing one: it would create a
    # group whose link text is empty.
    if name is None or not name.strip():
        raise exc.HTTPBadRequest('Please specify a group name')
    group = LedGroup(name=name, default=False, enabled=False)
    request.db_session.add(group)
    # Flush before group.id is read below (presumably so the primary key has
    # been assigned by the database).
    request.db_session.flush()
    request.db_session.add(LedGroupUser(
        led_group_id=group.id,
        led_user=request.user,
        access_level=2))
    print("Made group", group)
    # NOTE(review): group.name is interpolated into HTML without escaping;
    # confirm that log() or the template rendering it escapes the value.
    log(request, 'Created group <a href="/group/{0}">{1}</a>'.format(group.id, group.name))
    return exc.HTTPFound(location='/group/'+str(group.id))
auth.py 文件源码 项目:idealoom 作者: conversence 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def _do_update_from_json(
            self, json, parse_def, ctx,
            duplicate_handling=None, object_importer=None):
        """Apply the ``user``, ``role`` and ``discussion`` keys of ``json``.

        The user defaults to the context's user and the role to
        ``R_PARTICIPANT`` when neither the JSON nor the object provides one.
        An already-set user or discussion may not be changed.

        Returns:
            ``self``.

        Raises:
            HTTPBadRequest: on an attempt to change user or discussion, on an
                unknown role name, or when no discussion is available.
        """
        acting_user_id = ctx.get_user_id()
        user_ref = json.get('user', None)
        if user_ref is not None:
            resolved_user_id = User.get_database_id(user_ref)
            # The owning user is immutable once set.
            if self.user_id is not None and resolved_user_id != self.user_id:
                raise HTTPBadRequest()
        else:
            resolved_user_id = acting_user_id
        self.user_id = resolved_user_id

        role_name = json.get("role", None)
        if not role_name and not self.role_id:
            # Neither the payload nor the object names a role: use the default.
            role_name = R_PARTICIPANT
        if role_name:
            role = self.db.query(Role).filter_by(name=role_name).first()
            if not role:
                raise HTTPBadRequest("Invalid role name:"+role_name)
            self.role = role

        discussion_ref = json.get('discussion', None)
        if not discussion_ref:
            # Nothing supplied: the object must already have a discussion.
            if not self.discussion_id:
                raise HTTPBadRequest()
            return self

        from .discussion import Discussion
        resolved_discussion_id = Discussion.get_database_id(discussion_ref)
        # The discussion is immutable once set.
        if self.discussion_id is not None \
                and resolved_discussion_id != self.discussion_id:
            raise HTTPBadRequest()
        self.discussion_id = resolved_discussion_id
        return self
post.py 文件源码 项目:idealoom 作者: conversence 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def add_post_json(request):
    """Add a post from the JSON request body.

    Raises:
        HTTPBadRequest: if ``has_moderation`` reports moderation data in the
            submitted JSON.
    """
    moderated = has_moderation(request.json)
    if not moderated:
        # TODO: apply guess_languages
        return collection_add_json(request)
    raise HTTPBadRequest("Cannot moderate at post creation")
token.py 文件源码 项目:idealoom 作者: conversence 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def check_password_token(request):
    """Verify the password-change token taken from the route.

    Returns:
        ``{"user": <user uri>}`` when the token is valid.

    Raises:
        HTTPBadRequest: carrying the validity name for any other outcome.
    """
    user, validity = verify_password_change_token(
        request.matchdict.get('token', None))
    if validity == Validity.VALID:
        return {"user": user.uri()}
    raise HTTPBadRequest(validity.name)
auth.py 文件源码 项目:idealoom 作者: conversence 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def put_permissions_for_role(request):
    """Replace the permissions a role holds in the current discussion.

    The request body must be a JSON list of permission names. Existing
    ``DiscussionPermission`` rows for the role that are not in the list are
    deleted; rows are added for the names that were not yet granted.

    Returns:
        ``{"added": [...], "removed": [...]}`` with sorted permission names.

    Raises:
        HTTPNotFound: if the role named in the route does not exist.
        HTTPBadRequest: if the body is not a JSON list of known permission
            names.
    """
    discussion = request.context
    role_name = request.matchdict['role_name']
    session = Discussion.default_db
    role = Role.get_by(name=role_name)
    if not role:
        raise HTTPNotFound("Role %s does not exist" % (role_name,))
    try:
        data = json.loads(request.body)
    except Exception:
        raise HTTPBadRequest("Malformed Json")
    if not isinstance(data, list):
        raise HTTPBadRequest("Not a list")
    if any(not isinstance(x, str) for x in data):
        raise HTTPBadRequest("not strings")

    requested_names = set(data)
    permissions_by_name = {
        p.name: p
        for p in session.query(Permission).filter(
            Permission.name.in_(data)).all()}
    unknown_names = requested_names - set(permissions_by_name)
    if unknown_names:
        raise HTTPBadRequest(
            "Not valid permissions: %s" % (repr(unknown_names),))

    # filter() only takes SQL expressions; keyword criteria need filter_by(),
    # applied before the join so that it targets DiscussionPermission.
    known_dp = session.query(DiscussionPermission).filter_by(
        role=role, discussion=discussion).join(Permission).all()
    dp_by_permission = {dp.permission.name: dp for dp in known_dp}
    known_names = set(dp_by_permission)

    # Compare names with names: mixing Permission objects and strings made
    # every existing grant look obsolete and every requested one look new.
    removed = known_names - requested_names
    added = requested_names - known_names
    for permission_name in removed:
        session.delete(dp_by_permission[permission_name])
    for permission_name in added:
        session.add(DiscussionPermission(
            role=role, permission=permissions_by_name[permission_name],
            discussion=discussion))
    return {"added": sorted(added),
            "removed": sorted(removed)}
auth.py 文件源码 项目:idealoom 作者: conversence 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def put_discussion_roles_for_user(request):
    """Replace the local roles a user holds in the current discussion.

    The request body must be a JSON list of role names. Existing
    ``LocalUserRole`` rows for the user that are not in the list are deleted;
    rows are added for the names that were not yet assigned.

    Returns:
        ``{"added": [...], "removed": [...]}`` with sorted role names.

    Raises:
        HTTPNotFound: if the user named in the route does not exist.
        HTTPBadRequest: if the body is not a JSON list of known role names.
    """
    discussion = request.context
    user_id = request.matchdict['user_id']
    user = User.get_instance(user_id)
    if not user:
        # matchdict values are strings, so %d would raise TypeError here.
        raise HTTPNotFound("User id %s does not exist" % (user_id,))
    try:
        data = json.loads(request.body)
    except Exception:
        raise HTTPBadRequest("Malformed Json")
    session = Discussion.default_db
    if not isinstance(data, list):
        raise HTTPBadRequest("Not a list")
    if any(not isinstance(x, str) for x in data):
        raise HTTPBadRequest("not strings")

    requested_names = set(data)
    roles_by_name = {
        r.name: r
        for r in session.query(Role).filter(Role.name.in_(data)).all()}
    unknown_names = requested_names - set(roles_by_name)
    if unknown_names:
        raise HTTPBadRequest("Not valid roles: %s" % (repr(unknown_names),))

    # filter() only takes SQL expressions; keyword criteria need filter_by(),
    # applied before the join so that it targets LocalUserRole.
    known_lu_roles = session.query(LocalUserRole).filter_by(
        user=user, discussion=discussion).join(Role).all()
    lur_by_role = {lur.role.name: lur for lur in known_lu_roles}
    known_names = set(lur_by_role)

    # Compare names with names: mixing Role objects and strings made every
    # existing assignment look obsolete and every requested one look new.
    removed = known_names - requested_names
    added = requested_names - known_names
    for role_name in removed:
        session.delete(lur_by_role[role_name])
    for role_name in added:
        session.add(LocalUserRole(
            user=user, role=roles_by_name[role_name], discussion=discussion))
    return {"added": sorted(added),
            "removed": sorted(removed)}
utils.py 文件源码 项目:idealoom 作者: conversence 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def mime_type(request):
    """Report the content type of the resource named by the ``url`` parameter.

    URLs on this server's own ``public_hostname`` that point at a stored
    document are answered straight from the database; any other URL is probed
    with an HTTP HEAD request.

    Returns:
        A ``Response`` carrying the content type (and, for local documents,
        length and modification date); status 404 when a local document id is
        unknown; status 503 when the remote host cannot be reached in time.

    Raises:
        HTTPBadRequest: if ``url`` is missing or is not an http(s) URL.
    """
    url = request.params.get('url', None)
    if not url:
        raise HTTPBadRequest("Missing 'url' parameter")
    parsed = urlparse(url)
    if not parsed or parsed.scheme not in ('http', 'https'):
        raise HTTPBadRequest("Wrong scheme")
    if parsed.netloc.split(":")[0] == config.get('public_hostname'):
        # is it one of our own documents?
        # If so, detect it and shortcut to avoid the pyramid handler calling
        # another pyramid handler, as this exhausts pyramid threads rapidly
        # and can deadlock the whole application
        r = re.match(
            r'^https?://[\w\.]+(?:\:\d+)?/data/.*/documents/(\d+)/data(?:\?.*)?$',
            url)
        if r:
            document_id = r.group(1)
            from sqlalchemy.sql.functions import func
            row = File.default_db.query(
                File.mime_type, File.creation_date, func.length(File.data)
                ).filter_by(id=int(document_id)).first()
            if row is None:
                # Unknown document id: answer 404 rather than crash while
                # unpacking None.
                return Response(status=404)
            mimetype, create_date, size = row
            return Response(
                body=None, content_type=str(mimetype),
                content_length=size, last_modified=create_date)
    # NOTE(review): this requests a caller-supplied URL from the server;
    # confirm that internal addresses are filtered somewhere upstream.
    try:
        result = requests.head(url, timeout=15)
    except (requests.ConnectionError, requests.Timeout):
        # A read timeout is not a ConnectionError, so it is listed explicitly.
        return Response(
            status=503,
            location=url)

    return Response(
        content_type=result.headers.get('Content-Type', None),
        status=result.status_code,
        location=result.url)
oauth2.py 文件源码 项目:weasyl 作者: Weasyl 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def authorize_get_(request):
    """Render the OAuth2 authorization form for a valid authorization request.

    Returns:
        A ``Response`` with the rendered form, or an ``HTTPFound`` redirect to
        the client's redirect URI when validation fails with a recoverable
        ``OAuth2Error``.

    Raises:
        HTTPBadRequest: when validation fails with ``FatalClientError``.
    """
    form = request.web_input(mobile='')
    try:
        scopes, credentials = server.validate_authorization_request(
            *extract_params(request))
    except FatalClientError:
        raise HTTPBadRequest()
    except OAuth2Error as err:
        redirect_target = err.in_uri(err.redirect_uri)
        return HTTPFound(location=redirect_target)
    # The 'request' entry is removed before the credentials reach the form.
    del credentials['request']
    is_mobile = bool(form.mobile)
    rendered = render_form(request, scopes, credentials, is_mobile)
    return Response(rendered)
views.py 文件源码 项目:kinto-elasticsearch 作者: Kinto 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def search_view(request, **kwargs):
    """Run a search on the index of the collection named in the route.

    The result size is capped by the ``paginate_by`` and
    ``storage_max_fetch_size`` settings: unless the JSON ``body`` asks for a
    numeric ``size`` within the cap, the cap is passed to the indexer as the
    default ``size``.

    Args:
        request: the current request; provides ``matchdict`` and ``registry``.
        **kwargs: forwarded to ``indexer.search``; ``body`` (bytes) is
            inspected for a ``size`` field.

    Returns:
        The indexer's search results, or ``{}`` on a general index failure.

    Raises:
        An HTTP 400 error response when the index rejects the query.
    """
    bucket_id = request.matchdict['bucket_id']
    collection_id = request.matchdict['collection_id']

    # Limit the number of results to return, based on existing Kinto settings.
    settings = request.registry.settings
    paginate_by = settings.get("paginate_by")
    max_fetch_size = settings["storage_max_fetch_size"]
    if paginate_by is None or paginate_by <= 0:
        paginate_by = max_fetch_size
    configured = min(paginate_by, max_fetch_size)

    # If the size is specified in query, ignore it if larger than setting.
    specified = None
    if "body" in kwargs:
        try:
            body = json.loads(kwargs["body"].decode("utf-8"))
        except ValueError:
            # Covers both invalid JSON and invalid UTF-8; the index reports
            # the malformed body itself.
            body = None
        # Only a JSON object can carry a size, and only a number can be
        # compared with the configured cap.
        if isinstance(body, dict):
            size = body.get("size")
            if isinstance(size, (int, float)):
                specified = size
    if specified is None or specified > configured:
        kwargs.setdefault("size", configured)

    # Access indexer from views using registry.
    indexer = request.registry.indexer
    try:
        results = indexer.search(bucket_id, collection_id, **kwargs)

    except elasticsearch.NotFoundError:
        # If plugin was enabled after the creation of the collection.
        indexer.create_index(bucket_id, collection_id)
        results = indexer.search(bucket_id, collection_id, **kwargs)

    except elasticsearch.RequestError as e:
        # Malformed query.
        if isinstance(e.info["error"], dict):
            message = e.info["error"]["reason"]
            details = e.info["error"]["root_cause"][0]
        else:
            message = e.info["error"]
            details = None
        response = http_error(httpexceptions.HTTPBadRequest(),
                              errno=ERRORS.INVALID_PARAMETERS,
                              message=message,
                              details=details)
        raise response

    except elasticsearch.ElasticsearchException:
        # General failure.
        logger.exception("Index query failed.")
        results = {}

    return results
views.py 文件源码 项目:annotran 作者: BirkbeckCTP 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def add_report(request):
    """
    Record an abuse report against a translation and notify support by mail.

    :param request: a request object whose matchdict names the language,
        page, group and reported user
    :return: an empty dict on success, or a bad-request response when the
        same report already exists
    """
    if request.authenticated_userid is None:
        raise exc.HTTPNotFound()

    reporter = request.authenticated_user
    if reporter is None:
        raise exc.HTTPNotFound()

    route_args = request.matchdict
    public_language_id = route_args["public_language_id"]
    # The page URI is unquoted twice.
    page_url = urllib.unquote(urllib.unquote(route_args["page_uri"]))
    public_group_id = route_args['public_group_id']
    user_id = route_args['user_id']

    page = annotran.pages.models.Page.get_by_uri(page_url)
    author = h.models.User.get_by_username(user_id)
    reporter = h.models.User.get_by_username(request.authenticated_user.username)
    group = h.groups.models.Group.get_by_pubid(public_group_id)
    language = annotran.languages.models.Language.get_by_public_language_id(public_language_id)

    # Every referenced entity has to exist.
    required = (page, author, reporter, group, language)
    if any(entity is None for entity in required):
        raise exc.HTTPNotFound()

    translation = annotran.translations.models.Translation.get_translation(page, language, group)
    if translation is None:
        raise exc.HTTPNotFound()

    existing = annotran.reports.models.Report.get_report(translation, author, reporter)
    # An existing row means this translation was already reported.
    if existing:
        return exc.HTTPBadRequest()

    new_report = annotran.reports.models.Report(translation, author, reporter)
    request.db.add(new_report)
    request.db.flush()

    reports_url = request.route_url('admin_reports')
    body_text = (u'Hello,\n\nA new abuse report has been filed. '
                 u'Please see <a href="{0}">{0}</a>.\n\nAnnotran').format(reports_url)

    annotran.mailer.send(
        request,
        subject=u'A new abuse report has been filed',
        recipients=[annotran.views.Shared.support_address],
        body=body_text)

    return {}
replica.py 文件源码 项目:devpi 作者: devpi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_changes(self):
        """Serve a changelog entry to a replica.

        Called by every replica server. Returns the changelog entry content
        for the requested serial or, if it points to the "next" serial,
        blocks until that serial is committed. After MAX_REPLICA_BLOCK_TIME a
        202 Accepted is returned so the replica tries again, which has two
        benefits:

        - nginx' timeout would otherwise return 504 (Gateway Timeout)
        - if the replica is not waiting anymore we would otherwise never
          time out here, leading to more and more threads if no commits
          happen.
        """
        xom = self.xom
        if not xom.is_master():
            raise HTTPForbidden("Replication protocol disabled")

        expected_uuid = self.request.headers.get(H_EXPECTED_MASTER_ID, None)
        master_uuid = xom.config.get_master_uuid()

        # The header is mandatory but may be empty (during initialization).
        if expected_uuid is None:
            msg = "replica sent no %s header" % H_EXPECTED_MASTER_ID
            threadlog.error(msg)
            raise HTTPBadRequest(msg)

        uuid_mismatch = expected_uuid and expected_uuid != master_uuid
        if uuid_mismatch:
            threadlog.error("expected %r as master_uuid, replica sent %r", master_uuid,
                      expected_uuid)
            raise HTTPBadRequest("expected %s as master_uuid, replica sent %s" %
                                 (master_uuid, expected_uuid))

        serial = self.request.matchdict["serial"]

        with self.update_replica_status(serial):
            keyfs = self.xom.keyfs
            if serial.lower() != "nop":
                try:
                    serial = int(serial)
                except ValueError:
                    raise HTTPNotFound("serial needs to be int")
                raw_entry = self._wait_for_entry(serial)
            else:
                # "nop" requests carry no changelog payload.
                raw_entry = b""

            devpi_serial = keyfs.get_current_serial()
            response_headers = {
                str("Content-Type"): str("application/octet-stream"),
                str("X-DEVPI-SERIAL"): str(devpi_serial),
            }
            return Response(
                body=raw_entry, status=200, headers=response_headers)
views.py 文件源码 项目:bouncer 作者: hypothesis 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def goto_url(request):
    """
    Build the data needed to open a URL with the annotation layer enabled.

    Query parameters:

    "url" - The page to open (required, must be a valid HTTP(S) URL).
    "q" - Optional initial query for the filter input in the client.
    """
    settings = request.registry.settings
    url = request.params.get('url')

    if url is None:
        raise httpexceptions.HTTPBadRequest('"url" parameter is missing')

    if not _is_valid_http_url(url):
        message = _('Sorry, but this service can only show annotations on '
                    'valid HTTP or HTTPs URLs.')
        raise httpexceptions.HTTPBadRequest(message)

    # Drop any #fragment the caller supplied; our own is appended below.
    url = parse.urldefrag(url)[0]

    raw_query = request.params.get('q', '')
    query = parse.quote(raw_query)

    via_url = '{via_base_url}/{url}#annotations:query:{query}'.format(
        via_base_url=settings['via_base_url'],
        url=url,
        query=query)
    extension_url = '{url}#annotations:query:{query}'.format(
        url=url, query=query)

    pretty_url = util.get_pretty_url(url)

    client_data = json.dumps({
        'chromeExtensionId': settings['chrome_extension_id'],
        'viaUrl': via_url,
        'extensionUrl': extension_url,
    })
    return {'data': client_data, 'pretty_url': pretty_url}
test_views.py 文件源码 项目:hel 作者: hel-repo 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def one_value_param(name):
    """Decorate a test method for a search parameter that takes one value.

    The decorated method must return an iterable of ``(value, expected)``
    pairs. The generated test first checks that ``PackagesSearcher`` rejects
    zero, two and three values for ``name`` with the matching
    ``HTTPBadRequest`` detail, then runs a search for every pair and compares
    the result (with ``_id`` keys removed) to ``expected``.

    Args:
        name: the search parameter under test.

    Returns:
        A decorator producing the test method.
    """
    import functools

    def wrap(func):

        # Keep the wrapped test's name and docstring for test reports.
        @functools.wraps(func)
        def f(self):
            tests = func(self)

            # 0 values: should fail
            with self.subTest(test='0 values'):
                with self.assertRaises(HTTPBadRequest) as caught:
                    PackagesSearcher({
                        name: []
                    })()
                self.assertEqual(caught.exception.detail,
                                 Messages.no_values % name)

            # 2, 3 values: should fail
            for i in range(2, 4):
                with self.subTest(test='%s values' % i):
                    with self.assertRaises(HTTPBadRequest) as caught:
                        PackagesSearcher({
                            name: ['hel'] * i
                        })()
                    self.assertEqual(caught.exception.detail,
                                     Messages.too_many_values % (1, i))

            for test_case in tests:
                with self.subTest(test=test_case):
                    value, expected = test_case
                    searcher = PackagesSearcher({
                        name: [value]
                    })
                    searcher()
                    packages = list(self.db['packages'].find({}))
                    search_result = searcher.search(packages)
                    # Database ids are not part of the expected documents.
                    for doc in search_result:
                        doc.pop('_id', None)
                    self.assertTrue(are_equal(search_result, expected))

        return f

    return wrap
views.py 文件源码 项目:TonsleyLEDManager 作者: JonnoFTW 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def update_group_plugins(request):
    """Update the scheduling settings of an LED group from POST data.

    Reads ``time_from``/``time_to`` (``HHMM``), ``date_from``
    (``DD/MM/YYYY``), ``days`` (seven ``0``/``1`` flags), ``repeats`` and
    ``enabled`` and stores them on the group named in the route. Empty fields
    clear the corresponding setting.

    Returns:
        ``{'success': True}``.

    Raises:
        exc.HTTPBadRequest: if the group does not exist or a field is
            malformed.
    """
    group_id = request.matchdict['group_id']
    can_modify_group(request, group_id)
    group = request.db_session.query(LedGroup).filter(LedGroup.id == group_id).first()
    if not group:
        raise exc.HTTPBadRequest("No such group")

    def to_null(val):
        if val == '':
            return None
        return val
    POST = {k: to_null(v) for k, v in request.POST.items()}

    # Sorting puts the earlier time first. An empty string sorts before any
    # other value, so a half-specified range ends up with an empty time_from
    # and is rejected just below.
    time_from, time_to = sorted([request.POST['time_from'], request.POST['time_to']])
    fmt_24 = "%H%M"
    fmt_date = "%d/%m/%Y"

    if (not time_from and time_to) or (not time_to and time_from):
        raise exc.HTTPBadRequest('If you want a time range, please specify a time from and time to')
    if time_from:
        try:
            time_from = datetime.datetime.strptime(time_from, fmt_24)
            time_to = datetime.datetime.strptime(time_to, fmt_24)
        except ValueError:
            # Malformed input is a client error, not a server fault.
            raise exc.HTTPBadRequest('Times must be given as HHMM in 24 hour format')
        group.time_to = time_to
        group.time_from = time_from
    else:
        group.time_from = None
        group.time_to = None
    if POST['date_from']:
        try:
            date_from = datetime.datetime.strptime(POST['date_from'], fmt_date)
        except ValueError:
            raise exc.HTTPBadRequest('Date from must be given as DD/MM/YYYY')
        group.date_from = date_from
    else:
        group.date_from = None
    if POST['days']:
        days = POST['days'][:7]
        # Exactly seven 0/1 flags; the old class "[0|1]" also accepted "|".
        if not re.match("[01]{7}", days):
            raise exc.HTTPBadRequest("Days must have 7 valid days")
        if "1" in days:
            group.days_of_week = days
        else:
            group.days_of_week = None
    else:
        group.days_of_week = None
    if POST['repeats']:
        try:
            repeats = int(POST['repeats'])
        except ValueError:
            raise exc.HTTPBadRequest('Repeats must be a whole number')
        group.repeats = max(0, repeats)
    else:
        group.repeats = None
    if POST['enabled']:
        group.enabled = POST['enabled'] == 'true'
    # NOTE(review): group.name is interpolated into HTML without escaping;
    # confirm that log() or the template rendering it escapes the value.
    log(request, "Updated scheduling for <a href='/group/{}'>{}</a>".format(group.id, group.name))
    return {'success': True}
notification.py 文件源码 项目:idealoom 作者: conversence 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _do_update_from_json(
                self, json, parse_def, ctx,
                duplicate_handling=None, object_importer=None):
        """Update this object from ``json``, guarding user and discussion.

        Checks the ``user`` and discussion references in ``json`` against the
        values already stored on ``self``, switches the object to another
        class when ``@type`` names a different type, then applies
        ``creation_origin``, ``parent_subscription`` and ``status``.

        Returns:
            ``self``, or the result of the update re-run on the instance
            produced by ``change_class`` when the type changed.

        Raises:
            HTTPUnauthorized: when the object belongs (or is being assigned)
                to a user other than the acting one and the acting user lacks
                ``P_ADMIN_DISC`` on the discussion.
            HTTPBadRequest: on an attempt to change user or discussion, when
                no discussion can be determined, or on an unknown ``@type``.
        """
        from ..auth.util import user_has_permission
        user_id = ctx.get_user_id()
        # The target user is the User instance found in the context, if any,
        # otherwise the acting user.
        target_user_id = user_id
        user = ctx.get_instance_of_class(User)
        if user:
            target_user_id = user.id
        if self.user_id:
            # Touching another user's object requires discussion admin rights.
            if target_user_id != self.user_id:
                if not user_has_permission(self.discussion_id, user_id, P_ADMIN_DISC):
                    raise HTTPUnauthorized()
            # For now, do not allow changing user, it's way too complicated.
            if 'user' in json and User.get_database_id(json['user']) != self.user_id:
                raise HTTPBadRequest()
        else:
            json_user_id = json.get('user', None)
            if json_user_id is None:
                json_user_id = target_user_id
            else:
                json_user_id = User.get_database_id(json_user_id)
                if json_user_id != user_id and not user_has_permission(self.discussion_id, user_id, P_ADMIN_DISC):
                    raise HTTPUnauthorized()
            self.user_id = json_user_id
        if self.discussion_id:
            # NOTE(review): this branch reads the 'discussion_id' key while the
            # branch below reads 'discussion' -- confirm which key clients send.
            if 'discussion_id' in json and Discussion.get_database_id(json['discussion_id']) != self.discussion_id:
                raise HTTPBadRequest()
        else:
            discussion_id = json.get('discussion', None) or ctx.get_discussion_id()
            if discussion_id is None:
                raise HTTPBadRequest()
            self.discussion_id = Discussion.get_database_id(discussion_id)
        new_type = json.get('@type', self.type)
        if self.external_typename() != new_type:
            # NOTE(review): the value is used as a mapping from type name to
            # mapper below; if inspect() is SQLAlchemy's, polymorphic_map looks
            # like the intended attribute rather than polymorphic_identity --
            # TODO confirm.
            polymap = inspect(self.__class__).polymorphic_identity
            if new_type not in polymap:
                raise HTTPBadRequest()
            new_type = polymap[new_type].class_
            new_instance = self.change_class(new_type)
            # Re-run the update on the instance of the new class.
            return new_instance._do_update_from_json(
                json, parse_def, ctx,
                DuplicateHandling.USE_ORIGINAL, object_importer)
        creation_origin = json.get('creation_origin', "USER_REQUESTED")
        if creation_origin is not None:
            self.creation_origin = NotificationCreationOrigin.from_string(creation_origin)
        if json.get('parent_subscription', None) is not None:
            self.parent_subscription_id = self.get_database_id(json['parent_subscription'])
        status = json.get('status', None)
        if status:
            status = NotificationSubscriptionStatus.from_string(status)
            # Only record a status change date when the status really changes.
            if status != self.status:
                self.status = status
                self.last_status_change_date = datetime.utcnow()
        return self
sqla.py 文件源码 项目:idealoom 作者: conversence 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _create_subobject_from_json(
            self, json, target_cls, parse_def,
            context, accessor_name, object_importer=None):
        """Find or create the sub-object described by ``json``.

        The class is taken from the JSON ``@type`` when it names a known
        class, otherwise from ``target_cls``. A string ``@id`` that resolves
        to a known instance causes that instance to be updated; otherwise a
        new object is created from the JSON.

        Args:
            json: dict describing the sub-object.
            target_cls: expected class of the sub-object, or None.
            parse_def: passed through to the update/create calls.
            context: context of the parent object.
            accessor_name: name of the parent attribute, used in the error
                message.
            object_importer: optional importer used to resolve and record
                ``@id`` associations.

        Returns:
            The instance context of the sub-object, or None when no class can
            be determined.

        Raises:
            HTTPBadRequest: if ``@type`` is incompatible with ``target_cls``
                or the object can be neither found nor created.
        """
        instance = None
        target_type = json.get('@type', None)
        if target_type:
            new_target_cls = get_named_class(target_type)
            if new_target_cls:
                if target_cls is not None and \
                        not issubclass(new_target_cls, target_cls):
                    raise HTTPBadRequest(
                        "Type %s was assigned to %s.%s" % (
                            target_type, self.__class__.__name__,
                            accessor_name))
                target_cls = new_target_cls
        if not target_cls:
            # Not an instance
            return None
        target_id = json.get('@id', None)
        if target_id is not None:
            if isinstance(target_id, string_types):
                instance = self._json_is_known_instance(target_id, object_importer)
                if instance is not None and object_importer:
                    object_importer.associate(target_id, instance)
        if instance is not None:
            # Interesting that it works here and not upstream
            sub_context = instance.get_instance_context(context)
            log.info("Chaining context from %s -> %s" % (context, sub_context))
            # NOTE: Here we could tombstone the instance if tombstonable.
            instance = instance._do_update_from_json(
                json, parse_def, sub_context,
                DuplicateHandling.USE_ORIGINAL, object_importer)
            instance = instance.handle_duplication(
                json, parse_def, sub_context,
                DuplicateHandling.USE_ORIGINAL, object_importer)
            # This path never bound instance_ctx, so the code below failed
            # with UnboundLocalError. It is rebuilt from the final instance,
            # which the two calls above may have replaced.
            # NOTE(review): assumes get_instance_context() returns the same
            # kind of object as _do_create_from_json() -- TODO confirm.
            instance_ctx = instance.get_instance_context(context)
        else:
            instance_ctx = target_cls._do_create_from_json(
                json, parse_def, context,
                DuplicateHandling.USE_ORIGINAL, object_importer)
            if instance_ctx is None:
                raise HTTPBadRequest(
                    "Could not find or create object %s" % (
                        dumps(json),))
            if instance_ctx._instance:
                context.on_new_instance(instance_ctx._instance)
        if (target_id is not None and object_importer and
                instance_ctx._instance is not None):
            object_importer.associate(target_id, instance_ctx._instance)
        return instance_ctx

    # If a duplicate is created, do we use the original? (Error otherwise)
sqla.py 文件源码 项目:idealoom 作者: conversence 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _assign_subobject_list(self, instances, accessor):
        """Make the collection behind ``accessor`` hold exactly ``instances``.

        Args:
            instances: the objects the collection should contain.
            accessor: the mapped attribute to assign through: a relationship,
                a plain property or an association proxy.

        Raises:
            HTTPBadRequest: if ``accessor`` is a column, which cannot hold
                multiple values.
            HTTPUnauthorized: if an object that must be removed may not be
                deleted by the user.
        """
        # only known case yet is Langstring.entries
        if isinstance(accessor, RelationshipProperty):
            if not accessor.back_populates:
                # Try the brutal approach
                setattr(self, accessor.key, instances)
            else:
                from ..lib.history_mixin import TombstonableMixin
                current_instances = getattr(self, accessor.key)
                missing = set(instances) - set(current_instances)
                if missing:
                    # Maybe tombstones
                    # Built as a list: a lazy filter object is always truthy
                    # on Python 3, which made the assertion below fail even
                    # when nothing was left.
                    missing = [
                        a for a in missing
                        if not isinstance(a, TombstonableMixin) or
                        not a.is_tombstone]
                assert not missing, "what's wrong with back_populates?"
                extra = set(current_instances) - set(instances)
                if extra:
                    remote_columns = list(accessor.remote_side)
                    if len(accessor.remote_side) > 1:
                        if issubclass(accessor.mapper.class_, TombstonableMixin):
                            # A list again, so that len() and indexing work.
                            remote_columns = [
                                c for c in remote_columns
                                if c.name != 'tombstone_date']
                    assert len(remote_columns) == 1
                    remote = remote_columns[0]
                    if remote.nullable:
                        # TODO: check update permissions on that object.
                        # NOTE(review): `missing` is empty at this point, so
                        # this loop does nothing; it looks like `extra` was
                        # intended -- confirm before changing.
                        for inst in missing:
                            setattr(inst, remote.key, None)
                    else:
                        for inst in extra:
                            # NOTE(review): user_id and permissions are not
                            # defined in this function; unless they are module
                            # globals this raises NameError -- TODO confirm.
                            if not inst.user_can(
                                    user_id, CrudPermissions.DELETE,
                                    permissions):
                                raise HTTPUnauthorized(
                                    "Cannot delete object %s" % (inst.uri(),))
                            else:
                                if isinstance(inst, TombstonableMixin):
                                    inst.is_tombstone = True
                                else:
                                    self.db.delete(inst)
        elif isinstance(accessor, property):
            # Note: Does not happen yet.
            # Call the setter of this accessor, not the builtin type's slot.
            accessor.fset(self, instances)
        elif isinstance(accessor, Column):
            raise HTTPBadRequest(
                "%s cannot have multiple values" % (accessor.key, ))
        elif isinstance(accessor, AssociationProxy):
            # Also never happens
            current_instances = accessor.__get__(self, self.__class__)
            missing = set(instances) - set(current_instances)
            extra = set(current_instances) - set(instances)
            for inst in missing:
                accessor.add(inst)
            for inst in extra:
                accessor.remove(inst)
        else:
            assert False, "we should not get here"
search.py 文件源码 项目:snovault 作者: ENCODE-DCC 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def report(context, request):
    """Return search results for exactly one leaf type, presented as a report.

    Raises:
        HTTPBadRequest: unless exactly one ``type`` parameter is given, it
            names a known type, and that type has no child types.
    """
    doc_types = request.params.getall('type')
    if len(doc_types) != 1:
        raise HTTPBadRequest(
            explanation='Report view requires specifying a single type.')
    doc_type = doc_types[0]

    # schemas for all types
    types = request.registry[TYPES]

    # Look up the subtypes of the requested type; unknown types are rejected.
    try:
        sub_types = types[doc_type].subtypes
    except KeyError:
        raise HTTPBadRequest(explanation="Invalid type: " + doc_type)

    # A type with child types cannot be reported on.
    if len(sub_types) > 1:
        raise HTTPBadRequest(
            explanation='Report view requires a type with no child types.')

    # Ignore large limits, which make `search` return a Response
    # -- UNLESS we're being embedded by the download_report view
    _from, size = get_pagination(request)
    if 'limit' in request.GET and request.__parent__ is None:
        if size is None or size > 1000:
            del request.GET['limit']

    # Reuse search view
    res = search(context, request)

    # Point the first view back at the plain search result list.
    res['views'][0] = dict([
        ('href', res['@id']),
        ('title', 'View results as list'),
        ('icon', 'list-alt'),
    ])
    search_base = normalize_query(request)
    res['@id'] = '/report/' + search_base
    # TODO add this back one day
    # res['download_tsv'] = request.route_path('report_download') + search_base
    res['title'] = 'Report'
    res['@type'] = ['Report']
    return res


问题


面经


文章

微信
公众号

扫码关注公众号