def task_file(request, contest_id, file_id):
contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
if not contest.is_visible_in_list and not request.user.is_staff:
return HttpResponseNotFound()
file = get_object_or_404(tasks_models.TaskFile, pk=file_id)
if not contest.has_task(file.task):
return HttpResponseNotFound()
if not contest.is_started() and not request.user.is_staff:
return HttpResponseForbidden('Contest is not started')
participant = contest.get_participant_for_user(request.user)
if not is_task_open(contest, file.task, participant) and not request.user.is_staff:
return HttpResponseForbidden('Task is closed')
if file.participant is not None and file.participant.id != request.user.id:
return HttpResponseForbidden()
file_path = file.get_path_abspath()
return respond_as_attachment(request, file_path, file.name, file.content_type)
python类HttpResponseNotFound()的实例源码
def contest(request, contest_id):
contest = get_object_or_404(models.Contest, pk=contest_id)
if not contest.is_visible_in_list:
has_access = (request.user.is_authenticated() and
(request.user.is_staff or contest.is_user_participating(request.user)))
if not has_access:
return HttpResponseNotFound()
participants = contest.participants.filter(is_approved=True)
news = contest.news.order_by('-publish_time')
if not request.user.is_staff:
news = news.filter(is_published=True)
return render(request, 'contests/contest.html', {
'current_contest': contest,
'contest': contest,
'news': news,
'participants': participants,
})
def task_opens(request, contest_id, task_id):
contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
task = get_object_or_404(tasks_models.Task, pk=task_id)
if not contest.has_task(task):
return HttpResponseNotFound()
participants = sorted(contest.participants.all(), key=operator.attrgetter('name'))
for participant in participants:
participant.is_task_open = is_task_open(contest, task, participant)
is_manual_task_opening_available = is_manual_task_opening_available_in_contest(contest)
return render(request, 'contests/task_opens.html', {
'current_contest': contest,
'contest': contest,
'task': task,
'participants': participants,
'is_manual_task_opening_available': is_manual_task_opening_available,
})
def remove_member(request, team_id, user_id):
team = get_object_or_404(models.Team, pk=team_id)
# Only staff and team's captain can edit team
if not request.user.is_staff and team.captain != request.user:
return HttpResponseNotFound()
user = get_object_or_404(users_models.User, pk=user_id)
with transaction.atomic():
if user == team.captain:
messages.warning(request, 'Captain can\'t leave the team')
return redirect(team)
if user not in team.members.all():
messages.warning(request, 'User %s not in the team' % (user.username,))
return redirect(team)
team.members.remove(user)
team.save()
messages.success(request, 'User %s has left the team %s' % (user.username, team.name))
return redirect(team)
def handler404(request, template_name='404.html'):
t = get_template(template_name)
ctx = Context({})
return HttpResponseNotFound(t.render(ctx))
def set_crash_points_limit(request, pk):
if not request.POST:
return HttpResponseNotFound()
server = get_object_or_404(Server, pk=pk, owner=request.user)
server.back_crash_points_limit = request.POST.get('back_crash_points_limit')
server.save()
messages.add_message(request, messages.SUCCESS, "Crash points limit set to {}.".format(request.POST.get('back_crash_points_limit')), extra_tags='success')
return redirect('account:home')
def set_custom_motd(request, pk):
if not request.POST:
return HttpResponseNotFound()
server = get_object_or_404(Server, pk=pk, owner=request.user)
server.back_custom_motd = request.POST.get('back_custom_motd')
server.save()
messages.add_message(request, messages.SUCCESS, "Welcome message changed.", extra_tags='success')
return redirect('account:home')
def add_category(request, contest_id):
contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
if contest.tasks_grouping != models.TasksGroping.ByCategories:
return HttpResponseNotFound()
if request.method == 'POST':
form = forms.CategoryForm(data=request.POST)
if form.is_valid():
category = categories_models.Category(
name=form.cleaned_data['name'],
description=form.cleaned_data['description']
)
with transaction.atomic():
category.save()
contest.categories_list.categories.add(category)
contest.save()
return redirect(urlresolvers.reverse('contests:tasks', args=[contest.id]))
else:
form = forms.CategoryForm()
return render(request, 'contests/create_category.html', {
'current_contest': contest,
'contest': contest,
'form': form,
})
def edit_category(request, contest_id, category_id):
contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
if contest.tasks_grouping != models.TasksGroping.ByCategories:
return HttpResponseNotFound()
category = get_object_or_404(categories_models.Category, pk=category_id)
if not contest.categories_list.categories.filter(id=category.id).exists():
return HttpResponseNotFound()
if request.method == 'POST':
form = forms.CategoryForm(data=request.POST)
if form.is_valid():
new_category = categories_models.Category(
name=form.cleaned_data['name'],
description=form.cleaned_data['description']
)
new_category.id = category.id
new_category.save()
return redirect(urlresolvers.reverse('contests:tasks', args=[contest.id]))
else:
form = forms.CategoryForm(initial=category.__dict__)
return render(request, "contests/edit_category.html", {
'current_contest': contest,
'contest': contest,
'category': category,
'form': form,
})
def delete_category(request, contest_id, category_id):
contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
if contest.tasks_grouping != models.TasksGroping.ByCategories:
return HttpResponseNotFound()
category = get_object_or_404(categories_models.Category, pk=category_id)
if not contest.categories_list.categories.filter(id=category.id).exists():
return HttpResponseNotFound()
contest.categories_list.categories.remove(category)
contest.categories_list.save()
return redirect(urlresolvers.reverse('contests:tasks', args=[contest.id]))
def change_participant_status(request, contest_id, participant_id):
contest = get_object_or_404(models.Contest, pk=contest_id)
participant = get_object_or_404(models.AbstractParticipant, pk=participant_id, contest_id=contest_id)
parameter = request.POST['parameter']
value = request.POST['value'] == 'true'
if parameter not in ('is_approved', 'is_disqualified', 'is_visible_in_scoreboard'):
return HttpResponseNotFound()
setattr(participant, parameter, value)
participant.save()
return redirect(urlresolvers.reverse('contests:participants', args=[contest.id]))
def add_task_to_category(request, contest_id, category_id):
contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
category = get_object_or_404(categories_models.Category, pk=category_id, contestcategories__contest_id=contest_id)
if contest.tasks_grouping != models.TasksGroping.ByCategories:
return HttpResponseNotFound()
return add_task_to_contest_view(request, contest, category)
def add_task(request, contest_id):
contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
if contest.tasks_grouping != models.TasksGroping.OneByOne:
return HttpResponseNotFound()
return add_task_to_contest_view(request, contest)
def delete_task(request, contest_id, task_id):
contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
task = get_object_or_404(tasks_models.Task, pk=task_id)
if not contest.has_task(task):
return HttpResponseNotFound()
task.delete()
return redirect(urlresolvers.reverse('contests:tasks', args=[contest.id]))
def edit_news(request, contest_id, news_id):
contest = get_object_or_404(models.Contest, pk=contest_id)
news = get_object_or_404(models.News, pk=news_id)
if contest.id != news.contest_id:
return HttpResponseNotFound()
if request.method == 'POST':
form = forms.NewsForm(data=request.POST)
if form.is_valid():
new_news = models.News(
author=news.author,
contest=contest,
created_at=news.created_at,
updated_at=news.updated_at,
**form.cleaned_data
)
new_news.id = news.id
new_news.save()
messages.success(request, 'News saved')
return redirect(new_news)
else:
form = forms.NewsForm(initial=news.__dict__)
return render(request, 'contests/edit_news.html', {
'current_contest': contest,
'contest': contest,
'news': news,
'form': form,
})
def open_task(request, contest_id, task_id, participant_id):
contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
task = get_object_or_404(tasks_models.Task, pk=task_id)
if not contest.has_task(task):
return HttpResponseNotFound()
if not is_manual_task_opening_available_in_contest(contest):
messages.error(request, 'Manual task opening is forbidden for this contest')
return redirect(urlresolvers.reverse('contests:task_opens', args=[contest.id, task.id]))
participant = get_object_or_404(models.AbstractParticipant, pk=participant_id)
if participant.contest_id != contest.id:
return HttpResponseNotFound()
qs = tasks_models.ManualOpenedTask.objects.filter(
contest=contest,
task=task,
participant=participant
)
# Toggle opens state: close if it's open, open otherwise
if qs.exists():
qs.delete()
if is_task_open(contest, task, participant):
messages.warning(request, 'Task is opened for this participant not manually, you can\'t close it')
else:
messages.success(request, 'Task is closed for %s' % participant.name)
else:
tasks_models.ManualOpenedTask(
contest=contest,
task=task,
participant=participant
).save()
messages.success(request, 'Task is opened for %s' % participant.name)
return JsonResponse({'done': 'ok'})
def respond_as_attachment(request, file_path, original_filename, content_type=None, encoding=None):
if not os.path.exists(file_path):
return HttpResponseNotFound()
with open(file_path, 'rb') as fp:
response = HttpResponse(fp.read())
if content_type is None:
content_type, encoding = mimetypes.guess_type(original_filename)
if content_type is None:
content_type = 'application/octet-stream'
response['Content-Type'] = content_type
response['Content-Length'] = str(os.stat(file_path).st_size)
if encoding is not None:
response['Content-Encoding'] = encoding
# To inspect details for the below code, see http://greenbytes.de/tech/tc2231/
if 'WebKit' in request.META['HTTP_USER_AGENT']:
# Safari 3.0 and Chrome 2.0 accepts UTF-8 encoded string directly.
filename_header = 'filename=%s' % original_filename
elif 'MSIE' in request.META['HTTP_USER_AGENT']:
# IE does not support internationalized filename at all.
# It can only recognize internationalized URL, so we do the trick via routing rules.
filename_header = ''
else:
# For others like Firefox, we follow RFC2231 (encoding extension in HTTP headers).
filename_header = 'filename*=UTF-8\'\'%s' % urllib.parse.quote(original_filename)
response['Content-Disposition'] = 'attachment; ' + filename_header
return response
def edit(request, team_id):
team = get_object_or_404(models.Team, pk=team_id)
# Only staff and team's captain can edit team
if not request.user.is_staff and team.captain != request.user:
return HttpResponseNotFound()
if settings.DRAPO_ONLY_STAFF_CAN_EDIT_TEAM_NAME and not request.user.is_staff:
return HttpResponseNotFound()
if request.method == 'POST':
form = forms.TeamForm(data=request.POST)
if form.is_valid():
team_name = form.cleaned_data['name']
with transaction.atomic():
if models.Team.objects.filter(name=team_name).exists():
form.add_error('name', 'Team with same name already exists')
else:
team.name = team_name
team.save()
messages.success(request, 'Team %s saved' % team.name)
return redirect(team)
else:
form = forms.TeamForm(initial={'name': team.name})
return render(request, 'teams/edit.html', {
'team': team,
'form': form,
})
def tasks(request, contest_id):
contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
if not contest.is_visible_in_list and not request.user.is_staff:
return HttpResponseNotFound()
if not contest.is_started() and not request.user.is_staff:
messages.error(request, '%s is not started yet' % contest.name)
return redirect(contest)
solved_tasks_ids = {}
if request.user.is_authenticated():
participant = contest.get_participant_for_user(request.user)
solved_tasks_ids = contest.get_tasks_solved_by_participant(participant)
else:
participant = None
# Iterate all policies, collect opened tasks
opened_tasks_ids = set(
itertools.chain.from_iterable(
policy.get_open_tasks(participant) for policy in contest.tasks_opening_policies.all()
)
)
if contest.tasks_grouping == models.TasksGroping.OneByOne:
tasks = contest.tasks
return render(request, 'contests/tasks_one_by_one.html', {
'current_contest': contest,
'contest': contest,
'tasks': tasks,
'solved_tasks_ids': solved_tasks_ids,
'opened_tasks_ids': opened_tasks_ids,
})
if contest.tasks_grouping == models.TasksGroping.ByCategories:
categories = contest.categories
return render(request, 'contests/tasks_by_categories.html', {
'current_contest': contest,
'contest': contest,
'categories': categories,
'solved_tasks_ids': solved_tasks_ids,
'opened_tasks_ids': opened_tasks_ids,
})
def join(request, invite_hash=None):
if request.method == 'POST' and 'invite_hash' in request.POST:
invite_hash = request.POST['invite_hash']
team = models.Team.objects.filter(invite_hash=invite_hash).first()
error_message = None
if team is None:
error_message = 'Team not found. Return back and try one more time'
if request.method == 'POST' and team is not None:
if team.members.count() > settings.DRAPO_TEAM_SIZE_LIMIT:
messages.error('You can\'t join the team because it\'s very big (%d members)' % team.members.count())
else:
with transaction.atomic():
if request.user in team.members.all():
messages.warning(request, 'You are already in team ' + team.name)
if 'next' in request.POST and '//' not in request.POST['next']:
return redirect(request.POST['next'])
return redirect(urlresolvers.reverse('teams:team', args=[team.id]))
if settings.DRAPO_USER_CAN_BE_ONLY_IN_ONE_TEAM and \
models.Team.objects.filter(members=request.user).exists() and \
not request.user.is_staff:
messages.error(request,
'You can\'t join team %s while you are a member of another team' % team.name)
if 'next' in request.POST and '//' not in request.POST['next']:
return redirect(request.POST['next'])
return redirect(urlresolvers.reverse('teams:team', args=[team.id]))
team.members.add(request.user)
team.save()
messages.success(request, 'You joined team ' + team.name + '!')
if 'next' in request.POST and '//' not in request.POST['next']:
return redirect(request.POST['next'])
return redirect(urlresolvers.reverse('teams:team', args=[team.id]))
elif request.method == 'GET':
if team is None:
return HttpResponseNotFound()
return render(request, 'teams/join.html', {
'error_message': error_message,
'team': team
})
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
# POST received:
# <QueryDict: {
# 'client_id': ['docker'],
# 'refresh_token': ['boink'],
# 'service': ['registry.maurusnet.test'],
# 'scope': ['repository:dev/destalinator:push,pull'],
# 'grant_type': ['refresh_token']}>
if "refresh_token" in request.POST and request.POST["grant_type"] == "refresh_token":
tr = _tkr_parse(request.POST)
if tr.scope:
tp = TokenPermissions.parse_scope(tr.scope)
else:
return HttpResponseBadRequest("Can't issue access token without valid scope (scope=%s)", tr.scope)
try:
client = DockerRegistry.objects.get(client_id=tr.service) # type: DockerRegistry
except DockerRegistry.DoesNotExist:
return HttpResponseNotFound("No such registry/client from refresh token(%s)" % str(tr))
user = self._user_from_refresh_token(request.POST["refresh_token"], client.public_key_pem(),
expected_issuer=request.get_host(),
expected_audience=tr.service)
if user:
try:
drepo = DockerRepo.objects.get(name=tp.path, registry_id=client.id)
except DockerRepo.DoesNotExist:
if settings.DOCKERAUTH_ALLOW_UNCONFIGURED_REPOS:
drepo = DockerRepo()
drepo.name = tp.path
drepo.registry = client
drepo.unauthenticated_read = True
drepo.unauthenticated_write = True
else:
return HttpResponseNotFound("No such repo '%s'" % tp.path)
if drepo.registry.has_access(user, tp) or drepo.has_access(user, tp):
rightnow = datetime.datetime.now(tz=pytz.UTC)
return HttpResponse(content=json.dumps({
"access_token": self._create_jwt(
self._make_access_token(request, tr, rightnow, tp, user),
client.private_key_pem(),
),
"scope": tr.scope,
"expires_in": 119,
"refresh_token": self._create_jwt(
self._make_refresh_token(request, tr, rightnow, user),
client.private_key_pem(),
)
}), status=200, content_type="application/json")
else:
return HttpResponseForbidden("User %s doesn't have access to repo %s" % (user.pk, tp.path))
else:
return HttpResponse("Unauthorized", status=401)
else:
return HttpResponseBadRequest("POSTing to this endpoint requires a refresh_token")