def test_can_rsvp(self, rsvp_notification):
url = reverse('view_event', kwargs={'pk': self.other_event.pk})
self.client.force_login(self.person.role)
response = self.client.get(url)
self.assertIn('Participer à cet événement', response.content.decode())
response = self.client.post(url, data={
'action': 'rsvp'
}, follow=True)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn(self.person, self.other_event.attendees.all())
self.assertIn('Annuler ma participation', response.content.decode())
rsvp_notification.delay.assert_called_once()
rsvp = RSVP.objects.get(person=self.person, event=self.other_event)
self.assertEqual(rsvp_notification.delay.call_args[0][0], rsvp.pk)
python类reverse()的实例源码
def test_can_join(self, someone_joined):
url = reverse('view_group', kwargs={'pk': self.manager_group.pk})
self.client.force_login(self.other_person.role)
response = self.client.get(url)
self.assertNotIn(self.other_person, self.manager_group.members.all())
self.assertIn('Rejoindre ce groupe', response.content.decode())
response = self.client.post(url, data={
'action': 'join'
}, follow=True)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn(self.other_person, self.manager_group.members.all())
self.assertIn('Quitter le groupe', response.content.decode())
someone_joined.delay.assert_called_once()
membership = Membership.objects.get(person=self.other_person, supportgroup=self.manager_group)
self.assertEqual(someone_joined.delay.call_args[0][0], membership.pk)
def test_cannot_view_unpublished_group(self, geocode_person):
self.client.force_login(self.person.role)
self.referent_group.published = False
self.referent_group.save()
res = self.client.get(reverse('dashboard'))
self.assertNotContains(res, self.referent_group.pk)
geocode_person.delay.assert_called_once()
res = self.client.get('/groupes/{}/'.format(self.referent_group.pk))
self.assertEqual(res.status_code, status.HTTP_404_NOT_FOUND)
group_pages = ['view_group', 'manage_group', 'edit_group', 'change_group_location', 'quit_group']
for page in group_pages:
res = self.client.get(reverse(page, args=(self.referent_group.pk,)))
self.assertEqual(res.status_code, status.HTTP_404_NOT_FOUND, '"{}" did not return 404'.format(page))
def post(self, request, cid):
if self.contest.status != 0:
raise PermissionDenied
text = request.POST["text"]
if not text or not request.user.is_authenticated:
raise PermissionDenied
if is_contest_manager(request.user, self.contest):
ContestClarification.objects.create(contest=self.contest, important=True, author=request.user, answer=text)
notify.send(sender=request.user,
recipient=list(map(lambda x: x.user, self.contest.contestparticipant_set.select_related("user").all())),
verb="posted a notification in",
level="warning",
target=self.contest,
description=text)
else:
ContestClarification.objects.create(contest=self.contest, author=request.user, text=text)
notify.send(sender=request.user,
recipient=self.contest.managers.all(),
verb="asked a question in",
level="warning",
target=self.contest,
description=text)
return redirect(reverse("contest:dashboard", kwargs={"cid": self.contest.pk}))
def post(self, request, cid):
try:
if self.contest.status != 0:
raise ValueError("Contest is not running.")
lang = request.POST.get('lang', '')
if lang not in self.contest.supported_language_list:
raise ValueError("Invalid language.")
try:
problem = self.contest.contestproblem_set.get(identifier=request.POST.get('problem', '')).problem_id
except ContestProblem.DoesNotExist:
raise ValueError("Invalid problem.")
submission = create_submission(problem, self.user, request.POST.get('code', ''), lang,
contest=self.contest, ip=get_ip(request))
contest_participant, _ = self.contest.contestparticipant_set.get_or_create(user=self.user)
if contest_participant.is_disabled:
raise ValueError("You have quitted the contest.")
judge_submission_on_contest(submission)
return JsonResponse({"url": reverse('contest:submission_api',
kwargs={'cid': self.contest.id, 'sid': submission.id})})
except Exception as e:
return HttpResponseBadRequest(str(e).encode())
def proxy_file_downloader(request):
if not is_admin_or_root(request.user):
raise PermissionDenied
def download_file(url):
local_filename = url.split('/')[-1]
if local_filename == '':
local_filename = random_string()
r = requests.get(url, stream=True, timeout=30)
with open(path.join(settings.UPLOAD_DIR, local_filename), 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
if request.method == 'POST':
try:
url = request.POST['url']
Thread(target=download_file, args=(url,)).start()
except Exception as e:
raise PermissionDenied(repr(e))
return redirect(reverse('filemanager'))
def authorize_user(func):
def _authorize_user(request, *args, **kwargs):
anno_user_id = request.session.get('anno_user_id', False)
if not anno_user_id:
return HttpResponseRedirect(reverse('annotator:login'))
try:
anno_user = AnnoUser.objects.get(id=anno_user_id)
request.session['user_name'] = anno_user.user_name
try:
ret = func(request, *args, **kwargs)
return ret
finally:
del request.session['user_name']
except AnnoUser.DoesNotExist:
return HttpResponseRedirect(reverse('annotator:login'))
return _authorize_user
def get_absolute_url(self):
return reverse('event-detail', kwargs={'pk': self.pk})
def __call__(self, request):
url = resolve(request.path_info)
if request.user.is_anonymous and url.url_name not in self.UNAUTHENTICATED_URLS:
return redirect(reverse('common:login') + f'?next={request.path}')
return self.get_response(request)
def test_home_has_all_domains(self):
domain1 = Domain(name='????', keywords='???1|???2')
domain1.save()
domain2 = Domain(name='????2', keywords='???1|???3')
domain2.save()
resp = self.client.get(reverse('home'))
resp.text = resp.content.decode('utf-8')
self.assertIn(domain1.name, resp.text)
self.assertIn(domain1.get_absolute_url(), resp.text)
self.assertIn(domain2.name, resp.text)
self.assertIn(domain2.get_absolute_url(), resp.text)
def test_home_has_pages(self):
page1 = Page(title='??1', url='https://test1.org',
birth_date='2017-06-30', has_birth_date=True)
page1.save()
page2 = Page(title='??2', url='https://test2.org',
birth_date='2017-07-01', has_birth_date=True)
page2.save()
resp = self.client.get(reverse('home'))
resp.text = resp.content.decode('utf-8')
self.assertIn(page1.title, resp.text)
self.assertIn(page1.url, resp.text)
self.assertIn(page1.birth_date, resp.text)
self.assertIn(page2.title, resp.text)
self.assertIn(page2.url, resp.text)
self.assertIn(page2.birth_date, resp.text)
def test_domain_page_ok(self):
domain = Domain(name='????', keywords='???1|???2')
domain.save()
resp = self.client.get(reverse('domain', kwargs=dict(domain_id=domain.id)))
self.assertEqual(resp.status_code, 200)
self.assertTemplateUsed(resp, 'index.html')
def test_domain_page_has_title(self):
domain = Domain(name='????', keywords='???1|???2')
domain.save()
resp = self.client.get(reverse('domain', kwargs=dict(domain_id=domain.id)))
resp.text = resp.content.decode('utf-8')
self.assertRegexpMatches(resp.text, r'<title>.*?{}.*?</title>'.format(domain.name))
def test_domain_page_has_all_domains(self):
domain = Domain(name='????', keywords='???1|???2')
domain.save()
resp = self.client.get(reverse('domain', kwargs=dict(domain_id=domain.id)))
resp.text = resp.content.decode('utf-8')
self.assertIn(domain.get_absolute_url(), resp.text)
self.assertRegexpMatches(
resp.text,
re.compile(r'<body>.*?{}.*?</body>'.format(domain.name), re.I|re.DOTALL))
def get_absolute_url(self):
return reverse('domain', kwargs=dict(domain_id=self.pk))
def scan_site(request: HttpRequest, site_id: Union[int, None] = None) -> HttpResponse:
"""Schedule the scan of a site."""
if site_id:
site = get_object_or_404(
Site.objects.annotate_most_recent_scan_start() \
.annotate_most_recent_scan_end_or_null(),
pk=site_id)
else:
# no site_id supplied
form = SingleSiteForm(request.POST)
if form.is_valid():
site, created = Site.objects.annotate_most_recent_scan_start() \
.annotate_most_recent_scan_end_or_null().get_or_create(
url=form.cleaned_data.get('url'))
if created:
site.last_scan__end_or_null = None
site.last_scan__start = None
else:
return render(request, 'frontend/create_site.html', {
'form': form,
})
status_code = site.scan()
if status_code == Site.SCAN_OK:
if not site_id: # if the site is new we want to show the dog
return redirect(reverse('frontend:scan_site_created', args=(site.pk,)))
else:
num_scanning_sites = Scan.objects.filter(end__isnull=True).count()
messages.success(request,
_("A scan of the site has been scheduled. "+ \
"The total number of sites in the scanning queue "+ \
"is %i (including yours)." % num_scanning_sites))
return redirect(reverse('frontend:view_site', args=(site.pk,)))
elif status_code == Site.SCAN_COOLDOWN:
messages.warning(request,
_('The site is already scheduled for scanning or it has been scanned recently. No scan was scheduled.'))
elif status_code == Site.SCAN_BLACKLISTED:
messages.warning(request,
_('The operator of this website requested to be blacklisted, scanning this website is not possible, sorry.'))
return redirect(reverse('frontend:view_site', args=(site.pk,)))
def success_url(self):
return self.request.POST.get('return_url', reverse('management-post-list'))