def delete_scan_list(request: Request, token: str) -> Response:
    """Delete the scan list identified by ``token``.

    Args:
        request: The incoming request (not inspected by this function).
        token: Token of the scan list to delete.

    Returns:
        A ``Response`` with ``type: success`` once the list is deleted.

    Raises:
        ParseError: If a ``KeyError`` surfaces during lookup or deletion.
        NotFound: If no ``ScanList`` matches ``token``.
    """
    # TODO: Access control (Or is token sufficient)?
    try:
        scan_list = ScanList.objects.get(token=token)
        # all related objects CASCADE automatically.
        scan_list.delete()
    except KeyError:
        raise ParseError
    except ScanList.DoesNotExist:
        raise NotFound
    return Response({
        'type': 'success',
        'message': 'ok',
    })
# TODO: Why POST?
# TODO: Add a filter option to get_lists and get rid of this search method
# Example source code using the Python class ParseError()
def authenticate(self, request):
    """Check the registration token and sampling feature of a POST request.

    Requests using any method other than POST are ignored.  For POST
    requests the ``HTTP_TOKEN`` header must match a ``SiteRegistration`` and
    the ``sampling_feature`` field of the request data must equal that
    registration's sampling feature UUID.

    Returns:
        None in every non-raising case.

    Raises:
        exceptions.ParseError: If the token header or the
            ``sampling_feature`` field is missing.
        exceptions.PermissionDenied: If no registration matches the token.
        exceptions.AuthenticationFailed: If the sampling feature UUID does
            not belong to the registration.
    """
    if request.META['REQUEST_METHOD'] != 'POST':
        return None

    # Call form of print: valid on Python 3 and prints the same text on
    # Python 2 (single argument).
    print('Request data: {}'.format(request.data))

    if 'HTTP_TOKEN' not in request.META:
        raise exceptions.ParseError("Registration Token not present in the request.")
    elif 'sampling_feature' not in request.data:
        raise exceptions.ParseError("Sampling feature UUID not present in the request.")

    # Get auth_token(uuid) from header, get registration object with
    # auth_token, get the user from that registration, verify
    # sampling_feature uuid is registered by this user, be happy.
    token = request.META['HTTP_TOKEN']
    registration = SiteRegistration.objects.filter(registration_token=token).first()
    if not registration:
        raise exceptions.PermissionDenied('Invalid Security Token')

    # request needs to have the sampling feature uuid of the registration -
    if str(registration.sampling_feature.sampling_feature_uuid) != request.data['sampling_feature']:
        raise exceptions.AuthenticationFailed(
            'Site Identifier is not associated with this Token')  # or other related exception

    # NOTE(review): no user is returned even after a successful check, so
    # this class only rejects requests - confirm that is intended.
    return None
def parse(self, stream, media_type=None, parser_context=None):
    """
    Decode a multipart encoded form from the incoming bytestream.

    Returns a DataAndFiles object built from the two values produced by
    `DjangoMultiPartParser.parse()` (form parameters and form files).
    Raises `ParseError` when the multipart parser reports an error.
    """
    context = parser_context or {}
    request = context['request']
    charset = context.get('encoding', settings.DEFAULT_CHARSET)

    # Work on a copy of META so the override below stays local.
    environ = request.META.copy()
    environ['CONTENT_TYPE'] = media_type
    handlers = request.upload_handlers

    try:
        multipart = DjangoMultiPartParser(environ, stream, handlers, charset)
        form_data, form_files = multipart.parse()
        return DataAndFiles(form_data, form_files)
    except MultiPartParserError as exc:
        raise ParseError('Multipart form parse error - %s' % six.text_type(exc))
def contains_payment(self, price, request_headers, **kwargs):
    """Check the request headers for a redeemable payment.

    The first entry of ``self.allowed_methods`` whose ``should_redeem``
    accepts the headers is asked to redeem the payment.

    Args:
        price (int): The price the user must pay for the resource.
        request_headers (dict): Headers sent by client with their request.
        keyword args: Extra values forwarded to ``redeem_payment``.

    Returns:
        The result of ``redeem_payment`` for the matching method, or
        False if no method applies (402 initiation).

    Raises:
        ParseError: If redeeming the payment raises any exception.
    """
    for candidate in self.allowed_methods:
        if not candidate.should_redeem(request_headers):
            continue
        try:
            return candidate.redeem_payment(price, request_headers, **kwargs)
        except Exception as err:
            raise ParseError(str(err))
    return False
def get_object(self):
    """Return the profile of the user addressed by pk or username.

    A lookup value that converts to ``int`` is treated as a primary key,
    anything else as a username.  Raises ``ParseError`` when the URL
    keyword argument is missing.
    """
    lookup = self.kwargs.get(self.lookup_field, None)
    if lookup is None:
        raise ParseError(
            'Expected URL keyword argument `%s`.' % self.lookup_field
        )

    profiles = self.filter_queryset(self.get_queryset())

    try:
        user_filter = {'pk': int(lookup)}
    except (TypeError, ValueError):
        user_filter = {'username': lookup}

    # 404 when the user itself does not exist.
    user = get_object_or_404(User, **user_filter)

    # The user exists, so make sure a profile exists for them as well.
    profile = profiles.get_or_create(user=user)[0]

    # May raise a permission denied
    self.check_object_permissions(self.request, profile)
    return profile
def get_object(self):
    """Return the parent lookup result, or an ``Instance`` when a dataid is given.

    When both lookup kwargs (pk and dataid) are present, the dataid must be
    an integer string and the matching ``Instance`` of that form is
    returned instead of the object found by the parent class.

    Raises:
        ParseError: If the dataid is not a valid integer.
    """
    obj = super(DataViewSet, self).get_object()
    pk_lookup, dataid_lookup = self.lookup_fields
    pk = self.kwargs.get(pk_lookup)
    dataid = self.kwargs.get(dataid_lookup)

    if pk is not None and dataid is not None:
        try:
            int(dataid)
        except ValueError:
            # Interpolate AFTER translating so the msgid matches the catalog.
            raise ParseError(
                _(u"Invalid dataid %(dataid)s") % {'dataid': dataid})

        obj = get_object_or_404(Instance, pk=dataid, xform__pk=pk)

    return obj
def filter_queryset(self, queryset, view=None):
    """Filter the queryset by the ``tags`` query parameter and the pk kwarg.

    Args:
        queryset: Base queryset handed to the parent ``filter_queryset``.
        view: Unused; accepted for signature compatibility.

    Returns:
        The filtered queryset.  A pk equal to ``self.public_data_endpoint``
        yields the public forms queryset instead.

    Raises:
        ParseError: If pk is neither an integer nor the public endpoint.
    """
    qs = super(DataViewSet, self).filter_queryset(queryset)
    pk = self.kwargs.get(self.lookup_field)
    tags = self.request.query_params.get('tags', None)

    if tags and isinstance(tags, six.string_types):
        tags = tags.split(',')
        qs = qs.filter(tags__name__in=tags).distinct()

    if pk:
        try:
            int(pk)
        except ValueError:
            if pk == self.public_data_endpoint:
                qs = self._get_public_forms_queryset()
            else:
                # Interpolate AFTER translating so the msgid matches.
                raise ParseError(_(u"Invalid pk %(pk)s") % {'pk': pk})
        else:
            qs = self._filtered_or_shared_qs(qs, pk)

    return qs
def enketo(self, request, *args, **kwargs):
    """Respond with the Enketo edit URL for the looked-up ``Instance``.

    Raises ``ParseError`` when the lookup yields an ``XForm`` or when
    ``return_url`` is missing, and ``PermissionDenied`` when the user lacks
    ``change_xform`` on the instance's form.  An ``EnketoError`` is reported
    in the ``detail`` key of the response instead of being raised.
    """
    self.object = self.get_object()

    if isinstance(self.object, XForm):
        raise ParseError(_(u"Data id not provided."))

    payload = {}
    if isinstance(self.object, Instance):
        if not request.user.has_perm("change_xform", self.object.xform):
            raise PermissionDenied(_(u"You do not have edit permissions."))

        return_url = request.query_params.get('return_url')
        if not return_url:
            raise ParseError(_(u"return_url not provided."))

        try:
            payload["url"] = get_enketo_edit_url(
                request, self.object, return_url)
        except EnketoError as err:
            payload['detail'] = "{}".format(err)

    return Response(data=payload)
def to_representation(self, obj):
    """Serialize an ``XForm`` as the list of its matching parsed instances.

    Objects that are not ``XForm`` instances are delegated to the parent
    serializer.  For an ``XForm`` the query is restricted to the form's
    ``<username>_<id_string>`` key and merged with the JSON object given in
    the ``query`` request parameter; ``fields`` and ``sort`` are passed
    through to ``ParsedInstance.query_mongo_minimal``.

    Raises:
        ParseError: If ``query`` is not valid JSON or is not a JSON object.
    """
    request = self.context.get('request')

    if not isinstance(obj, XForm):
        return super(DataListSerializer, self).to_representation(obj)

    query_params = (request and request.query_params) or {}
    query = {
        ParsedInstance.USERFORM_ID:
        u'%s_%s' % (obj.user.username, obj.id_string)
    }

    try:
        query.update(json.loads(query_params.get('query', '{}')))
    except (TypeError, ValueError):
        # TypeError: valid JSON that is not an object (e.g. a number).
        # Interpolate AFTER translating so the msgid matches the catalog.
        raise ParseError(_("Invalid query: %(query)s")
                         % {'query': query_params.get('query')})

    query_kwargs = {
        'query': json.dumps(query),
        'fields': query_params.get('fields'),
        'sort': query_params.get('sort')
    }
    cursor = ParsedInstance.query_mongo_minimal(**query_kwargs)
    return list(cursor)
def parse(self, stream, media_type=None, parser_context=None):
    """
    Read a multipart encoded form from `stream`.

    The request is taken from `parser_context['request']`; its META is
    copied and the copy's CONTENT_TYPE is set to `media_type` before it is
    handed to `DjangoMultiPartParser`.  The parsed data and files are
    returned wrapped in a DataAndFiles object.  A `MultiPartParserError`
    is re-raised as `ParseError`.
    """
    parser_context = parser_context or {}
    request = parser_context['request']
    encoding = parser_context.get('encoding', settings.DEFAULT_CHARSET)

    meta = request.META.copy()
    meta['CONTENT_TYPE'] = media_type
    upload_handlers = request.upload_handlers

    try:
        parsed = DjangoMultiPartParser(
            meta, stream, upload_handlers, encoding
        ).parse()
        data, files = parsed
        return DataAndFiles(data, files)
    except MultiPartParserError as exc:
        raise ParseError('Multipart form parse error - %s' % six.text_type(exc))
def perform_create(self, serializer):
    """
    Save a new entry for the movie named by the URL ``pk``.

    1. Load the movie, the requesting user and the actor from the request.
    2. Reject the request unless the actor belongs to the movie.
    3. Pass the content through ``ProfanitiesFilter`` before saving.
    """
    movie_pk = self.kwargs['pk']
    movie = Movie.objects.get(pk=movie_pk)
    author = MyUser.objects.get(pk=self.request.user.id)
    movie_actors = Actor.objects.filter(movie=movie_pk)
    actor = Actor.objects.get(pk=self.request.data['actor'])

    # The chosen actor must be one of the movie's actors.
    if actor not in list(movie_actors):
        raise ParseError('?? ??? ?? ? ????.')

    # Clean the submitted content.
    content = self.request.data['content']
    clean_content = ProfanitiesFilter().clean(content)

    serializer.save(movie=movie, actor=actor, author=author, content=clean_content)
def perform_update(self, serializer):
    """
    Update the content of an existing famous line.

    1. Load the famous line (URL ``pk``), its movie's actors and the actor
       named in the request.
    2. Pass the new content through ``ProfanitiesFilter``; when that is not
       possible (for example no ``content`` was sent) keep the stored content.
    3. Reject the request unless the actor belongs to the movie.
    """
    famous_pk = self.kwargs['pk']
    famous_line = FamousLine.objects.get(pk=famous_pk)
    movie_pk = famous_line.movie.pk
    actors = Actor.objects.filter(movie=movie_pk)
    actor = Actor.objects.get(pk=self.request.data['actor'])

    # Best-effort cleaning: fall back to the existing content on failure.
    # ``Exception`` (not a bare except) so SystemExit/KeyboardInterrupt
    # are no longer swallowed.
    try:
        content = self.request.data['content']
        clean_content = ProfanitiesFilter().clean(content)
    except Exception:
        clean_content = serializer.instance.content

    # The chosen actor must be one of the movie's actors.
    if actor not in list(actors):
        raise ParseError('?? ??? ?? ? ????.')

    serializer.save(content=clean_content)
def validate_data(self, stream_data):
    """Flatten ``{template: {data: [{name, value}, ...]}}`` into a dict.

    Returns a mapping of each entry's ``name`` to its ``value``.  Raises
    ``ParseError`` when the input is not a dict, a required key is missing,
    or the nested data has the wrong type.
    """
    usage_hint = "Valid format: {template:{data:[{name: ,value: },...]}}"

    if not isinstance(stream_data, dict):
        raise ParseError(detail="Template is not a dictionary. " + usage_hint)

    try:
        return {
            entry['name']: entry['value']
            for entry in stream_data['template']['data']
        }
    except KeyError as missing:
        raise ParseError(detail=("%s field required. " % missing) + usage_hint)
    except TypeError:
        raise ParseError(detail="Invalid data provided. " + usage_hint)
def get(self, request, string, format=None):
    """
    request: GET /biocircuit/ID
    response: json api_circuit

    ``string`` must contain at least two '0'/'1' characters.  Any error
    raised while building or scoring the circuit is reported as a 400
    response with ``status: failed`` and the error text in ``detail``.
    """
    try:
        digit_count = sum(1 for c in string if c in ('0', '1'))
        if digit_count < 2:
            raise ParseError(detail="At least two digits are required.")

        expr = biocircuit.string2expr(string)
        circuit = biocircuit.create_circuit(expr)
        scores = biocircuit.circuit_score(circuit, biogate.d_gate)
        response_dict = biocircuit.api_circuit(circuit, scores)
        return Response(response_dict)
    except Exception as error:
        # ``Exception`` rather than ``BaseException`` so that SystemExit
        # and KeyboardInterrupt are not turned into a 400 response.
        response = {
            "status": "failed",
            "detail": str(error),
        }
        return Response(response, status=status.HTTP_400_BAD_REQUEST)
def get_times(request):
    """Read ``starttime`` and ``endtime`` from the request's GET parameters.

    As we use no timezone in NAV, the timezone is removed from parsed
    timestamps.  Missing or empty parameters are returned unchanged.

    :param request: django.http.HttpRequest
    :raises Iso8601ParseError: if a value is not a valid ISO 8601 timestamp
    """
    def _to_naive(raw):
        # Leave missing/empty values as they are.
        if not raw:
            return raw
        return iso8601.parse_date(raw).replace(tzinfo=None)

    raw_start = request.GET.get('starttime')
    raw_end = request.GET.get('endtime')
    try:
        return _to_naive(raw_start), _to_naive(raw_end)
    except iso8601.ParseError:
        raise Iso8601ParseError
def get_object(self, queryset=None):
    """Return the parent lookup result, or an ``Instance`` when a dataid is given.

    Args:
        queryset: Optional queryset forwarded to the parent ``get_object``.

    When both lookup kwargs (pk and dataid) are present, the dataid must be
    an integer string and the matching ``Instance`` of that form is
    returned instead of the object found by the parent class.

    Raises:
        ParseError: If the dataid is not a valid integer.
    """
    obj = super(DataViewSet, self).get_object(queryset)
    pk_lookup, dataid_lookup = self.lookup_fields
    pk = self.kwargs.get(pk_lookup)
    dataid = self.kwargs.get(dataid_lookup)

    if pk is not None and dataid is not None:
        try:
            int(dataid)
        except ValueError:
            # Interpolate AFTER translating so the msgid matches the catalog.
            raise ParseError(
                _(u"Invalid dataid %(dataid)s") % {'dataid': dataid})

        obj = get_object_or_404(Instance, pk=dataid, xform__pk=pk)

    return obj
def filter_queryset(self, queryset, view=None):
    """Filter the queryset by the ``tags`` query parameter and the pk kwarg.

    Args:
        queryset: Base queryset handed to the parent ``filter_queryset``.
        view: Unused; accepted for signature compatibility.

    Returns:
        The filtered queryset.  A pk equal to ``self.public_data_endpoint``
        yields the public forms queryset instead.

    Raises:
        ParseError: If pk is neither an integer nor the public endpoint.
    """
    qs = super(DataViewSet, self).filter_queryset(queryset)
    pk = self.kwargs.get(self.lookup_field)
    tags = self.request.QUERY_PARAMS.get('tags', None)

    if tags and isinstance(tags, six.string_types):
        tags = tags.split(',')
        qs = qs.filter(tags__name__in=tags).distinct()

    if pk:
        try:
            int(pk)
        except ValueError:
            if pk == self.public_data_endpoint:
                qs = self._get_public_forms_queryset()
            else:
                # Interpolate AFTER translating so the msgid matches.
                raise ParseError(_(u"Invalid pk %(pk)s") % {'pk': pk})
        else:
            qs = self._filtered_or_shared_qs(qs, pk)

    return qs
def enketo(self, request, *args, **kwargs):
    """Build the Enketo edit URL response for the looked-up object.

    An ``XForm`` lookup is rejected with ``ParseError`` (a data id is
    required).  For an ``Instance`` the user needs ``change_xform`` on its
    form and a ``return_url`` query parameter must be present; an
    ``EnketoError`` ends up in the ``detail`` key of the response.
    """
    self.object = self.get_object()
    target = self.object

    if isinstance(target, XForm):
        raise ParseError(_(u"Data id not provided."))

    result = {}
    if isinstance(target, Instance):
        can_edit = request.user.has_perm("change_xform", target.xform)
        if not can_edit:
            raise PermissionDenied(_(u"You do not have edit permissions."))

        return_url = request.QUERY_PARAMS.get('return_url')
        if not return_url:
            raise ParseError(_(u"return_url not provided."))

        try:
            result["url"] = get_enketo_edit_url(request, target, return_url)
        except EnketoError as err:
            result['detail'] = "{}".format(err)

    return Response(data=result)
def to_native(self, obj):
    """Return statistics for ``obj`` as chosen by the request parameters.

    The ``method`` query parameter selects an entry of ``STATS_FUNCTIONS``
    (case-insensitively; ``get_all_stats`` when absent or unknown) and
    ``field`` names the field to compute it for.  ``None`` is delegated to
    the parent serializer.

    Raises:
        exceptions.ParseError: If ``field`` is not a key of the object's
            data dictionary, or the stats function raises ``ValueError``.
    """
    if obj is None:
        return super(StatsInstanceSerializer, self).to_native(obj)

    request = self.context.get('request')
    method = request.QUERY_PARAMS.get('method', None)
    field = request.QUERY_PARAMS.get('field', None)

    if field and field not in obj.data_dictionary().get_keys():
        raise exceptions.ParseError(detail=_("Field not in XForm."))

    stats_function = STATS_FUNCTIONS.get(method and method.lower(),
                                         get_all_stats)

    try:
        data = stats_function(obj, field)
    except ValueError as e:
        # ``e.message`` exists only on Python 2; ``str(e)`` works on both.
        raise exceptions.ParseError(detail=str(e))

    return data
def get_queryset(self):
    """Return the actions matching the ``start``/``end`` GET period.

    Raises ``ParseError`` when either bound is missing or cannot be parsed.
    An end that lies before the start yields an empty queryset.
    """
    raw_start = self.request.GET.get('start')
    raw_end = self.request.GET.get('end')
    if not (raw_start and raw_end):
        raise ParseError("Period frame missing")

    actions = self.model.objects.all()
    actions = self._apply_in_action_type_lookup(actions)
    actions = self._apply_in_charge_lookup(actions)

    try:
        first_day = self._parse_date(raw_start)
        last_day = self._parse_date(raw_end)
    except ValueError:
        raise ParseError("Invalid period frame")

    # Widen the dates to cover the whole first and last day.
    period_start = datetime.combine(first_day, time.min)
    period_end = datetime.combine(last_day, time.max)

    if end_before_start(period_start, period_end) if False else period_end < period_start:
        return self.model.objects.none()

    # starts before, ends after period
    spans_period = Q(planned_date__lte=period_start, end_datetime__gte=period_end)
    # starts and ends during period
    within_period = Q(planned_date__gte=period_start, end_datetime__lte=period_end)
    # starts before, ends during
    running_at_start = Q(planned_date__lte=period_start, end_datetime__gte=period_start)
    # starts during period, ends after
    running_at_end = Q(planned_date__lte=period_end, end_datetime__gte=period_end)
    # no end, starts during period
    open_ended = Q(
        planned_date__gte=period_start,
        end_datetime__isnull=True,
        planned_date__lte=period_end
    )

    return actions.filter(
        spans_period | within_period | running_at_start | running_at_end | open_ended
    )
def check_data(self, request):
    """check and prepare data

    Stores the ``contacts`` list from the request data on ``self.contacts``
    and, when both ``type`` and ``status`` ids are given, verifies that the
    status is one of the type's allowed statuses.

    Raises:
        ParseError: If an id is invalid or unknown, or the status is not
            allowed for the type.
    """
    self.contacts = request.data.get("contacts", [])

    action_type_id = request.data.get('type', None)
    action_status_id = request.data.get('status', None)

    if action_type_id and action_status_id:
        try:
            action_type = ActionType.objects.get(id=action_type_id)
            action_status = ActionStatus.objects.get(id=action_status_id)
            if action_status not in action_type.allowed_status.all():
                # Status name first, then type name, to match the wording.
                raise ParseError(
                    "status {0} is not allowed for type {1}".format(action_status.name, action_type.name)
                )
        except (ValueError, ActionType.DoesNotExist, ActionStatus.DoesNotExist):
            raise ParseError("Invalid parameters")
def save_object(self, serializer):
    """save object

    Resolves every id in ``self.contacts`` to a ``Contact`` first, then
    saves the serializer and replaces the object's contacts with them.
    Raises ``ParseError`` when an id is invalid or unknown.
    """
    try:
        resolved = [
            Contact.objects.get(id=contact_id)
            for contact_id in self.contacts
        ]
    except (ValueError, Contact.DoesNotExist):
        raise ParseError("Invalid contacts")

    instance = serializer.save()

    # Replace whatever contacts were attached before.
    instance.contacts.clear()
    for contact in resolved:
        instance.contacts.add(contact)

    instance.save()
    return instance
def update_scan_list(request: Request, scan_list_id: int) -> Response:
    """Update an existing list.

    The request data must contain ``token``, ``listname``, ``description``,
    ``isprivate``, ``tags`` and ``columns``.

    Args:
        request: Request carrying the new values in ``request.data``.
        scan_list_id: Primary key of the list to update.

    Returns:
        A ``Response`` with ``type: success`` once the list is saved.

    Raises:
        ParseError: If a required key is missing (any ``KeyError``).
        NotFound: If no list matches the id and token.
    """
    try:
        # TODO: Check if list is editable (and by current user)
        scan_list = ScanList.objects.get(pk=scan_list_id,
                                         token=request.data['token'])

        # Read every required field before changing anything, so a missing
        # key cannot leave the list half-updated (e.g. tags saved but
        # columns missing).
        name = request.data['listname']
        description = request.data['description']
        private = request.data['isprivate']
        tags = request.data['tags']
        columns = request.data['columns']

        scan_list.name = name
        scan_list.description = description
        scan_list.private = private

        # save tags
        scan_list.save_tags(tags)

        # save columns
        scan_list.save_columns(columns)

        scan_list.save()
    except KeyError:
        raise ParseError
    except ScanList.DoesNotExist:
        raise NotFound

    return Response({
        'type': 'success',
        'message': 'ok',
    })
def _extract_date(self):
"""
Extract date from request.
In detail route extract it from pk and it list
from query params.
"""
pk = self.request.parser_context['kwargs'].get('pk')
# detail case
if pk is not None:
try:
return datetime.datetime.strptime(
pk.split('_')[1], '%Y-%m-%d'
)
except (ValueError, TypeError, IndexError):
raise exceptions.NotFound()
# list case
query_params = self.request.query_params
try:
return datetime.datetime.strptime(
query_params.get('date'), '%Y-%m-%d'
).date()
except ValueError:
raise exceptions.ParseError(_('Date is invalid'))
except TypeError:
if query_params.get('last_reported_date', '0') == '0':
raise exceptions.ParseError(_('Date filter needs to be set'))
return None
def _extract_date(self):
"""
Extract date from request.
In detail route extract it from pk and it list
from query params.
"""
pk = self.request.parser_context['kwargs'].get('pk')
# detail case
if pk is not None:
try:
return datetime.datetime.strptime(
pk.split('_')[2], '%Y-%m-%d'
)
except (ValueError, TypeError, IndexError):
raise exceptions.NotFound()
# list case
try:
return datetime.datetime.strptime(
self.request.query_params.get('date'),
'%Y-%m-%d'
).date()
except ValueError:
raise exceptions.ParseError(_('Date is invalid'))
except TypeError:
raise exceptions.ParseError(_('Date filter needs to be set'))
def _extract_user(self):
"""
Extract user from request.
In detail route extract it from pk and it list
from query params.
"""
pk = self.request.parser_context['kwargs'].get('pk')
# detail case
if pk is not None:
try:
user_id = int(pk.split('_')[0])
# avoid query if user is self
if self.request.user.id == user_id:
return self.request.user
return get_user_model().objects.get(pk=pk.split('_')[0])
except (ValueError, get_user_model().DoesNotExist):
raise exceptions.NotFound()
# list case
try:
user_id = self.request.query_params.get('user')
if user_id is None:
raise exceptions.ParseError(_('User filter needs to be set'))
# avoid query if user is self
if self.request.user.id == int(user_id):
return self.request.user
return get_user_model().objects.get(pk=user_id)
except (ValueError, get_user_model().DoesNotExist):
raise exceptions.ParseError(_('User is invalid'))
def parse(self, stream, media_type=None, parser_context=None):
    """
    Decode the incoming bytestream and load it as JSON.

    The encoding is taken from `parser_context['encoding']`, falling back
    to `settings.DEFAULT_CHARSET`.  A `ValueError` while decoding or
    loading is re-raised as `ParseError`.
    """
    context = parser_context or {}
    charset = context.get('encoding', settings.DEFAULT_CHARSET)

    try:
        return json.loads(stream.read().decode(charset))
    except ValueError as exc:
        raise ParseError('JSON parse error - %s' % six.text_type(exc))
def get_queryset(self):
    """Return online events filtered by the ``since``/``until`` parameters.

    Both parameters use the ``%Y-%m-%d`` format.  When neither is given the
    window defaults to today through seven days from now.

    * only ``since``: events running at ``since`` or beginning after it;
    * only ``until``: events running at ``until`` or ending before it;
    * both: events overlapping the ``since``..``until`` window.

    Raises:
        ParseError: If a supplied date does not match the expected format.
    """
    date_format = '%Y-%m-%d'
    queryset = Event.objects.get_online()

    since = self.request.query_params.get('since', None)
    until = self.request.query_params.get('until', None)

    if since is None and until is None:
        # Default window: events for the next seven days.
        now = datetime.datetime.now()
        since = now.strftime(date_format)
        until = (now + datetime.timedelta(days=7)).strftime(date_format)

    if since is not None and until is None:
        try:
            since_date = datetime.datetime.strptime(since, date_format)
        except ValueError:
            raise ParseError("Bad format for since parameter. Accepted format : %Y-%m-%d.")
        # The first clause previously tested begins_at twice, which dropped
        # events already running at ``since``; it now checks ends_at like
        # the other branches do.
        queryset = queryset.filter(
            (Q(begins_at__lte=since_date) & Q(ends_at__gte=since_date)) |
            (Q(begins_at__gte=since_date))
        )
    elif since is None and until is not None:
        try:
            until_date = datetime.datetime.strptime(until, date_format)
        except ValueError:
            raise ParseError("Bad format for until parameter. Accepted format : %Y-%m-%d.")
        queryset = queryset.filter(
            (Q(begins_at__lte=until_date) & Q(ends_at__gte=until_date)) |
            (Q(ends_at__lte=until_date))
        )
    else:
        try:
            since_date = datetime.datetime.strptime(since, date_format)
            until_date = datetime.datetime.strptime(until, date_format)
        except ValueError:
            raise ParseError("Bad format for since/until parameters. Accepted format : %Y-%m-%d.")
        queryset = queryset.filter(
            (Q(begins_at__gte=since_date) & Q(begins_at__lte=until_date)) |
            (Q(ends_at__gte=since_date) & Q(ends_at__lte=until_date)) |
            (Q(begins_at__lte=since_date) & Q(ends_at__gte=until_date))
        )

    return queryset
def destroy(self, request, *args, **kwargs):
    """Delete the looked-up ``Instance`` and answer with 204.

    Raises ``ParseError`` when the lookup yields an ``XForm`` (no data id
    was given) and ``PermissionDenied`` when the user lacks
    ``delete_xform`` on the instance's form.
    """
    self.object = self.get_object()
    target = self.object

    if isinstance(target, XForm):
        raise ParseError(_(u"Data id not provided."))

    if isinstance(target, Instance):
        if not request.user.has_perm("delete_xform", target.xform):
            raise PermissionDenied(_(u"You do not have delete "
                                     u"permissions."))
        target.delete()

    return Response(status=status.HTTP_204_NO_CONTENT)
def _get_export_type(export_type):
    """Map ``export_type`` to its entry in ``EXPORT_EXT``.

    Raises:
        exceptions.ParseError: If ``export_type`` is not a key of
            ``EXPORT_EXT``.
    """
    if export_type not in EXPORT_EXT:
        # Interpolate AFTER translating so the msgid matches the catalog.
        raise exceptions.ParseError(
            _(u"'%(export_type)s' format not known or not implemented!") %
            {'export_type': export_type}
        )

    return EXPORT_EXT[export_type]