python类ParseError()的实例源码

xform_viewset.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _set_start_end_params(request, query):
    format_date_for_mongo = lambda x, datetime: datetime.strptime(
        x, '%y_%m_%d_%H_%M_%S').strftime('%Y-%m-%dT%H:%M:%S')

    # check for start and end params
    if 'start' in request.GET or 'end' in request.GET:
        query = json.loads(query) \
            if isinstance(query, six.string_types) else query
        query[SUBMISSION_TIME] = {}

        try:
            if request.GET.get('start'):
                query[SUBMISSION_TIME]['$gte'] = format_date_for_mongo(
                    request.GET['start'], datetime)

            if request.GET.get('end'):
                query[SUBMISSION_TIME]['$lte'] = format_date_for_mongo(
                    request.GET['end'], datetime)
        except ValueError:
            raise exceptions.ParseError(
                _("Dates must be in the format YY_MM_DD_hh_mm_ss")
            )
        else:
            query = json.dumps(query)

        return query
xform_viewset.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def update(self, request, pk, *args, **kwargs):
        if 'xls_file' in request.FILES or 'text_xls_form' in request.data:
            # A new XLSForm has been uploaded and will replace the existing
            # form
            existing_xform = get_object_or_404(XForm, pk=pk)
            # Behave like `onadata.apps.main.views.update_xform`: only allow
            # the update to proceed if the user is the owner
            owner = existing_xform.user
            if request.user.pk != owner.pk:
                raise exceptions.PermissionDenied(
                    detail=_("Only a form's owner can overwrite its contents"))
            survey = utils.publish_xlsform(request, owner, existing_xform)
            if not isinstance(survey, XForm):
                if isinstance(survey, dict) and 'text' in survey:
                    # Typical error text; pass it along
                    raise exceptions.ParseError(detail=survey['text'])
                else:
                    # Something odd; hopefully it can be coerced into a string
                    raise exceptions.ParseError(detail=survey)
        # Let the superclass handle updates to the other fields
        # noti = existing_xform.logs.create(source=request.user, type=7, title="Kobo form Updated",
        #                              organization=request.organization,
        #                             description="new kobo form {0} Updated by {1}".
        #                             format(existing_xform.title, request.user.username))
        # result = {}
        # result['description'] = noti.description
        # result['url'] = noti.get_absolute_url()
        # ChannelGroup("notify-0").send({"text":json.dumps(result)})
        # if noti.organization:
        #     ChannelGroup("notify-{}".format(noti.organization.id)).send({"text":json.dumps(result)})
        return super(XFormViewSet, self).update(request, pk, *args, **kwargs)
stats_serializer.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def to_representation(self, obj):
        if obj is None:
            return \
                super(SubmissionStatsInstanceSerializer, self).to_representation(obj)

        request = self.context.get('request')
        field = request.query_params.get('group')
        name = request.query_params.get('name', field)

        if field is None:
            raise exceptions.ParseError(_(u"Expecting `group` and `name`"
                                          u" query parameters."))

        try:
            data = get_form_submissions_grouped_by_field(
                obj, field, name)
        except ValueError as e:
            raise exceptions.ParseError(detail=e.message)
        else:
            if data:
                dd = obj.data_dictionary()
                element = dd.get_survey_element(field)

                if element and element.type in SELECT_FIELDS:
                    for record in data:
                        label = dd.get_choice_label(element, record[name])
                        record[name] = label
        return data
tag_list_serializer.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def to_internal_value(self, data):
        if type(data) is not list:
            raise ParseError("expected a list of data")

        return data
object_lookup_mixin.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_object(self):
        """
        Incase the lookup is on an object that has been hyperlinked
        then update the queryset filter appropriately
        """
        if self.kwargs.get(self.lookup_field, None) is None:
            raise ParseError(
                'Expected URL keyword argument `%s`.' % self.lookup_field
            )
        queryset = self.filter_queryset(self.get_queryset())

        filter_kwargs = {}
        serializer = self.get_serializer()
        lookup_field = self.lookup_field

        if self.lookup_field in serializer.get_fields():
            k = serializer.get_fields()[self.lookup_field]
            if isinstance(k, serializers.HyperlinkedRelatedField):
                lookup_field = '%s__%s' % (self.lookup_field, k.lookup_field)

        filter_kwargs[lookup_field] = self.kwargs[self.lookup_field]

        obj = get_object_or_404(queryset, **filter_kwargs)

        # May raise a permission denied
        self.check_object_permissions(self.request, obj)

        return obj
multi_lookup_mixin.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_object(self):
        queryset = self.filter_queryset(self.get_queryset())
        filter_kwargs = {}
        serializer = self.get_serializer()
        lookup_fields = getattr(self, 'lookup_fields', [])

        for field in lookup_fields:
            lookup_field = field

            if lookup_field in serializer.get_fields():
                k = serializer.get_fields()[lookup_field]

                if isinstance(k, serializers.HyperlinkedRelatedField):
                    if k.source:
                        lookup_field = k.source
                    lookup_field = '%s__%s' % (lookup_field, k.lookup_field)

            if self.kwargs.get(field, None) is None:
                raise ParseError(
                    'Expected URL keyword argument `%s`.' % field
                )
            filter_kwargs[lookup_field] = self.kwargs[field]

        obj = get_object_or_404(queryset,  **filter_kwargs)

        # May raise a permission denied
        self.check_object_permissions(self.request, obj)

        return obj
filters.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def filter_queryset(self, request, queryset, view):
        """
        Anonymous user has no object permissions, return queryset as it is.
        """
        user = request.user
        project_id = view.kwargs.get(view.lookup_field)

        if user.is_anonymous():
            return queryset.filter(Q(shared=True))

        if project_id:
            try:
                int(project_id)
            except ValueError:
                raise ParseError(
                    u"Invalid value for project_id '%s' must be a positive "
                    "integer." % project_id)

            # check if project is public and return it
            try:
                project = queryset.get(id=project_id)
            except ObjectDoesNotExist:
                raise Http404

            if project.shared:
                return queryset.filter(Q(id=project_id))

        return super(AnonUserProjectFilter, self)\
            .filter_queryset(request, queryset, view)
filters.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def filter_queryset(self, request, queryset, view):
        queryset = self._xform_filter_queryset(request, queryset, view,
                                               'instance__xform')
        instance_id = request.query_params.get('instance')
        if instance_id:
            try:
                int(instance_id)
            except ValueError:
                raise ParseError(
                    u"Invalid value for instance %s." % instance_id)
            instance = get_object_or_404(Instance, pk=instance_id)
            queryset = queryset.filter(instance=instance)

        return queryset
views.py 文件源码 项目:vialer-middleware 作者: VoIPGRID 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _serialize_data(self, data, serializer_class=None):
        """
        Function to serialize data with the serializer given in the
        serializer_class attribute. Also validates the data and responds with
        a HTTP_400_BAD_REQUEST when validation failed. Due to being an open
        api we do not want to give away the required fields and their
        requirements.

        Args:
            data(dict): Dictonary with that data that need to be serialized.
            serializer_class(Serializer): Class to use for serialization.

        Returns:
            dict: Dictionary with the validated data.

        Raises:
            NotImplementedError: When serializer_class attribute is not set.
            ParseError: When validation fails but because it's an open API we
                do not want to give away what failed. ParseError returns a
                HTTP_400_BAD_REQUEST.
        """
        if serializer_class is None:
            if self.serializer_class is None:
                raise NotImplementedError('serializer_class Attribute should be set')
        else:
            self.serializer_class = serializer_class

        serializer = self.serializer_class(data=data)
        if not serializer.is_valid(raise_exception=False):
            # Log errors.
            logger.info('BAD REQUEST! Serialization failed with following errors:\n\n{0}\n\nData:\n\n{1}'.format(
                serializer.errors,
                data,
            ))
            # This raises a bad request response.
            raise ParseError(detail=None)

        return serializer.validated_data
parsers.py 文件源码 项目:jianshu-api 作者: strugglingyouth 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def parse(self, stream, media_type=None, parser_context=None):
        """
        Parses the incoming bytestream as JSON and returns the resulting data.
        """
        parser_context = parser_context or {}
        encoding = parser_context.get('encoding', settings.DEFAULT_CHARSET)

        try:
            data = stream.read().decode(encoding)
            return json.loads(data)
        except ValueError as exc:
            raise ParseError('JSON parse error - %s' % six.text_type(exc))
parsers.py 文件源码 项目:munch-core 作者: crunchmail 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def parse(self, stream, media_type=None, parser_context=None):
        parser_context = parser_context or {}
        delimiter = parser_context.get('delimiter', ',')

        try:
            encoding = parser_context.get('encoding', settings.DEFAULT_CHARSET)
            rows = unicode_csv_reader(universal_newlines(stream),
                                      delimiter=delimiter, charset=encoding)
            data = OrderedRows(next(rows))
            for row in rows:
                row_data = dict(zip(data.header, row))
                data.append(row_data)
            return data
        except Exception as exc:
            raise ParseError('CSV parse error - %s' % str(exc))
views.py 文件源码 项目:ChRIS_ultron_backEnd 作者: FNNDSC 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get(self, request):
        raise ParseError('lol nice one')
viewsets.py 文件源码 项目:django-daiquiri 作者: aipescience 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _get_page(self):
        try:
            return int(self.request.GET.get('page', '1'))
        except ValueError:
            raise ParseError('page must be an integer')
viewsets.py 文件源码 项目:django-daiquiri 作者: aipescience 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get_page_size(self):
        try:
            return int(self.request.GET.get('page_size', '10'))
        except ValueError:
            raise ParseError('page_size must be an integer')
parsers.py 文件源码 项目:rest_channels 作者: KhasanovBI 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def parse(self, data):
        try:
            return ujson.loads(data)
        except ValueError as exc:
            raise ParseError('JSON parse error - %s' % six.text_type(exc))
user_profile_viewset.py 文件源码 项目:FormShare 作者: qlands 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_object(self, queryset=None):
        """Lookup user profile by pk or username"""
        if self.kwargs.get(self.lookup_field, None) is None:
            raise ParseError(
                'Expected URL keyword argument `%s`.' % self.lookup_field
            )
        if queryset is None:
            queryset = self.filter_queryset(self.get_queryset())

        serializer = self.get_serializer()
        lookup_field = self.lookup_field

        if self.lookup_field in serializer.get_fields():
            k = serializer.get_fields()[self.lookup_field]
            if isinstance(k, serializers.HyperlinkedRelatedField):
                lookup_field = '%s__%s' % (self.lookup_field, k.lookup_field)

        lookup = self.kwargs[self.lookup_field]
        filter_kwargs = {lookup_field: lookup}

        try:
            pk = int(lookup)
        except (TypeError, ValueError):
            pass
        else:
            filter_kwargs = {'user__pk': pk}

        obj = get_object_or_404(queryset, **filter_kwargs)

        # May raise a permission denied
        self.check_object_permissions(self.request, obj)

        return obj
data_viewset.py 文件源码 项目:FormShare 作者: qlands 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def destroy(self, request, *args, **kwargs):
        self.object = self.get_object()

        if isinstance(self.object, XForm):
            raise ParseError(_(u"Data id not provided."))
        elif isinstance(self.object, Instance):

            if request.user.has_perm("delete_xform", self.object.xform):
                self.object.delete()
            else:
                raise PermissionDenied(_(u"You do not have delete "
                                         u"permissions."))

        return Response(status=status.HTTP_204_NO_CONTENT)
connect_viewset.py 文件源码 项目:FormShare 作者: qlands 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def expire(self, request, *args, **kwargs):
        try:
            TempToken.objects.get(user=request.user).delete()
        except TempToken.DoesNotExist:
            raise ParseError(_(u"Temporary token not found!"))

        return Response(status=status.HTTP_204_NO_CONTENT)
xform_viewset.py 文件源码 项目:FormShare 作者: qlands 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get_export_type(export_type):
    if export_type in EXPORT_EXT.keys():
        export_type = EXPORT_EXT[export_type]
    else:
        raise exceptions.ParseError(
            _(u"'%(export_type)s' format not known or not implemented!" %
              {'export_type': export_type})
        )

    return export_type
xform_viewset.py 文件源码 项目:FormShare 作者: qlands 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _set_start_end_params(request, query):
    format_date_for_mongo = lambda x, datetime: datetime.strptime(
        x, '%y_%m_%d_%H_%M_%S').strftime('%Y-%m-%dT%H:%M:%S')

    # check for start and end params
    if 'start' in request.GET or 'end' in request.GET:
        query = json.loads(query) \
            if isinstance(query, six.string_types) else query
        query[SUBMISSION_TIME] = {}

        try:
            if request.GET.get('start'):
                query[SUBMISSION_TIME]['$gte'] = format_date_for_mongo(
                    request.GET['start'], datetime)

            if request.GET.get('end'):
                query[SUBMISSION_TIME]['$lte'] = format_date_for_mongo(
                    request.GET['end'], datetime)
        except ValueError:
            raise exceptions.ParseError(
                _("Dates must be in the format YY_MM_DD_hh_mm_ss")
            )
        else:
            query = json.dumps(query)

        return query


问题


面经


文章

微信
公众号

扫码关注公众号