views.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:authserver 作者: jdelic 项目源码 文件源码
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        # POST received:
        # <QueryDict: {
        #       'client_id': ['docker'],
        #       'refresh_token': ['boink'],
        #       'service': ['registry.maurusnet.test'],
        #       'scope': ['repository:dev/destalinator:push,pull'],
        #       'grant_type': ['refresh_token']}>
        if "refresh_token" in request.POST and request.POST["grant_type"] == "refresh_token":
            tr = _tkr_parse(request.POST)

            if tr.scope:
                tp = TokenPermissions.parse_scope(tr.scope)
            else:
                return HttpResponseBadRequest("Can't issue access token without valid scope (scope=%s)", tr.scope)

            try:
                client = DockerRegistry.objects.get(client_id=tr.service)  # type: DockerRegistry
            except DockerRegistry.DoesNotExist:
                return HttpResponseNotFound("No such registry/client from refresh token(%s)" % str(tr))

            user = self._user_from_refresh_token(request.POST["refresh_token"], client.public_key_pem(),
                                                 expected_issuer=request.get_host(),
                                                 expected_audience=tr.service)
            if user:
                try:
                    drepo = DockerRepo.objects.get(name=tp.path, registry_id=client.id)
                except DockerRepo.DoesNotExist:
                    if settings.DOCKERAUTH_ALLOW_UNCONFIGURED_REPOS:
                        drepo = DockerRepo()
                        drepo.name = tp.path
                        drepo.registry = client
                        drepo.unauthenticated_read = True
                        drepo.unauthenticated_write = True
                    else:
                        return HttpResponseNotFound("No such repo '%s'" % tp.path)

                if drepo.registry.has_access(user, tp) or drepo.has_access(user, tp):
                    rightnow = datetime.datetime.now(tz=pytz.UTC)
                    return HttpResponse(content=json.dumps({
                        "access_token": self._create_jwt(
                            self._make_access_token(request, tr, rightnow, tp, user),
                            client.private_key_pem(),
                        ),
                        "scope": tr.scope,
                        "expires_in": 119,
                        "refresh_token": self._create_jwt(
                            self._make_refresh_token(request, tr, rightnow, user),
                            client.private_key_pem(),
                        )
                    }), status=200, content_type="application/json")
                else:
                    return HttpResponseForbidden("User %s doesn't have access to repo %s" % (user.pk, tp.path))
            else:
                return HttpResponse("Unauthorized", status=401)
        else:
            return HttpResponseBadRequest("POSTing to this endpoint requires a refresh_token")
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号