def _set_start_end_params(request, query):
    """Fold optional ``start``/``end`` GET parameters into ``query``.

    When neither parameter is present, ``query`` is returned untouched.
    Otherwise ``query`` (decoded from JSON first when it is a string) gets a
    fresh ``SUBMISSION_TIME`` entry holding a ``$gte`` bound for a non-empty
    ``start`` and a ``$lte`` bound for a non-empty ``end``, and the
    JSON-encoded query is returned.

    Args:
        request: Object exposing the query parameters as ``request.GET``.
        query: A mapping, or a JSON string that decodes to one.

    Returns:
        ``query`` unchanged, or its JSON encoding when a date parameter was
        supplied.

    Raises:
        exceptions.ParseError: A supplied date does not match
            ``YY_MM_DD_hh_mm_ss``.
    """
    params = request.GET
    if 'start' not in params and 'end' not in params:
        return query

    def to_mongo_date(raw):
        # e.g. '17_03_05_10_20_30' -> '2017-03-05T10:20:30'
        parsed = datetime.strptime(raw, '%y_%m_%d_%H_%M_%S')
        return parsed.strftime('%Y-%m-%dT%H:%M:%S')

    if isinstance(query, six.string_types):
        query = json.loads(query)
    # Any SUBMISSION_TIME filter already in the query is replaced
    query[SUBMISSION_TIME] = {}
    try:
        for param, operator in (('start', '$gte'), ('end', '$lte')):
            if params.get(param):
                query[SUBMISSION_TIME][operator] = to_mongo_date(
                    params[param])
    except ValueError:
        raise exceptions.ParseError(
            _("Dates must be in the format YY_MM_DD_hh_mm_ss")
        )
    return json.dumps(query)
# Example source code for the Python class ParseError()
def update(self, request, pk, *args, **kwargs):
    """Update an XForm, republishing its XLSForm first when one is uploaded.

    If the request carries an ``xls_file`` upload or a ``text_xls_form``
    payload, the existing form is republished from it; the remaining fields
    are then handled by the superclass ``update``.

    Raises:
        exceptions.PermissionDenied: The requesting user is not the owner of
            the form being overwritten.
        exceptions.ParseError: Publishing did not return an ``XForm``.
    """
    replaces_xlsform = (
        'xls_file' in request.FILES or 'text_xls_form' in request.data
    )
    if replaces_xlsform:
        # The uploaded XLSForm replaces the existing form
        existing_xform = get_object_or_404(XForm, pk=pk)
        owner = existing_xform.user
        # Same rule as `onadata.apps.main.views.update_xform`: only the
        # owner may proceed
        if request.user.pk != owner.pk:
            raise exceptions.PermissionDenied(
                detail=_("Only a form's owner can overwrite its contents"))
        survey = utils.publish_xlsform(request, owner, existing_xform)
        if not isinstance(survey, XForm):
            if isinstance(survey, dict) and 'text' in survey:
                # Typical error text; pass it along
                detail = survey['text']
            else:
                # Something odd; hopefully it can be coerced into a string
                detail = survey
            raise exceptions.ParseError(detail=detail)
    # NOTE: a notification step (log entry plus ChannelGroup broadcast of
    # the update) used to sit here and is currently disabled.
    # Let the superclass handle updates to the other fields
    return super(XFormViewSet, self).update(request, pk, *args, **kwargs)
def to_representation(self, obj):
    """Return submissions of ``obj`` grouped by the requested field.

    The grouping field is read from the ``group`` query parameter and the
    output key from ``name`` (which defaults to ``group``). When the grouped
    survey element has a type listed in ``SELECT_FIELDS``, each record's
    ``name`` value is replaced by its choice label.

    Args:
        obj: The form to report on; ``None`` is delegated to the superclass.

    Returns:
        The grouped records, or the superclass representation for ``None``.

    Raises:
        exceptions.ParseError: ``group`` is missing, or grouping raised a
            ``ValueError``.
    """
    if obj is None:
        return super(
            SubmissionStatsInstanceSerializer, self).to_representation(obj)

    request = self.context.get('request')
    field = request.query_params.get('group')
    name = request.query_params.get('name', field)
    if field is None:
        raise exceptions.ParseError(_(u"Expecting `group` and `name`"
                                      u" query parameters."))

    try:
        data = get_form_submissions_grouped_by_field(obj, field, name)
    except ValueError as e:
        # `e.message` exists only on Python 2; on Python 3 it raised
        # AttributeError and hid the real error. Fall back to str(e).
        raise exceptions.ParseError(
            detail=getattr(e, 'message', None) or str(e))

    if not data:
        return data

    dd = obj.data_dictionary()
    element = dd.get_survey_element(field)
    if element and element.type in SELECT_FIELDS:
        # Swap stored choice values for their human-readable labels
        for record in data:
            record[name] = dd.get_choice_label(element, record[name])
    return data
def to_internal_value(self, data):
    """Accept ``data`` only when it is a list and return it unchanged.

    Args:
        data: The incoming value to check.

    Returns:
        ``data`` itself.

    Raises:
        ParseError: ``data`` is not a list.
    """
    # isinstance (not an exact type comparison) so list subclasses pass too
    if not isinstance(data, list):
        raise ParseError("expected a list of data")
    return data
def get_object(self):
    """
    Fetch the object named by the ``lookup_field`` URL keyword argument.

    In case the lookup is on a field the serializer declares as a
    ``HyperlinkedRelatedField``, the queryset filter is rewritten to
    ``<lookup_field>__<related lookup_field>``.

    Returns:
        The matching object, after ``check_object_permissions`` has run.

    Raises:
        ParseError: The URL keyword argument is missing or ``None``.
    """
    url_kwarg = self.lookup_field
    if self.kwargs.get(url_kwarg, None) is None:
        raise ParseError(
            'Expected URL keyword argument `%s`.' % url_kwarg
        )

    queryset = self.filter_queryset(self.get_queryset())
    # Build the serializer's field mapping once, not once per access
    serializer_fields = self.get_serializer().get_fields()

    lookup_field = url_kwarg
    if url_kwarg in serializer_fields:
        k = serializer_fields[url_kwarg]
        if isinstance(k, serializers.HyperlinkedRelatedField):
            lookup_field = '%s__%s' % (url_kwarg, k.lookup_field)

    filter_kwargs = {lookup_field: self.kwargs[url_kwarg]}
    obj = get_object_or_404(queryset, **filter_kwargs)
    # May raise a permission denied
    self.check_object_permissions(self.request, obj)
    return obj
def get_object(self):
    """
    Fetch one object filtered by every name listed in ``self.lookup_fields``.

    Each name must be present in the URL keyword arguments. When the
    serializer declares the name as a ``HyperlinkedRelatedField``, the filter
    key becomes ``<source or name>__<related lookup_field>``.

    Returns:
        The matching object, after ``check_object_permissions`` has run.

    Raises:
        ParseError: A listed URL keyword argument is missing or ``None``.
    """
    queryset = self.filter_queryset(self.get_queryset())
    serializer = self.get_serializer()
    lookup_fields = getattr(self, 'lookup_fields', [])
    # Loop-invariant: build the field mapping once instead of up to twice
    # per lookup field (and not at all when there is nothing to look up)
    serializer_fields = serializer.get_fields() if lookup_fields else {}

    filter_kwargs = {}
    for field in lookup_fields:
        lookup_field = field
        if field in serializer_fields:
            k = serializer_fields[field]
            if isinstance(k, serializers.HyperlinkedRelatedField):
                # Prefer the field's declared source over its name
                if k.source:
                    lookup_field = k.source
                lookup_field = '%s__%s' % (lookup_field, k.lookup_field)

        if self.kwargs.get(field, None) is None:
            raise ParseError(
                'Expected URL keyword argument `%s`.' % field
            )
        filter_kwargs[lookup_field] = self.kwargs[field]

    obj = get_object_or_404(queryset, **filter_kwargs)
    # May raise a permission denied
    self.check_object_permissions(self.request, obj)
    return obj
def filter_queryset(self, request, queryset, view):
    """
    Restrict ``queryset`` for anonymous users and direct project look-ups.

    * Anonymous users only get rows with ``shared=True``.
    * When the view's lookup keyword argument is set, it must parse as an
      integer; a project that is missing from ``queryset`` yields a 404 and
      a shared project is returned on its own.
    * Everything else is delegated to the parent filter.

    Raises:
        ParseError: The project id cannot be parsed as an integer.
        Http404: No project with that id exists in ``queryset``.
    """
    user = request.user
    project_id = view.kwargs.get(view.lookup_field)

    if user.is_anonymous():
        return queryset.filter(Q(shared=True))

    if not project_id:
        return super(AnonUserProjectFilter, self)\
            .filter_queryset(request, queryset, view)

    try:
        int(project_id)
    except ValueError:
        raise ParseError(
            u"Invalid value for project_id '%s' must be a positive "
            "integer." % project_id)

    # check if project is public and return it
    try:
        project = queryset.get(id=project_id)
    except ObjectDoesNotExist:
        raise Http404

    if project.shared:
        return queryset.filter(Q(id=project_id))
    return super(AnonUserProjectFilter, self)\
        .filter_queryset(request, queryset, view)
def filter_queryset(self, request, queryset, view):
    """
    Filter by form and, when given, by the ``instance`` query parameter.

    The queryset is first passed through ``_xform_filter_queryset`` with the
    ``instance__xform`` lookup. If ``instance`` is supplied it must parse as
    an integer and refer to an existing ``Instance``.

    Raises:
        ParseError: ``instance`` cannot be parsed as an integer.
    """
    filtered = self._xform_filter_queryset(
        request, queryset, view, 'instance__xform')
    instance_id = request.query_params.get('instance')
    if not instance_id:
        return filtered

    try:
        int(instance_id)
    except ValueError:
        raise ParseError(
            u"Invalid value for instance %s." % instance_id)

    instance = get_object_or_404(Instance, pk=instance_id)
    return filtered.filter(instance=instance)
def _serialize_data(self, data, serializer_class=None):
    """
    Validate ``data`` with a serializer and return the validated result.

    The serializer is ``serializer_class`` when given, otherwise the
    ``serializer_class`` attribute. Validation failures are logged and
    answered with a bare HTTP_400_BAD_REQUEST: because this is an open API,
    the response does not reveal which fields failed or why.

    Args:
        data(dict): Dictionary with the data that needs to be serialized.
        serializer_class(Serializer): Class to use for serialization.

    Returns:
        dict: Dictionary with the validated data.

    Raises:
        NotImplementedError: When no serializer class is available.
        ParseError: When validation fails; carries no detail on purpose.
    """
    if serializer_class is not None:
        # The override is stored on the instance and stays set afterwards
        self.serializer_class = serializer_class
    elif self.serializer_class is None:
        raise NotImplementedError('serializer_class Attribute should be set')

    serializer = self.serializer_class(data=data)
    if serializer.is_valid(raise_exception=False):
        return serializer.validated_data

    # Log errors.
    template = (
        'BAD REQUEST! Serialization failed with following errors:'
        '\n\n{0}\n\nData:\n\n{1}'
    )
    logger.info(template.format(serializer.errors, data))
    # This raises a bad request response.
    raise ParseError(detail=None)
def parse(self, stream, media_type=None, parser_context=None):
    """
    Decode the incoming bytestream and return it parsed as JSON.

    The encoding is taken from ``parser_context['encoding']`` and falls back
    to ``settings.DEFAULT_CHARSET``.

    Raises:
        ParseError: Decoding or JSON parsing raised a ``ValueError``.
    """
    context = parser_context or {}
    charset = context.get('encoding', settings.DEFAULT_CHARSET)
    try:
        return json.loads(stream.read().decode(charset))
    except ValueError as exc:
        raise ParseError('JSON parse error - %s' % six.text_type(exc))
def parse(self, stream, media_type=None, parser_context=None):
    """
    Parse a CSV bytestream into ``OrderedRows``.

    The first row read is handed to ``OrderedRows``; every following row is
    zipped with ``data.header`` into a dict and appended. ``delimiter`` and
    ``encoding`` come from ``parser_context`` (defaults: ``','`` and
    ``settings.DEFAULT_CHARSET``).

    Raises:
        ParseError: Any exception raised while reading or building the rows.
    """
    context = parser_context or {}
    delimiter = context.get('delimiter', ',')
    try:
        charset = context.get('encoding', settings.DEFAULT_CHARSET)
        reader = unicode_csv_reader(
            universal_newlines(stream), delimiter=delimiter, charset=charset)
        data = OrderedRows(next(reader))
        for row in reader:
            data.append(dict(zip(data.header, row)))
        return data
    except Exception as exc:
        raise ParseError('CSV parse error - %s' % str(exc))
def get(self, request):
    """Reject every request by raising ``ParseError``."""
    message = 'lol nice one'
    raise ParseError(message)
def _get_page(self):
    """Return the ``page`` query parameter as an int (``1`` when absent).

    Raises:
        ParseError: The value is not a valid integer.
    """
    try:
        page = int(self.request.GET.get('page', '1'))
    except ValueError:
        raise ParseError('page must be an integer')
    else:
        return page
def _get_page_size(self):
    """Return the ``page_size`` query parameter as an int (``10`` when absent).

    Raises:
        ParseError: The value is not a valid integer.
    """
    try:
        page_size = int(self.request.GET.get('page_size', '10'))
    except ValueError:
        raise ParseError('page_size must be an integer')
    else:
        return page_size
def parse(self, data):
    """Decode ``data`` with ``ujson`` and return the result.

    Raises:
        ParseError: ``ujson.loads`` raised a ``ValueError``.
    """
    try:
        parsed = ujson.loads(data)
    except ValueError as exc:
        detail = 'JSON parse error - %s' % six.text_type(exc)
        raise ParseError(detail)
    return parsed
def get_object(self, queryset=None):
    """Lookup user profile by pk or username

    The URL keyword argument named by ``lookup_field`` is used as
    ``user__pk`` when it parses as an integer. Otherwise it is matched
    against ``lookup_field`` (rewritten to
    ``<lookup_field>__<related lookup_field>`` when the serializer declares
    that field as a ``HyperlinkedRelatedField``).

    Args:
        queryset: Optional queryset to search; defaults to the view's
            filtered queryset.

    Returns:
        The matching object, after ``check_object_permissions`` has run.

    Raises:
        ParseError: The URL keyword argument is missing or ``None``.
    """
    url_kwarg = self.lookup_field
    if self.kwargs.get(url_kwarg, None) is None:
        raise ParseError(
            'Expected URL keyword argument `%s`.' % url_kwarg
        )
    if queryset is None:
        queryset = self.filter_queryset(self.get_queryset())

    # Build the serializer's field mapping once, not once per access
    serializer_fields = self.get_serializer().get_fields()
    lookup_field = url_kwarg
    if url_kwarg in serializer_fields:
        k = serializer_fields[url_kwarg]
        if isinstance(k, serializers.HyperlinkedRelatedField):
            lookup_field = '%s__%s' % (url_kwarg, k.lookup_field)

    lookup = self.kwargs[url_kwarg]
    try:
        # A numeric lookup is treated as the user's primary key
        filter_kwargs = {'user__pk': int(lookup)}
    except (TypeError, ValueError):
        filter_kwargs = {lookup_field: lookup}

    obj = get_object_or_404(queryset, **filter_kwargs)
    # May raise a permission denied
    self.check_object_permissions(self.request, obj)
    return obj
def destroy(self, request, *args, **kwargs):
    """Delete a single submission and answer with HTTP 204.

    The looked-up object is stored on ``self.object``. An ``Instance`` is
    deleted only when the user holds ``delete_xform`` on its form.

    Raises:
        ParseError: The lookup resolved to an ``XForm`` rather than a
            submission.
        PermissionDenied: The user lacks ``delete_xform`` on the form.
    """
    self.object = target = self.get_object()

    if isinstance(target, XForm):
        raise ParseError(_(u"Data id not provided."))

    if isinstance(target, Instance):
        if not request.user.has_perm("delete_xform", target.xform):
            raise PermissionDenied(_(u"You do not have delete "
                                     u"permissions."))
        target.delete()

    return Response(status=status.HTTP_204_NO_CONTENT)
def expire(self, request, *args, **kwargs):
    """Delete the requesting user's ``TempToken`` and answer with HTTP 204.

    Raises:
        ParseError: The user has no temporary token.
    """
    try:
        token = TempToken.objects.get(user=request.user)
        token.delete()
    except TempToken.DoesNotExist:
        raise ParseError(_(u"Temporary token not found!"))
    return Response(status=status.HTTP_204_NO_CONTENT)
def _get_export_type(export_type):
    """Translate ``export_type`` through the ``EXPORT_EXT`` mapping.

    Args:
        export_type: Key to look up in ``EXPORT_EXT``.

    Returns:
        The value ``EXPORT_EXT`` holds for ``export_type``.

    Raises:
        exceptions.ParseError: ``export_type`` is not a key of
            ``EXPORT_EXT``.
    """
    if export_type not in EXPORT_EXT:
        # Interpolate AFTER the gettext call: the untouched template must be
        # the msgid, otherwise no catalog entry can ever match.
        raise exceptions.ParseError(
            _(u"'%(export_type)s' format not known or not implemented!") %
            {'export_type': export_type}
        )
    return EXPORT_EXT[export_type]
def _set_start_end_params(request, query):
    """Fold optional ``start``/``end`` GET parameters into ``query``.

    When neither parameter is present, ``query`` is returned untouched.
    Otherwise ``query`` (decoded from JSON first when it is a string) gets a
    fresh ``SUBMISSION_TIME`` entry holding a ``$gte`` bound for a non-empty
    ``start`` and a ``$lte`` bound for a non-empty ``end``, and the
    JSON-encoded query is returned.

    Args:
        request: Object exposing the query parameters as ``request.GET``.
        query: A mapping, or a JSON string that decodes to one.

    Returns:
        ``query`` unchanged, or its JSON encoding when a date parameter was
        supplied.

    Raises:
        exceptions.ParseError: A supplied date does not match
            ``YY_MM_DD_hh_mm_ss``.
    """
    params = request.GET
    if 'start' not in params and 'end' not in params:
        return query

    def to_mongo_date(raw):
        # e.g. '17_03_05_10_20_30' -> '2017-03-05T10:20:30'
        parsed = datetime.strptime(raw, '%y_%m_%d_%H_%M_%S')
        return parsed.strftime('%Y-%m-%dT%H:%M:%S')

    if isinstance(query, six.string_types):
        query = json.loads(query)
    # Any SUBMISSION_TIME filter already in the query is replaced
    query[SUBMISSION_TIME] = {}
    try:
        for param, operator in (('start', '$gte'), ('end', '$lte')):
            if params.get(param):
                query[SUBMISSION_TIME][operator] = to_mongo_date(
                    params[param])
    except ValueError:
        raise exceptions.ParseError(
            _("Dates must be in the format YY_MM_DD_hh_mm_ss")
        )
    return json.dumps(query)