def suggest(request):
    """Suggest gene names based on the input text.

    The tileset uuid is read from the ``d`` query parameter and the text to
    complete from the ``ac`` query parameter.

    Raises:
        rfe.NotFound: if no tileset with the given uuid exists.
    """
    params = request.GET
    # suggestions are taken from a gene annotations tileset
    source_uuid = params['d']
    query_text = params['ac']

    try:
        source = tm.Tileset.objects.get(uuid=source_uuid)
    except ObjectDoesNotExist:
        raise rfe.NotFound('Suggestion source file not found')

    datapath = tut.get_datapath(source.datafile.url)
    suggestions = tsu.get_gene_suggestions(datapath, query_text)
    return JsonResponse(suggestions, safe=False)
# Example source code using the Python class NotFound()
def delete_scan_list(request: Request, token: str) -> Response:
    """Delete the scan list identified by ``token``.

    Args:
        request: The incoming request (not inspected here).
        token: Token of the scan list to delete.

    Returns:
        A response with a ``success`` payload once the list is deleted.

    Raises:
        ParseError: if a ``KeyError`` occurs during lookup or deletion.
        NotFound: if no scan list with this token exists.
    """
    # TODO: Access control (Or is token sufficient)?
    try:
        scan_list = ScanList.objects.get(token=token)
        # all related objects CASCADE automatically.
        scan_list.delete()
    except KeyError:
        raise ParseError
    except ScanList.DoesNotExist:
        raise NotFound

    return Response({
        'type': 'success',
        'message': 'ok',
    })
# TODO: Why POST?
# TODO: Add a filter option to get_lists and get rid of this search method
def create(self, validated_data):
    """Trigger an existing notification log for a recipient and return it.

    The log is looked up by the ``log_id`` URL kwarg of the view found in
    the serializer context, restricted to notifications of the requesting
    user's company.

    Args:
        validated_data: Validated serializer data; ``recipient`` is read
            from it (``None`` when absent).

    Returns:
        The ``NotificationLog`` that was triggered.

    Raises:
        exceptions.NotFound: if no matching log exists.
        serializers.ValidationError: if triggering the log fails.
    """
    company = self.context['request'].user.company
    log_id = self.context.get('view').kwargs.get('log_id')
    recipient = validated_data.get('recipient')

    try:
        log = NotificationLog.objects.get(notification__company=company,
                                          id=log_id)
    except NotificationLog.DoesNotExist:
        raise exceptions.NotFound()

    try:
        log.trigger(recipient=recipient)
    except Exception as exc:
        # Keep the generic client-facing message, but chain the original
        # error so the real cause stays visible in tracebacks.
        raise serializers.ValidationError(
            {"non_field_errors": ["Internal server error."]}) from exc
    return log
def get_build(request, package_name, build_number, version=None, format=None):
    """Return one build of a package owned by the requesting user.

    The package is resolved by name and account (404 via
    ``get_object_or_404`` when missing); the build is then selected by
    ``version`` and ``build_number``. The response is the serialized build
    merged with the build's ``extra`` mapping.

    Raises:
        exceptions.NotFound: if the package has no such build.
    """
    package = get_object_or_404(
        Package, name=package_name, account=request.user
    )
    builds_for_version = package.builds.filter(version=version)

    try:
        build = builds_for_version.get(build_number=build_number)
    except Build.DoesNotExist:
        raise exceptions.NotFound("Couldn't find a build for this package name")

    # Serialize the build and layer its extra fields on top
    payload = BuildSerializer(build).data.copy()
    payload.update(build.extra)
    return Response(payload)
def create(self, request, *args, **kwargs):
    """Toggle the requesting user's like on a famous line.

    A POST while no like exists stores one and answers ``201 Created``.
    A POST while a like already exists deletes it and answers with
    ``HTTP_306_RESERVED``.

    Raises:
        NotFound: if the famous line named by ``kwargs['pk']`` cannot be
            looked up.
    """
    try:
        famous_line = FamousLine.objects.get(pk=kwargs['pk'])
    except (KeyError, TypeError, ValueError, FamousLine.DoesNotExist) as exc:
        # Only lookup failures become a 404; anything else (interpreter
        # exit signals, database outages, ...) must propagate.
        raise NotFound('???? ???? ????.') from exc

    serializer = self.get_serializer(data=request.data)
    serializer.is_valid(raise_exception=True)

    # An existing like means this request is an "unlike"
    famous_like_exist = FamousLike.objects.filter(user=request.user, famous_line=famous_line)
    if famous_like_exist.exists():
        famous_like_exist.delete()
        return Response(serializer.errors, status=status.HTTP_306_RESERVED)

    serializer.save(famous_line=famous_line, user=request.user)
    headers = self.get_success_headers(serializer.data)
    return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
def create(self, request, *args, **kwargs):
    """Toggle the requesting user's like on a comment.

    A POST while no like exists stores one and answers ``201 Created``.
    A POST while a like already exists deletes it and answers with
    ``HTTP_306_RESERVED``.

    Raises:
        NotFound: if the comment named by ``kwargs['pk']`` cannot be
            looked up.
    """
    try:
        comment = Comment.objects.get(pk=kwargs['pk'])
    except (KeyError, TypeError, ValueError, Comment.DoesNotExist) as exc:
        # Only lookup failures become a 404; anything else (interpreter
        # exit signals, database outages, ...) must propagate.
        raise NotFound('??? ???? ????.') from exc

    serializer = self.get_serializer(data=request.data)
    serializer.is_valid(raise_exception=True)

    # An existing like means this request is an "unlike"
    comment_like_exist = CommentLike.objects.filter(user=request.user, comment=comment)
    if comment_like_exist.exists():
        comment_like_exist.delete()
        return Response(serializer.errors, status=status.HTTP_306_RESERVED)

    serializer.save(comment=comment, user=request.user)
    headers = self.get_success_headers(serializer.data)
    return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
def experiment_by_id(request, exp_id):
    """Return a summary of the visible dataset with id ``exp_id``.

    Raises:
        NotFound: if no visible dataset has the given id.
    """
    try:
        experiment = DataSet.objects.visible().get(id=exp_id)
    except DataSet.DoesNotExist:
        raise NotFound('No experiment with given id found.')

    return Response(dict(
        id=experiment.id,
        name=experiment.name,
        description='',  # TODO
        authors=[],
        populations=experiment.get_populations(),
        subgroups=experiment.get_subgroups(),
        metrics=experiment.get_metrics(),
    ))
def modify_certificates(self, request, pk=None):
    """Add a certificate to, or remove one from, the current calendar.

    The certificate id is read from the ``cert_id`` query parameter.
    ``POST`` adds the certificate, ``DELETE`` removes it; any other method
    leaves the calendar's certificates untouched. In every non-error case
    the cached recommendations for the calendar are cleared.

    Raises:
        NotFound: if the certificate does not exist.
        ContentError: on ``POST`` if the certificate is already selected,
            or on ``DELETE`` if it is not part of this calendar.
    """
    calendar = self.get_object()
    certificate_id = request.query_params['cert_id']
    try:
        certificate = Certificate.objects.get(id=certificate_id)
    except Certificate.DoesNotExist:
        raise NotFound('Certificate "%s" not found.' % certificate_id)

    already_selected = calendar.certificates.filter(id=certificate.id).exists
    if request.method == 'POST':
        # check if already there, and if so, raise 409
        if already_selected():
            raise ContentError('Certificate "%s" already selected.' % certificate_id)
        calendar.certificates.add(certificate)
    elif request.method == 'DELETE':
        if not already_selected():
            # Report the certificate id; the previous undefined name made
            # this path fail with NameError instead of ContentError.
            raise ContentError('Certificate "%s" not in this calendar.' % certificate_id)
        calendar.certificates.remove(certificate)

    genie.clear_cached_recommendations(calendar.profile_id, calendar.pk)
    return Response({'success': True})
def post(self, request, username=None):
    """Make the requesting user's profile follow the profile of ``username``.

    Returns:
        ``201 Created`` with the serialized followee profile.

    Raises:
        NotFound: if no profile exists for ``username``.
        serializers.ValidationError: if the user tries to follow themself.
    """
    follower = self.request.user.profile

    try:
        followee = Profile.objects.get(user__username=username)
    except Profile.DoesNotExist:
        raise NotFound('A profile with this username was not found.')

    # Compare primary keys by value: ``is`` tests object identity, which
    # is not guaranteed to hold for two equal key values.
    if follower.pk == followee.pk:
        raise serializers.ValidationError('You can not follow yourself.')

    follower.follow(followee)

    serializer = self.serializer_class(followee, context={
        'request': request
    })
    return Response(serializer.data, status=status.HTTP_201_CREATED)
def update(self, request, slug):
    """Partially update the article identified by ``slug``.

    The new field values are taken from the ``article`` key of the request
    data (an empty mapping when absent).

    Raises:
        NotFound: if no article with this slug is in the queryset.
    """
    try:
        article = self.queryset.get(slug=slug)
    except Article.DoesNotExist:
        raise NotFound('An article with this slug does not exist.')

    changes = request.data.get('article', {})
    serializer = self.serializer_class(
        article,
        context={'request': request},
        data=changes,
        partial=True,
    )
    serializer.is_valid(raise_exception=True)
    serializer.save()

    return Response(serializer.data, status=status.HTTP_200_OK)
def post(self, request, join_id):
    """Add a new player to the game identified by ``join_id``.

    The game must still be in the ``'starting'`` state. Once the number of
    players reaches ``game.num_players`` the game moves to ``'pick_wc'``
    and a new round is started.

    Raises:
        NotFound: if no game has this ``join_id``.
        ValidationError: if the game is no longer in the starting state.
    """
    try:
        game = Game.objects.get(join_id=join_id)
    except Game.DoesNotExist:
        raise NotFound("Invalid join_id",404)

    if game.game_state != 'starting':
        raise ValidationError("You can no longer join this game")

    payload = request.data
    enforce_required_params(['name'], payload)
    new_player = Player.objects.create(name=payload['name'], game=game)

    game_is_full = game.num_players == game.players.count()
    if game_is_full:
        game.game_state = 'pick_wc'
        game.save()
        game.start_new_round()

    return Response({
        'player': new_player.get_dict(show_private=True),
        'game': game.get_dict(),
    })
def history(self, request, data, error_message, filter_lookup, order_function, *args, **kwargs):
    """Send a neighbouring diff version of a composition to the reply channel.

    ``filter_lookup`` is the queryset lookup applied to the validated
    ``diff_composition_version`` value, and ``order_function`` names the
    queryset method called to pick the resulting version.

    Raises:
        PermissionDenied: if the channel session carries no ``user``.
        NotFound: with ``error_message`` if no version is selected.
    """
    if request.channel_session.get('user') is None:
        raise PermissionDenied

    composition_id = int(kwargs.get('composition_id'))
    serializer = DiffHistorySerializer(
        data=data,
        context={'composition_id': composition_id},
    )
    serializer.is_valid(raise_exception=True)
    reference_id = serializer.data['diff_composition_version']

    candidates = DiffCompositionVersion.objects.filter(
        composition_id=composition_id,
        **{filter_lookup: reference_id}
    )
    # order_function names a queryset method; call it to pick one version
    selected = getattr(candidates, order_function)()
    if selected is None:
        raise NotFound(error_message)

    self.route_send(
        request.reply_channel,
        DiffCompositionVersionSerializer(selected).data,
        status.HTTP_200_OK,
    )
def retrieve(self, request, key=None, *args, **kwargs):
    """Return the serialized location, or redirect when it is a redirect.

    Three query flags are recognised by presence alone: ``show_redirects``,
    ``detailed`` and ``geometry``. A ``LocationRedirect`` is answered with
    an HTTP redirect to its target's slug unless ``show_redirects`` is set.

    Raises:
        NotFound: if ``self.get_object()`` returns ``None``.
    """
    query = request.GET
    follow_redirects = 'show_redirects' not in query
    want_detail = 'detailed' in query
    want_geometry = 'geometry' in query

    location = self.get_object()
    if location is None:
        raise NotFound

    if isinstance(location, LocationRedirect) and follow_redirects:
        # todo: why does redirect/reverse not work here?
        return redirect('../' + str(location.target.slug))

    serialized = location.serialize(
        include_type=True,
        detailed=want_detail,
        geometry=want_geometry,
        simple_geometry=True,
    )
    return Response(serialized)
def list(self, request, *args, **kwargs):
    """Return the serialized columns of a table.

    ``database`` and ``table`` are read from the query string; repeated
    ``column`` parameters optionally restrict the result to those names.

    Raises:
        NotFound: if ``get_columns`` yields nothing for the request.
    """
    # get database and table from the querystring
    params = self.request.GET
    database_name = params.get('database')
    table_name = params.get('table')
    wanted_names = params.getlist('column')

    # get the columns using the utils function
    columns = get_columns(self.request.user, database_name, table_name)

    # if nothing worked, return 404
    if not columns:
        raise NotFound()

    if wanted_names:
        columns = [entry for entry in columns if entry['name'] in wanted_names]
    return Response(ColumnSerializer(columns, many=True).data)
def retrieve_organization_scan_config(request, pk=None):
    """
    Retrieve the ScanConfig associated with the referenced organization.

    Superusers may look up any organization; other users only those
    reachable through an auth group named "org_read" that they belong to.

    :param request: The request received by this API handler.
    :param pk: The primary key of the organization to retrieve the ScanConfig for.
    :return: A response containing the ScanConfig associated with the given Organization.
    :raises NotFound: If the organization is not found or has no ScanConfig.
    """
    organizations = rest.models.Organization.objects
    if not request.user.is_superuser:
        organizations = organizations.filter(
            auth_groups__users=request.user,
            auth_groups__name="org_read",
        )

    try:
        organization = organizations.get(pk=pk)
    except rest.models.Organization.DoesNotExist:
        raise NotFound()

    scan_config = organization.scan_config
    if not scan_config:
        raise NotFound()

    serialized = rest.serializers.ScanConfigSerializer(scan_config)
    return Response(serialized.data)
def check_scan_config_validity(request, pk=None):
    """
    Check to see if a given ScanConfig is valid for an order to be placed with
    it.

    Superusers may reference any ScanConfig; other users only their own
    configs or ones flagged as default.

    :param request: The request that invoked this handler.
    :param pk: The primary key of the ScanConfig to check.
    :return: An HTTP response.
    :raises NotFound: If no accessible ScanConfig has the given primary key.
    """
    scan_configs = rest.models.ScanConfig.objects
    if not request.user.is_superuser:
        owned_or_default = Q(user=request.user) | Q(is_default=True)
        scan_configs = scan_configs.filter(owned_or_default)

    try:
        scan_config = scan_configs.get(pk=pk)
    except rest.models.ScanConfig.DoesNotExist:
        raise NotFound()

    return rest.responses.WsScanConfigValidityResponse(scan_config=scan_config, status=200)
def try_to_get(obj, pk):
    """
    Fetch a model instance by its primary key, raising a 404 error when
    it does not exist.

    Method version : ``obj`` is a ``SigmaViewSet``; the queryset declared on
    its class is used.
    Static version : ``obj`` is the queryset to search.
    """
    if isinstance(obj, SigmaViewSet):
        # Re-enter with the viewset class' queryset as the lookup source
        return SigmaViewSet.try_to_get(obj.__class__.queryset, pk)

    try:
        return obj.get(pk=pk)
    except obj.model.DoesNotExist:
        raise NotFound()
def get_object(self):
    """Return the record of ``self.zone`` whose id equals the ``record_id`` kwarg.

    Raises:
        NotFound: if the zone has no record with that id.
    """
    records = self.zone.records
    wanted_id = self.kwargs['record_id']

    match = next((entry for entry in records if entry.id == wanted_id), None)
    if match is None:
        raise NotFound(detail='Record not found.')
    return match
def get(self, request, format=None, *args, **kwargs):
    """Return the GeoJSON FeatureCollection for one neighborhood.

    The neighborhood UUID is read from the ``neighborhood`` kwarg. An empty
    object is returned when the UUID is missing or the query yields no row.

    Raises:
        NotFound: if the database rejects the UUID with a ``DataError``.
    """
    query = """
SELECT row_to_json(fc)
FROM (
SELECT 'FeatureCollection' AS type,
array_to_json(array_agg(f)) AS features
FROM (SELECT 'Feature' AS type,
ST_AsGeoJSON(g.geom_simple)::json AS geometry,
g.uuid AS id,
row_to_json((SELECT p FROM (
SELECT uuid AS id, name, label, state_abbrev, organization_id) AS p))
AS properties
FROM pfb_analysis_neighborhood AS g WHERE g.uuid = %s) AS f) AS fc;
"""
    # get the neighborhood ID from the request
    neighborhood_uuid = kwargs.get('neighborhood', '')
    if not neighborhood_uuid:
        return Response({})

    try:
        with connection.cursor() as cursor:
            cursor.execute(query, [neighborhood_uuid])
            row = cursor.fetchone()
    except DataError:
        raise NotFound(detail='{} is not a valid neighborhood UUID.'.format(neighborhood_uuid))

    if not row:
        return Response({})
    return Response(row[0])
def scan_result(request: Request, scan_id: int) -> Response:
    """Get a scan result by its id.

    Raises:
        NotFound: if the scan does not exist, or (with the detail
            ``'scan not finished'``) if the scan has no result.
    """
    try:
        scan = Scan.objects.get(pk=scan_id)
        payload = scan.result.result
    except Scan.DoesNotExist:
        raise NotFound
    except ScanResult.DoesNotExist:
        raise NotFound('scan not finished')
    return Response(payload)
def not_found(request):
    """Unconditionally raise ``NotFound`` with the detail ``'NotFound'``."""
    error = NotFound('NotFound')
    raise error
def get_game(self):
    """Return the game whose primary key is the ``game_pk`` URL kwarg.

    Raises:
        NotFound: if no such game exists.
    """
    game_pk = self.kwargs['game_pk']
    try:
        game = Game.objects.get(pk=game_pk)
    except Game.DoesNotExist:
        raise NotFound('game not found')
    return game
def get_round(self):
    """Return the round numbered ``round_number`` of the current game.

    The game comes from ``self.get_game()``; the round number from the
    ``round_number`` URL kwarg.

    Raises:
        NotFound: if the game or the round does not exist.
    """
    try:
        lookup = {
            'game': self.get_game(),
            'number': self.kwargs['round_number'],
        }
        return Round.objects.get(**lookup)
    except Game.DoesNotExist:
        raise NotFound('game not found')
    except Round.DoesNotExist:
        raise NotFound('round not found')
def get_object(self):
    """Return the play for the game and round number given in the URL kwargs.

    Raises:
        NotFound: if no matching play exists.
    """
    lookup = {
        'game': self.kwargs['game_pk'],
        'round__number': self.kwargs['round_number'],
    }
    try:
        play = Play.objects.get(**lookup)
    except Play.DoesNotExist:
        raise NotFound('play not found')
    return play
def get_object(self):
    """Return the player for the game and player number given in the URL kwargs.

    Raises:
        NotFound: with the detail ``'player not found'`` if no matching
            player exists.
    """
    game_pk = self.kwargs['game_pk']
    number = self.kwargs['player_number']
    try:
        # Query the manager directly: get_object_or_404 raises Http404 and
        # never Player.DoesNotExist, so the handler below could not run.
        return Player.objects.get(game=game_pk, number=number)
    except Player.DoesNotExist:
        raise NotFound('player not found')
def has_permission(self, request, view):
    """Grant access when the requesting user has a player in the view's game.

    Returns the matching ``Game`` instance itself rather than a boolean.

    Raises:
        NotFound: if no game with the ``game_pk`` kwarg has the requesting
            user among its players.
    """
    game_pk = view.kwargs['game_pk']
    try:
        game = Game.objects.get(pk=game_pk, players__user=request.user)
    except Game.DoesNotExist:
        raise NotFound('game not found')
    return game
def _extract_date(self):
    """
    Extract date from request.

    In a detail route the date is the second ``_``-separated part of the
    pk and is returned as a ``datetime``. In a list route it is the
    ``date`` query parameter and is returned as a ``date``; ``None`` is
    returned when ``date`` is absent but ``last_reported_date`` is set to
    something other than ``'0'``.
    """
    date_format = '%Y-%m-%d'
    pk = self.request.parser_context['kwargs'].get('pk')

    # detail case
    if pk is not None:
        try:
            date_part = pk.split('_')[1]
            return datetime.datetime.strptime(date_part, date_format)
        except (ValueError, TypeError, IndexError):
            raise exceptions.NotFound()

    # list case
    params = self.request.query_params
    try:
        parsed = datetime.datetime.strptime(params.get('date'), date_format)
    except ValueError:
        raise exceptions.ParseError(_('Date is invalid'))
    except TypeError:
        # strptime rejects None, i.e. no date filter was supplied
        if params.get('last_reported_date', '0') != '0':
            return None
        raise exceptions.ParseError(_('Date filter needs to be set'))
    return parsed.date()
def _extract_date(self):
    """
    Extract date from request.

    In a detail route the date is the third ``_``-separated part of the pk
    and is returned as a ``datetime``. In a list route it is the ``date``
    query parameter and is returned as a ``date``.
    """
    date_format = '%Y-%m-%d'
    pk = self.request.parser_context['kwargs'].get('pk')

    # detail case
    if pk is not None:
        try:
            date_part = pk.split('_')[2]
            return datetime.datetime.strptime(date_part, date_format)
        except (ValueError, TypeError, IndexError):
            raise exceptions.NotFound()

    # list case
    try:
        parsed = datetime.datetime.strptime(
            self.request.query_params.get('date'), date_format
        )
    except ValueError:
        raise exceptions.ParseError(_('Date is invalid'))
    except TypeError:
        # strptime rejects None, i.e. no date filter was supplied
        raise exceptions.ParseError(_('Date filter needs to be set'))
    return parsed.date()
def _extract_user(self):
    """
    Extract user from request.

    In a detail route the user id is the first ``_``-separated part of the
    pk; in a list route it is the ``user`` query parameter. When the id is
    the requesting user's own, that user object is returned without a
    database query.
    """
    request = self.request
    pk = request.parser_context['kwargs'].get('pk')

    # detail case
    if pk is not None:
        try:
            raw_id = pk.split('_')[0]
            user_id = int(raw_id)
            # avoid query if user is self
            if request.user.id == user_id:
                return request.user
            return get_user_model().objects.get(pk=raw_id)
        except (ValueError, get_user_model().DoesNotExist):
            raise exceptions.NotFound()

    # list case
    try:
        user_id = request.query_params.get('user')
        if user_id is None:
            raise exceptions.ParseError(_('User filter needs to be set'))

        # avoid query if user is self
        if request.user.id == int(user_id):
            return request.user
        return get_user_model().objects.get(pk=user_id)
    except (ValueError, get_user_model().DoesNotExist):
        raise exceptions.ParseError(_('User is invalid'))
def determine_version(self, request, *args, **kwargs):
    """Return the API version named in the URL kwargs.

    The version is read from the kwarg named ``self.version_param`` and
    falls back to ``self.default_version`` when that kwarg is absent.

    Raises:
        exceptions.NotFound: if ``self.is_allowed_version`` rejects the
            version.
    """
    requested = kwargs.get(self.version_param, self.default_version)
    if self.is_allowed_version(requested):
        return requested
    raise exceptions.NotFound(self.invalid_version_message)