def get_token(http, service_account='default'):
"""Fetch an oauth token for the
Args:
http: an object to be used to make HTTP requests.
service_account: An email specifying the service account this token
should represent. Default will be a token for the "default" service
account of the current compute engine instance.
Returns:
A tuple of (access token, token expiration), where access token is the
access token as a string and token expiration is a datetime object
that indicates when the access token will expire.
"""
token_json = get(
http,
'instance/service-accounts/{0}/token'.format(service_account))
token_expiry = client._UTCNOW() + datetime.timedelta(
seconds=token_json['expires_in'])
return token_json['access_token'], token_expiry
python类_UTCNOW的实例源码
def _create_token(self, additional_claims=None):
now = client._UTCNOW()
lifetime = datetime.timedelta(seconds=self._MAX_TOKEN_LIFETIME_SECS)
expiry = now + lifetime
payload = {
'iat': _datetime_to_secs(now),
'exp': _datetime_to_secs(expiry),
'iss': self._service_account_email,
'sub': self._service_account_email
}
payload.update(self._kwargs)
if additional_claims is not None:
payload.update(additional_claims)
jwt = crypt.make_signed_jwt(self._signer, payload,
key_id=self._private_key_id)
return jwt.decode('ascii'), expiry
def get_token(http, service_account='default'):
"""Fetch an oauth token for the
Args:
http: an object to be used to make HTTP requests.
service_account: An email specifying the service account this token
should represent. Default will be a token for the "default" service
account of the current compute engine instance.
Returns:
A tuple of (access token, token expiration), where access token is the
access token as a string and token expiration is a datetime object
that indicates when the access token will expire.
"""
token_json = get(
http,
'instance/service-accounts/{0}/token'.format(service_account))
token_expiry = client._UTCNOW() + datetime.timedelta(
seconds=token_json['expires_in'])
return token_json['access_token'], token_expiry
def _create_token(self, additional_claims=None):
now = client._UTCNOW()
lifetime = datetime.timedelta(seconds=self._MAX_TOKEN_LIFETIME_SECS)
expiry = now + lifetime
payload = {
'iat': _datetime_to_secs(now),
'exp': _datetime_to_secs(expiry),
'iss': self._service_account_email,
'sub': self._service_account_email
}
payload.update(self._kwargs)
if additional_claims is not None:
payload.update(additional_claims)
jwt = crypt.make_signed_jwt(self._signer, payload,
key_id=self._private_key_id)
return jwt.decode('ascii'), expiry
def get_token(http, service_account='default'):
"""Fetch an oauth token for the
Args:
http: an object to be used to make HTTP requests.
service_account: An email specifying the service account this token
should represent. Default will be a token for the "default" service
account of the current compute engine instance.
Returns:
A tuple of (access token, token expiration), where access token is the
access token as a string and token expiration is a datetime object
that indicates when the access token will expire.
"""
token_json = get(
http,
'instance/service-accounts/{0}/token'.format(service_account))
token_expiry = client._UTCNOW() + datetime.timedelta(
seconds=token_json['expires_in'])
return token_json['access_token'], token_expiry
def get_token(http_request, service_account='default'):
"""Fetch an oauth token for the
Args:
service_account: An email specifying the service account this token
should represent. Default will be a token for the "default" service
account of the current compute engine instance.
http_request: A callable that matches the method
signature of httplib2.Http.request. Used to make the request to the
metadataserver.
Returns:
A tuple of (access token, token expiration), where access token is the
access token as a string and token expiration is a datetime object
that indicates when the access token will expire.
"""
token_json = get(
http_request,
'instance/service-accounts/{0}/token'.format(service_account))
token_expiry = _UTCNOW() + datetime.timedelta(
seconds=token_json['expires_in'])
return token_json['access_token'], token_expiry
def get_token(http_request, service_account='default'):
"""Fetch an oauth token for the
Args:
service_account: An email specifying the service account this token
should represent. Default will be a token for the "default" service
account of the current compute engine instance.
http_request: A callable that matches the method
signature of httplib2.Http.request. Used to make the request to the
metadataserver.
Returns:
A tuple of (access token, token expiration), where access token is the
access token as a string and token expiration is a datetime object
that indicates when the access token will expire.
"""
token_json = get(
http_request,
'instance/service-accounts/{0}/token'.format(service_account))
token_expiry = _UTCNOW() + datetime.timedelta(
seconds=token_json['expires_in'])
return token_json['access_token'], token_expiry
def get_token(http, service_account='default'):
"""Fetch an oauth token for the
Args:
http: an object to be used to make HTTP requests.
service_account: An email specifying the service account this token
should represent. Default will be a token for the "default" service
account of the current compute engine instance.
Returns:
A tuple of (access token, token expiration), where access token is the
access token as a string and token expiration is a datetime object
that indicates when the access token will expire.
"""
token_json = get(
http,
'instance/service-accounts/{0}/token'.format(service_account))
token_expiry = client._UTCNOW() + datetime.timedelta(
seconds=token_json['expires_in'])
return token_json['access_token'], token_expiry
def _create_token(self, additional_claims=None):
now = client._UTCNOW()
lifetime = datetime.timedelta(seconds=self._MAX_TOKEN_LIFETIME_SECS)
expiry = now + lifetime
payload = {
'iat': _datetime_to_secs(now),
'exp': _datetime_to_secs(expiry),
'iss': self._service_account_email,
'sub': self._service_account_email
}
payload.update(self._kwargs)
if additional_claims is not None:
payload.update(additional_claims)
jwt = crypt.make_signed_jwt(self._signer, payload,
key_id=self._private_key_id)
return jwt.decode('ascii'), expiry
def get_token(http, service_account='default'):
"""Fetch an oauth token for the
Args:
http: an object to be used to make HTTP requests.
service_account: An email specifying the service account this token
should represent. Default will be a token for the "default" service
account of the current compute engine instance.
Returns:
A tuple of (access token, token expiration), where access token is the
access token as a string and token expiration is a datetime object
that indicates when the access token will expire.
"""
token_json = get(
http,
'instance/service-accounts/{0}/token'.format(service_account))
token_expiry = client._UTCNOW() + datetime.timedelta(
seconds=token_json['expires_in'])
return token_json['access_token'], token_expiry
def _create_token(self, additional_claims=None):
now = client._UTCNOW()
lifetime = datetime.timedelta(seconds=self._MAX_TOKEN_LIFETIME_SECS)
expiry = now + lifetime
payload = {
'iat': _datetime_to_secs(now),
'exp': _datetime_to_secs(expiry),
'iss': self._service_account_email,
'sub': self._service_account_email
}
payload.update(self._kwargs)
if additional_claims is not None:
payload.update(additional_claims)
jwt = crypt.make_signed_jwt(self._signer, payload,
key_id=self._private_key_id)
return jwt.decode('ascii'), expiry
def _refresh(self, http):
"""Refreshes the access token.
Args:
http: unused HTTP object
"""
self.devshell_response = _SendRecv()
self.access_token = self.devshell_response.access_token
expires_in = self.devshell_response.expires_in
if expires_in is not None:
delta = datetime.timedelta(seconds=expires_in)
self.token_expiry = client._UTCNOW() + delta
else:
self.token_expiry = None
def _refresh(self, http):
"""Refreshes the access token.
Args:
http: unused HTTP object
"""
self.devshell_response = _SendRecv()
self.access_token = self.devshell_response.access_token
expires_in = self.devshell_response.expires_in
if expires_in is not None:
delta = datetime.timedelta(seconds=expires_in)
self.token_expiry = client._UTCNOW() + delta
else:
self.token_expiry = None
def _refresh(self, http):
"""Refreshes the access token.
Args:
http: unused HTTP object
"""
self.devshell_response = _SendRecv()
self.access_token = self.devshell_response.access_token
expires_in = self.devshell_response.expires_in
if expires_in is not None:
delta = datetime.timedelta(seconds=expires_in)
self.token_expiry = client._UTCNOW() + delta
else:
self.token_expiry = None
def _create_token(self, additional_claims=None):
now = _UTCNOW()
expiry = now + datetime.timedelta(seconds=self._MAX_TOKEN_LIFETIME_SECS)
payload = {
'iat': _datetime_to_secs(now),
'exp': _datetime_to_secs(expiry),
'iss': self._service_account_email,
'sub': self._service_account_email
}
payload.update(self._kwargs)
if additional_claims is not None:
payload.update(additional_claims)
jwt = crypt.make_signed_jwt(self._signer, payload,
key_id=self._private_key_id)
return jwt.decode('ascii'), expiry
def _create_token(self, additional_claims=None):
now = _UTCNOW()
expiry = now + datetime.timedelta(seconds=self._MAX_TOKEN_LIFETIME_SECS)
payload = {
'iat': _datetime_to_secs(now),
'exp': _datetime_to_secs(expiry),
'iss': self._service_account_email,
'sub': self._service_account_email
}
payload.update(self._kwargs)
if additional_claims is not None:
payload.update(additional_claims)
jwt = crypt.make_signed_jwt(self._signer, payload,
key_id=self._private_key_id)
return jwt.decode('ascii'), expiry
def _create_token(self, additional_claims=None):
now = _UTCNOW()
expiry = now + datetime.timedelta(seconds=self._MAX_TOKEN_LIFETIME_SECS)
payload = {
'iat': _datetime_to_secs(now),
'exp': _datetime_to_secs(expiry),
'iss': self._service_account_email,
'sub': self._service_account_email
}
payload.update(self._kwargs)
if additional_claims is not None:
payload.update(additional_claims)
jwt = crypt.make_signed_jwt(self._signer, payload,
key_id=self._private_key_id)
return jwt.decode('ascii'), expiry
def _refresh(self, http):
"""Refreshes the access token.
Args:
http: unused HTTP object
"""
self.devshell_response = _SendRecv()
self.access_token = self.devshell_response.access_token
expires_in = self.devshell_response.expires_in
if expires_in is not None:
delta = datetime.timedelta(seconds=expires_in)
self.token_expiry = client._UTCNOW() + delta
else:
self.token_expiry = None
def _refresh(self, http):
"""Refreshes the access token.
Args:
http: unused HTTP object
"""
self.devshell_response = _SendRecv()
self.access_token = self.devshell_response.access_token
expires_in = self.devshell_response.expires_in
if expires_in is not None:
delta = datetime.timedelta(seconds=expires_in)
self.token_expiry = client._UTCNOW() + delta
else:
self.token_expiry = None