def viewer_login(request: HttpRequest) -> HttpResponse:
if request.method == 'POST':
username = request.POST['username']
password = request.POST['password']
user = authenticate(username=username, password=password)
if user is not None:
if user.is_active:
login(request, user)
next_url = request.POST.get('next', 'viewer:main-page')
return redirect(next_url)
else:
return render_error(request, "This account has been disabled.")
else:
return render_error(request, "Invalid login credentials.")
else:
next_url = request.GET.get('next', 'viewer:main-page')
d = {'next': next_url}
return render(request, 'viewer/login.html', d)
python类HttpRequest()的实例源码
def image_viewer(request: HttpRequest, archive: int, page: int) -> HttpResponse:
images = Image.objects.filter(archive=archive, extracted=True)
if not images:
raise Http404("Archive " + str(archive) + " has no extracted images")
paginator = Paginator(images, 1)
try:
image = paginator.page(page)
except (InvalidPage, EmptyPage):
image = paginator.page(paginator.num_pages)
image_object = image.object_list[0]
if image_object.image_width / image_object.image_height > 1:
image_object.is_horizontal = True
d = {'image': image, 'backurl': redirect(image.object_list[0].archive).url,
'images_range': range(1, images.count() + 1), 'image_object': image_object}
return render(request, "viewer/image_viewer.html", d)
def gallery_thumb(request: HttpRequest, pk: int) -> HttpResponse:
try:
gallery = Gallery.objects.get(pk=pk)
except Gallery.DoesNotExist:
raise Http404("Gallery does not exist")
if not gallery.public and not request.user.is_authenticated:
raise Http404("Gallery is not public")
if 'HTTP_X_FORWARDED_HOST' in request.META:
response = HttpResponse()
response["Content-Type"] = "image/jpeg"
# response["Content-Disposition"] = 'attachment; filename*=UTF-8\'\'{0}'.format(
# archive.pretty_name)
response['X-Accel-Redirect'] = "/image/{0}".format(gallery.thumbnail.name)
return response
else:
return HttpResponseRedirect(gallery.thumbnail.url)
def public_stats(request: HttpRequest) -> HttpResponse:
"""Display public galleries and archives stats."""
if not crawler_settings.urls.enable_public_stats:
if not request.user.is_staff:
raise Http404("Page not found")
else:
return render_error(request, "Page disabled by settings (urls: enable_public_stats).")
stats_dict = {
"n_archives": Archive.objects.filter(public=True).count(),
"archive": Archive.objects.filter(public=True).filter(filesize__gt=0).aggregate(
Avg('filesize'), Max('filesize'), Min('filesize'), Sum('filesize')),
"n_tags": Tag.objects.filter(gallery_tags__public=True).distinct().count(),
"top_10_tags": Tag.objects.filter(gallery_tags__public=True).distinct().annotate(
num_archive=Count('gallery_tags')).order_by('-num_archive')[:10]
}
d = {'stats': stats_dict}
return render(request, "viewer/public_stats.html", d)
def no_route_found(self, request):
"""
Default callback for route not found
:param request HttpRequest
:rtype: Response
"""
response_obj = OrderedDict()
response_obj["status"] = False
response_obj["exceptions"] = {
"message": "No route found for {0} {1}".format(self.__method, self.__uri),
}
response_obj["request"] = {
"method": self.__method,
"path_info": self.__uri,
"content": request.body.decode("utf-8")
}
response_obj["message"] = "We are sorry, but something went terribly wrong."
return Response(response_obj, content_type="application/json", status=404, charset="utf-8")
def get_dummy_request(dc, method=None, user=None, system_user=False):
"""
Return dummy request object.
"""
request = HttpRequest()
request.csrf_processing_done = True
request.dc = dc
if method:
request = set_request_method(request, method, copy_request=False)
if system_user:
from api.task.utils import get_system_task_user
request.user = get_system_task_user()
elif user:
request.user = user
return request
def get_form(self, req: HttpRequest, obj: Domain=None, **kwargs: Any) -> type:
if req.GET.get("_prefill_key", "0") == "1":
def formfield_callback(field: _ModelField, request: HttpRequest=None, **kwargs: Any) -> Type[_FormField]:
f = self.formfield_for_dbfield(field, request=request, **kwargs) # type: _FormField
# f can be None if the dbfield does not get a FormField (like hidden fields
# or auto increment IDs). Only the dbfield has a .name attribute.
if f and field.name == "dkimkey":
if obj:
obj.dkimkey = RSA.generate(2048).exportKey("PEM").decode("utf-8")
else:
f.initial = RSA.generate(2048).exportKey("PEM").decode("utf-8")
return f
kwargs["formfield_callback"] = functools.partial(formfield_callback, request=req)
form_t = super().get_form(req, obj, **kwargs)
return form_t
def _make_access_token(self, request: HttpRequest, tokenreq: _TokenRequest,
rightnow: datetime.datetime, perms: TokenPermissions, for_user: MNUser) -> Dict[str, Any]:
# TODO: figure out if we need to support more than one access scope
# This implementation is based around this article, that, among other things,
# describes the "kid" field required by Docker. The JWK implementation provided
# by jwcrypto doesn't seem to work.
# https://umbrella.cisco.com/blog/blog/2016/02/23/implementing-oauth-for-registry-v2/
_x = [] # type: List[str]
jwtobj = {
'exp': int((rightnow + datetime.timedelta(minutes=2)).timestamp()),
'nbf': int((rightnow - datetime.timedelta(seconds=1)).timestamp()),
'iat': int(rightnow.timestamp()),
'iss': request.get_host(),
'aud': tokenreq.service,
'sub': str(for_user.pk),
'access': [{
"type": perms.type,
"name": perms.path,
"actions": _x + (["push"] if perms.push else []) +
(["pull"] if perms.pull else []) +
(["login"] if perms.type == "login" else [])
}]
} # type: Dict[str, Union[str, int, List[Dict[str, Union[str, List[str]]]]]]
return jwtobj
def setUp(self):
super().setUp()
self.request = HttpRequest()
def viewer_logout(request: HttpRequest) -> HttpResponse:
logout(request)
return HttpResponseRedirect(reverse('viewer:main-page'))
def session_settings(request: HttpRequest) -> HttpResponse:
if request.method == 'POST':
data = json.loads(request.body.decode("utf-8"))
if 'viewer_parameters' in data:
if "viewer_parameters" not in request.session:
request.session["viewer_parameters"] = {}
for k, v in data.items():
if not k == 'viewer_parameters':
request.session["viewer_parameters"][k] = v
request.session.modified = True
return HttpResponse(json.dumps({'result': "ok"}), content_type="application/json; charset=utf-8")
elif request.method == 'GET':
data = json.loads(request.body.decode("utf-8"))
if 'viewer_parameters' in data:
if "viewer_parameters" not in request.session:
request.session["viewer_parameters"] = {}
request.session["viewer_parameters"]["image_width"] = 900
request.session.modified = True
return HttpResponse(
json.dumps(request.session["viewer_parameters"]), content_type="application/json; charset=utf-8"
)
return HttpResponse(json.dumps({'result': "error"}), content_type="application/json; charset=utf-8")
else:
return HttpResponse(json.dumps({'result': "error"}), content_type="application/json; charset=utf-8")
# TODO: Generalize this script for several providers.
def panda_userscript(request: HttpRequest) -> HttpResponse:
return render(
request,
'viewer/panda.user.js',
{
"api_key": crawler_settings.api_key,
"img_url": request.build_absolute_uri(crawler_settings.urls.static_url + 'favicon-160.png'),
"server_url": request.build_absolute_uri(reverse('viewer:json-parser'))
},
content_type='application/javascript'
)
def user_archive_preferences(request: HttpRequest, archive_pk: int, setting: str) -> HttpResponse:
"""Archive user favorite toggle."""
try:
Archive.objects.get(pk=archive_pk)
except Archive.DoesNotExist:
raise Http404("Archive does not exist")
if setting == 'favorite':
current_user_archive_preferences, created = UserArchivePrefs.objects.get_or_create(
user=User.objects.get(pk=request.user.id),
archive=Archive.objects.get(pk=archive_pk),
defaults={'favorite_group': 1}
)
if not created:
current_user_archive_preferences.favorite_group = 1
current_user_archive_preferences.save()
elif setting == 'unfavorite':
current_user_archive_preferences, created = UserArchivePrefs.objects.get_or_create(
user=User.objects.get(pk=request.user.id),
archive=Archive.objects.get(pk=archive_pk),
defaults={'favorite_group': 0}
)
if not created:
current_user_archive_preferences.favorite_group = 0
current_user_archive_preferences.save()
else:
return render_error(request, "Unknown user preference.")
return HttpResponseRedirect(request.META["HTTP_REFERER"],
{'user_archive_preferences': current_user_archive_preferences})
def render_error(request: HttpRequest, message: str) -> HttpResponseRedirect:
messages.error(request, message, extra_tags='danger')
if 'HTTP_REFERER' in request.META:
return HttpResponseRedirect(request.META["HTTP_REFERER"])
else:
return HttpResponseRedirect(reverse('viewer:main-page'))
def about(request: HttpRequest) -> HttpResponse:
return render(
request,
'viewer/about.html'
)
def default_route_options(request: HttpRequest):
"""
Default callback for OPTIONS request
:param request HttpRequest
:rtype: Response
"""
response_obj = OrderedDict()
response_obj["status"] = True
response_obj["data"] = "Ok"
return Response(response_obj, content_type="application/json", charset="utf-8")
def test_http_basic_get_user():
# Basic django request, without authentication info
request = HttpRequest()
# Standard middlewares were not applied on this request
user = http_basic_auth_get_user(request)
assert user is not None
assert user_is_anonymous(user)
def test_model(self):
req = HttpRequest()
self.article.publish()
self.article_2.publish()
# test the listing contains the published article
self.assertTrue(self.article.get_published() in self.listing.get_items_to_list(req))
# ...not the draft one
self.assertTrue(self.article not in self.listing.get_items_to_list(req))
# ...not an article that isn't associated with the listing
self.assertTrue(self.article_2 not in self.listing.get_items_to_list(req))
self.assertTrue(self.article_2.get_published() not in self.listing.get_items_to_list(req))
self.article.unpublish()
self.article_2.unpublish()
def init_request(self):
request = HttpRequest()
request.data = QueryDict().copy()
request.session = DummySession()
return request
def detail(request: HttpRequest, question_id: int) -> HttpResponse:
question = get_object_or_404(Question, pk=question_id)
if not question.is_active:
return 'invalid request'
return render(request, 'polls/detail.html', {'question': question})
def health(request: HttpRequest) -> HttpResponse:
c = connection.cursor() # type: CursorWrapper
try:
c.execute("SELECT 1")
res = c.fetchone()
except DatabaseError as e:
return HttpResponseServerError(("Health check failed: %s" % str(e)).encode("utf-8"),
content_type="text/plain; charset=utf-8")
else:
return HttpResponse(b'All green', status=200,
content_type="text/plain; charset=utf-8")
def nothing(request: HttpRequest) -> HttpResponse:
return HttpResponse(b'nothing to see here', status=200,
content_type="text/plain; charset=utf-8")
def test_error(request: HttpRequest) -> HttpResponse:
if settings.DEBUG:
raise Exception("This is a test")
else:
return HttpResponseRedirect("/")
def branding(request: HttpRequest) -> Dict[str, str]:
return {
"company_name": settings.COMPANY_NAME,
"company_logo": settings.COMPANY_LOGO_URL,
}
def validate_refresh_token(self, refresh_token: str, client: MNApplication,
request: HttpRequest, *args: Any, **kwargs: Any) -> bool:
res = super().validate_refresh_token(refresh_token, client, request, *args, **kwargs)
if res:
# our base validated the refresh token, let's check if the client or user permissions
# changed
missing_permissions = find_missing_permissions(client, request.user)
return len(missing_permissions) == 0
else:
return False
def _make_refresh_token(self, request: HttpRequest, tokenreq: _TokenRequest,
rightnow: datetime.datetime, for_user: MNUser) -> Dict[str, Any]:
jwtobj = {
'exp': int((rightnow + datetime.timedelta(hours=2)).timestamp()),
'nbf': int((rightnow - datetime.timedelta(seconds=1)).timestamp()),
'iat': int(rightnow.timestamp()),
'iss': request.get_host(),
'aud': tokenreq.service,
'sub': str(for_user.pk),
'malleable': True,
'client_id': tokenreq.client_id,
}
return jwtobj
def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
return super().dispatch(request, *args, **kwargs)
def render(self):
if not self.template_name:
return None
try:
logger.debug("Template name: %s" % self.template_name)
template = get_template(self.template_name)
except TemplateDoesNotExist:
logger.debug("Template not found: %s" % self.template_name)
return None
# TODO: Avoid using a null HttRequest to context processors
ctx = RequestContext(HttpRequest(), self.ctx)
return template.render(ctx)
def gallery_details(request: HttpRequest, pk: int, tool: str=None) -> HttpResponse:
try:
gallery = Gallery.objects.get(pk=pk)
except Gallery.DoesNotExist:
raise Http404("Gallery does not exist")
if not (gallery.public or request.user.is_authenticated):
raise Http404("Gallery does not exist")
if request.user.is_staff and tool == "download":
if 'downloader' in request.GET:
current_settings = Settings(load_from_config=crawler_settings.config)
current_settings.allow_downloaders_only([request.GET['downloader']], True, True, True)
current_settings.workers.web_queue.enqueue_args_list((gallery.get_link(),), override_options=current_settings)
else:
# Since this is used from the gallery page mainly to download an already added gallery using
# downloader settings, force replace_metadata
current_settings = Settings(load_from_config=crawler_settings.config)
current_settings.replace_metadata = True
current_settings.workers.web_queue.enqueue_args_list((gallery.get_link(),), override_options=current_settings)
return HttpResponseRedirect(request.META["HTTP_REFERER"])
if request.user.is_staff and tool == "toggle-hidden":
gallery.hidden = not gallery.hidden
gallery.save()
return HttpResponseRedirect(request.META["HTTP_REFERER"])
if request.user.is_staff and tool == "toggle-public":
gallery.public_toggle()
return HttpResponseRedirect(request.META["HTTP_REFERER"])
if request.user.is_staff and tool == "mark-deleted":
gallery.mark_as_deleted()
return HttpResponseRedirect(request.META["HTTP_REFERER"])
if request.user.is_staff and tool == "recall-api":
current_settings = Settings(load_from_config=crawler_settings.config)
current_settings.set_update_metadata_options(providers=(gallery.provider,))
current_settings.workers.web_queue.enqueue_args_list((gallery.get_link(),), override_options=current_settings)
frontend_logger.info(
'Updating gallery API data for gallery: {} and related archives'.format(
gallery.get_absolute_url()
)
)
return HttpResponseRedirect(request.META["HTTP_REFERER"])
tag_lists = sort_tags(gallery.tags.all())
d = {'gallery': gallery, 'tag_lists': tag_lists, 'settings': crawler_settings}
return render(request, "viewer/gallery.html", d)
def route(self, request: HttpRequest):
"""
Prepares for the CallBackResolver and handles the response and exceptions
:param request HttpRequest
:rtype: HttpResponse
"""
self.flush()
self.__request = request
self.__uri = request.path[1:]
self.__method = request.method
self.__bound_routes = dict()
self.register('log', logging.getLogger(os.urandom(3).hex().upper()))
self.register('router', RouteMapping())
self.__app['router'].flush_routes()
routes = self.__callable().connect(self.__app)
self.__bound_routes = routes['router'].get__routes()
request_headers = request.META
if 'HTTP_USER_AGENT' in request_headers:
indent = 2 if re.match("[Mozilla]{7}", request_headers['HTTP_USER_AGENT']) else 0
else:
indent = 0
if self.set_end_point_uri() is False:
return self.set_response_headers(self.no_route_found(self.__request).render(indent))
acutal_params = self.get_url_params(self.get_end_point_uri())
try:
response = self.exec_route_callback(acutal_params)
if type(response) == Response:
return self.set_response_headers(response.render(indent))
else:
return self.set_response_headers(response)
except InvalidInputException:
self.__app['log'].error("< 400", exc_info=True)
return self.set_response_headers(Response(None, status=400).render(indent))
except AuthException as e:
self.__app['log'].error("< 403", exc_info=True)
return self.set_response_headers(Response(None, status=403).render(indent))
except NotFoundException:
self.__app['log'].error("< 404", exc_info=True)
return self.set_response_headers(Response(None, status=404).render(indent))
except RequestDataTooBig:
self.__app['log'].error("< 413", exc_info=True)
return self.set_response_headers(Response(None, status=413).render(indent))
except BaseException as e:
self.__app['log'].error("< 500", exc_info=True)
return self.set_response_headers(Response(None, status=500).render(indent))
finally:
del self