def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
    """
    Exchange a refresh token for a fresh access token / refresh token pair.

    The form data must contain ``refresh_token`` and
    ``grant_type=refresh_token``; ``service`` and ``scope`` are read through
    ``_tkr_parse``.

    Responses:
      * 400 -- ``refresh_token`` is missing, ``grant_type`` is missing or not
        ``refresh_token``, or the parsed request carries no scope.
      * 404 -- no ``DockerRegistry`` has ``client_id`` equal to the requested
        service, or the repository is unknown and
        ``settings.DOCKERAUTH_ALLOW_UNCONFIGURED_REPOS`` is falsy.
      * 401 -- ``_user_from_refresh_token`` did not yield a user.
      * 403 -- neither the registry nor the repository grants the user the
        requested permissions.
      * 200 -- JSON document with ``access_token``, ``scope``,
        ``expires_in`` and a newly issued ``refresh_token``.
    """
    # Example of the form data sent by a Docker client:
    # <QueryDict: {
    #     'client_id': ['docker'],
    #     'refresh_token': ['boink'],
    #     'service': ['registry.maurusnet.test'],
    #     'scope': ['repository:dev/destalinator:push,pull'],
    #     'grant_type': ['refresh_token']}>

    # .get() instead of indexing: a request without grant_type must produce
    # the 400 below rather than an unhandled MultiValueDictKeyError.
    if "refresh_token" not in request.POST or request.POST.get("grant_type") != "refresh_token":
        return HttpResponseBadRequest("POSTing to this endpoint requires a refresh_token")

    tr = _tkr_parse(request.POST)
    if not tr.scope:
        # The scope has to be interpolated into the message; handing it over
        # as a second positional argument would set the response's
        # content type instead.
        return HttpResponseBadRequest(
            "Can't issue access token without valid scope (scope=%s)" % tr.scope
        )
    tp = TokenPermissions.parse_scope(tr.scope)

    try:
        client = DockerRegistry.objects.get(client_id=tr.service)  # type: DockerRegistry
    except DockerRegistry.DoesNotExist:
        return HttpResponseNotFound("No such registry/client from refresh token(%s)" % str(tr))

    # The refresh token is checked against the registry's public key, with
    # this host as expected issuer and the requested service as audience.
    user = self._user_from_refresh_token(
        request.POST["refresh_token"],
        client.public_key_pem(),
        expected_issuer=request.get_host(),
        expected_audience=tr.service,
    )
    if not user:
        return HttpResponse("Unauthorized", status=401)

    try:
        drepo = DockerRepo.objects.get(name=tp.path, registry_id=client.id)
    except DockerRepo.DoesNotExist:
        if not settings.DOCKERAUTH_ALLOW_UNCONFIGURED_REPOS:
            return HttpResponseNotFound("No such repo '%s'" % tp.path)
        # Unconfigured repos are allowed: use an in-memory repo object (it is
        # not saved here) that is flagged open for reading and writing.
        drepo = DockerRepo()
        drepo.name = tp.path
        drepo.registry = client
        drepo.unauthenticated_read = True
        drepo.unauthenticated_write = True

    # Access granted on either the registry or the repository is sufficient.
    if not (drepo.registry.has_access(user, tp) or drepo.has_access(user, tp)):
        return HttpResponseForbidden("User %s doesn't have access to repo %s" % (user.pk, tp.path))

    rightnow = datetime.datetime.now(tz=pytz.UTC)
    # Both tokens are signed with the registry's private key and share the
    # same timestamp.
    payload = {
        "access_token": self._create_jwt(
            self._make_access_token(request, tr, rightnow, tp, user),
            client.private_key_pem(),
        ),
        "scope": tr.scope,
        "expires_in": 119,
        "refresh_token": self._create_jwt(
            self._make_refresh_token(request, tr, rightnow, user),
            client.private_key_pem(),
        ),
    }
    return HttpResponse(content=json.dumps(payload), status=200, content_type="application/json")
# Comment list
# Table of contents