def __init__(self,
service_account_email,
signer,
scopes=None,
private_key_id=None,
client_id=None,
user_agent=None,
token_uri=GOOGLE_TOKEN_URI,
revoke_uri=GOOGLE_REVOKE_URI,
additional_claims=None):
if additional_claims is None:
additional_claims = {}
super(_JWTAccessCredentials, self).__init__(
service_account_email,
signer,
private_key_id=private_key_id,
client_id=client_id,
user_agent=user_agent,
token_uri=token_uri,
revoke_uri=revoke_uri,
**additional_claims)
python类GOOGLE_TOKEN_URI的实例源码
def create_scoped(self, scopes, token_uri=GOOGLE_TOKEN_URI,
revoke_uri=GOOGLE_REVOKE_URI):
# Returns an OAuth2 credentials with the given scope
result = ServiceAccountCredentials(self._service_account_email,
self._signer,
scopes=scopes,
private_key_id=self._private_key_id,
client_id=self.client_id,
user_agent=self._user_agent,
token_uri=token_uri,
revoke_uri=revoke_uri,
**self._kwargs)
if self._private_key_pkcs8_pem is not None:
result._private_key_pkcs8_pem = self._private_key_pkcs8_pem
if self._private_key_pkcs12 is not None:
result._private_key_pkcs12 = self._private_key_pkcs12
if self._private_key_password is not None:
result._private_key_password = self._private_key_password
return result
def __init__(self, assertion_type, user_agent=None,
token_uri=GOOGLE_TOKEN_URI,
revoke_uri=GOOGLE_REVOKE_URI,
**unused_kwargs):
"""Constructor for AssertionFlowCredentials.
Args:
assertion_type: string, assertion type that will be declared to the auth
server
user_agent: string, The HTTP User-Agent to provide for this application.
token_uri: string, URI for token endpoint. For convenience
defaults to Google's endpoints but any OAuth 2.0 provider can be used.
revoke_uri: string, URI for revoke endpoint.
"""
super(AssertionCredentials, self).__init__(
None,
None,
None,
None,
None,
token_uri,
user_agent,
revoke_uri=revoke_uri)
self.assertion_type = assertion_type
def __init__(self, service_account_id, service_account_email, private_key_id,
private_key_pkcs8_text, scopes, user_agent=None,
token_uri=GOOGLE_TOKEN_URI, revoke_uri=GOOGLE_REVOKE_URI,
**kwargs):
super(_ServiceAccountCredentials, self).__init__(
None, user_agent=user_agent, token_uri=token_uri, revoke_uri=revoke_uri)
self._service_account_id = service_account_id
self._service_account_email = service_account_email
self._private_key_id = private_key_id
self._private_key = _get_private_key(private_key_pkcs8_text)
self._private_key_pkcs8_text = private_key_pkcs8_text
self._scopes = util.scopes_to_string(scopes)
self._user_agent = user_agent
self._token_uri = token_uri
self._revoke_uri = revoke_uri
self._kwargs = kwargs
def __init__(self,
service_account_email,
signer,
scopes='',
private_key_id=None,
client_id=None,
user_agent=None,
token_uri=oauth2client.GOOGLE_TOKEN_URI,
revoke_uri=oauth2client.GOOGLE_REVOKE_URI,
**kwargs):
super(ServiceAccountCredentials, self).__init__(
None, user_agent=user_agent, token_uri=token_uri,
revoke_uri=revoke_uri)
self._service_account_email = service_account_email
self._signer = signer
self._scopes = _helpers.scopes_to_string(scopes)
self._private_key_id = private_key_id
self.client_id = client_id
self._user_agent = user_agent
self._kwargs = kwargs
def create_scoped(self, scopes, token_uri=oauth2client.GOOGLE_TOKEN_URI,
revoke_uri=oauth2client.GOOGLE_REVOKE_URI):
# Returns an OAuth2 credentials with the given scope
result = ServiceAccountCredentials(self._service_account_email,
self._signer,
scopes=scopes,
private_key_id=self._private_key_id,
client_id=self.client_id,
user_agent=self._user_agent,
token_uri=token_uri,
revoke_uri=revoke_uri,
**self._kwargs)
if self._private_key_pkcs8_pem is not None:
result._private_key_pkcs8_pem = self._private_key_pkcs8_pem
if self._private_key_pkcs12 is not None:
result._private_key_pkcs12 = self._private_key_pkcs12
if self._private_key_password is not None:
result._private_key_password = self._private_key_password
return result
def __init__(self,
service_account_email,
signer,
scopes='',
private_key_id=None,
client_id=None,
user_agent=None,
token_uri=oauth2client.GOOGLE_TOKEN_URI,
revoke_uri=oauth2client.GOOGLE_REVOKE_URI,
**kwargs):
super(ServiceAccountCredentials, self).__init__(
None, user_agent=user_agent, token_uri=token_uri,
revoke_uri=revoke_uri)
self._service_account_email = service_account_email
self._signer = signer
self._scopes = _helpers.scopes_to_string(scopes)
self._private_key_id = private_key_id
self.client_id = client_id
self._user_agent = user_agent
self._kwargs = kwargs
def __init__(self,
service_account_email,
signer,
scopes=None,
private_key_id=None,
client_id=None,
user_agent=None,
token_uri=oauth2client.GOOGLE_TOKEN_URI,
revoke_uri=oauth2client.GOOGLE_REVOKE_URI,
additional_claims=None):
if additional_claims is None:
additional_claims = {}
super(_JWTAccessCredentials, self).__init__(
service_account_email,
signer,
private_key_id=private_key_id,
client_id=client_id,
user_agent=user_agent,
token_uri=token_uri,
revoke_uri=revoke_uri,
**additional_claims)
def create_scoped(self, scopes, token_uri=oauth2client.GOOGLE_TOKEN_URI,
revoke_uri=oauth2client.GOOGLE_REVOKE_URI):
# Returns an OAuth2 credentials with the given scope
result = ServiceAccountCredentials(self._service_account_email,
self._signer,
scopes=scopes,
private_key_id=self._private_key_id,
client_id=self.client_id,
user_agent=self._user_agent,
token_uri=token_uri,
revoke_uri=revoke_uri,
**self._kwargs)
if self._private_key_pkcs8_pem is not None:
result._private_key_pkcs8_pem = self._private_key_pkcs8_pem
if self._private_key_pkcs12 is not None:
result._private_key_pkcs12 = self._private_key_pkcs12
if self._private_key_password is not None:
result._private_key_password = self._private_key_password
return result
def __init__(self, service_account_id, service_account_email,
private_key_id, private_key_pkcs8_text, scopes,
user_agent=None, token_uri=GOOGLE_TOKEN_URI,
revoke_uri=GOOGLE_REVOKE_URI, **kwargs):
super(_ServiceAccountCredentials, self).__init__(
None, user_agent=user_agent, token_uri=token_uri,
revoke_uri=revoke_uri)
self._service_account_id = service_account_id
self._service_account_email = service_account_email
self._private_key_id = private_key_id
self._private_key = _get_private_key(private_key_pkcs8_text)
self._private_key_pkcs8_text = private_key_pkcs8_text
self._scopes = util.scopes_to_string(scopes)
self._user_agent = user_agent
self._token_uri = token_uri
self._revoke_uri = revoke_uri
self._kwargs = kwargs
def __init__(self, assertion_type, user_agent=None,
token_uri=GOOGLE_TOKEN_URI,
revoke_uri=GOOGLE_REVOKE_URI,
**unused_kwargs):
"""Constructor for AssertionFlowCredentials.
Args:
assertion_type: string, assertion type that will be declared to the auth
server
user_agent: string, The HTTP User-Agent to provide for this application.
token_uri: string, URI for token endpoint. For convenience
defaults to Google's endpoints but any OAuth 2.0 provider can be used.
revoke_uri: string, URI for revoke endpoint.
"""
super(AssertionCredentials, self).__init__(
None,
None,
None,
None,
None,
token_uri,
user_agent,
revoke_uri=revoke_uri)
self.assertion_type = assertion_type
def __init__(self, service_account_id, service_account_email,
private_key_id, private_key_pkcs8_text, scopes,
user_agent=None, token_uri=GOOGLE_TOKEN_URI,
revoke_uri=GOOGLE_REVOKE_URI, **kwargs):
super(_ServiceAccountCredentials, self).__init__(
None, user_agent=user_agent, token_uri=token_uri,
revoke_uri=revoke_uri)
self._service_account_id = service_account_id
self._service_account_email = service_account_email
self._private_key_id = private_key_id
self._private_key = _get_private_key(private_key_pkcs8_text)
self._private_key_pkcs8_text = private_key_pkcs8_text
self._scopes = util.scopes_to_string(scopes)
self._user_agent = user_agent
self._token_uri = token_uri
self._revoke_uri = revoke_uri
self._kwargs = kwargs
def _get_application_default_credential_from_file(filename):
"""Build the Application Default Credentials from file."""
# read the credentials from the file
with open(filename) as file_obj:
client_credentials = json.load(file_obj)
credentials_type = client_credentials.get('type')
if credentials_type == AUTHORIZED_USER:
required_fields = set(['client_id', 'client_secret', 'refresh_token'])
elif credentials_type == SERVICE_ACCOUNT:
required_fields = set(['client_id', 'client_email', 'private_key_id',
'private_key'])
else:
raise ApplicationDefaultCredentialsError(
"'type' field should be defined (and have one of the '" +
AUTHORIZED_USER + "' or '" + SERVICE_ACCOUNT + "' values)")
missing_fields = required_fields.difference(client_credentials.keys())
if missing_fields:
_raise_exception_for_missing_fields(missing_fields)
if client_credentials['type'] == AUTHORIZED_USER:
return GoogleCredentials(
access_token=None,
client_id=client_credentials['client_id'],
client_secret=client_credentials['client_secret'],
refresh_token=client_credentials['refresh_token'],
token_expiry=None,
token_uri=GOOGLE_TOKEN_URI,
user_agent='Python client library')
else: # client_credentials['type'] == SERVICE_ACCOUNT
from oauth2client.service_account import ServiceAccountCredentials
return ServiceAccountCredentials.from_json_keyfile_dict(
client_credentials)
def __init__(self, assertion_type, user_agent=None,
token_uri=GOOGLE_TOKEN_URI,
revoke_uri=GOOGLE_REVOKE_URI,
**unused_kwargs):
"""Constructor for AssertionFlowCredentials.
Args:
assertion_type: string, assertion type that will be declared to the
auth server
user_agent: string, The HTTP User-Agent to provide for this
application.
token_uri: string, URI for token endpoint. For convenience defaults
to Google's endpoints but any OAuth 2.0 provider can be
used.
revoke_uri: string, URI for revoke endpoint.
"""
super(AssertionCredentials, self).__init__(
None,
None,
None,
None,
None,
token_uri,
user_agent,
revoke_uri=revoke_uri)
self.assertion_type = assertion_type
def credentials_from_code(client_id, client_secret, scope, code,
redirect_uri='postmessage', http=None,
user_agent=None, token_uri=GOOGLE_TOKEN_URI,
auth_uri=GOOGLE_AUTH_URI,
revoke_uri=GOOGLE_REVOKE_URI):
"""Exchanges an authorization code for an OAuth2Credentials object.
Args:
client_id: string, client identifier.
client_secret: string, client secret.
scope: string or iterable of strings, scope(s) to request.
code: string, An authroization code, most likely passed down from
the client
redirect_uri: string, this is generally set to 'postmessage' to match the
redirect_uri that the client specified
http: httplib2.Http, optional http instance to use to do the fetch
token_uri: string, URI for token endpoint. For convenience
defaults to Google's endpoints but any OAuth 2.0 provider can be used.
auth_uri: string, URI for authorization endpoint. For convenience
defaults to Google's endpoints but any OAuth 2.0 provider can be used.
revoke_uri: string, URI for revoke endpoint. For convenience
defaults to Google's endpoints but any OAuth 2.0 provider can be used.
Returns:
An OAuth2Credentials object.
Raises:
FlowExchangeError if the authorization code cannot be exchanged for an
access token
"""
flow = OAuth2WebServerFlow(client_id, client_secret, scope,
redirect_uri=redirect_uri, user_agent=user_agent,
auth_uri=auth_uri, token_uri=token_uri,
revoke_uri=revoke_uri)
credentials = flow.step2_exchange(code, http=http)
return credentials
def _get_application_default_credential_from_file(filename):
"""Build the Application Default Credentials from file."""
# read the credentials from the file
with open(filename) as file_obj:
client_credentials = json.load(file_obj)
credentials_type = client_credentials.get('type')
if credentials_type == AUTHORIZED_USER:
required_fields = set(['client_id', 'client_secret', 'refresh_token'])
elif credentials_type == SERVICE_ACCOUNT:
required_fields = set(['client_id', 'client_email', 'private_key_id',
'private_key'])
else:
raise ApplicationDefaultCredentialsError(
"'type' field should be defined (and have one of the '" +
AUTHORIZED_USER + "' or '" + SERVICE_ACCOUNT + "' values)")
missing_fields = required_fields.difference(client_credentials.keys())
if missing_fields:
_raise_exception_for_missing_fields(missing_fields)
if client_credentials['type'] == AUTHORIZED_USER:
return GoogleCredentials(
access_token=None,
client_id=client_credentials['client_id'],
client_secret=client_credentials['client_secret'],
refresh_token=client_credentials['refresh_token'],
token_expiry=None,
token_uri=oauth2client.GOOGLE_TOKEN_URI,
user_agent='Python client library')
else: # client_credentials['type'] == SERVICE_ACCOUNT
from oauth2client import service_account
return service_account._JWTAccessCredentials.from_json_keyfile_dict(
client_credentials)
def __init__(self, assertion_type, user_agent=None,
token_uri=oauth2client.GOOGLE_TOKEN_URI,
revoke_uri=oauth2client.GOOGLE_REVOKE_URI,
**unused_kwargs):
"""Constructor for AssertionFlowCredentials.
Args:
assertion_type: string, assertion type that will be declared to the
auth server
user_agent: string, The HTTP User-Agent to provide for this
application.
token_uri: string, URI for token endpoint. For convenience defaults
to Google's endpoints but any OAuth 2.0 provider can be
used.
revoke_uri: string, URI for revoke endpoint.
"""
super(AssertionCredentials, self).__init__(
None,
None,
None,
None,
None,
token_uri,
user_agent,
revoke_uri=revoke_uri)
self.assertion_type = assertion_type
def _from_p12_keyfile_contents(cls, service_account_email,
private_key_pkcs12,
private_key_password=None, scopes='',
token_uri=oauth2client.GOOGLE_TOKEN_URI,
revoke_uri=oauth2client.GOOGLE_REVOKE_URI):
"""Factory constructor from JSON keyfile.
Args:
service_account_email: string, The email associated with the
service account.
private_key_pkcs12: string, The contents of a PKCS#12 keyfile.
private_key_password: string, (Optional) Password for PKCS#12
private key. Defaults to ``notasecret``.
scopes: List or string, (Optional) Scopes to use when acquiring an
access token.
token_uri: string, URI for token endpoint. For convenience defaults
to Google's endpoints but any OAuth 2.0 provider can be
used.
revoke_uri: string, URI for revoke endpoint. For convenience
defaults to Google's endpoints but any OAuth 2.0
provider can be used.
Returns:
ServiceAccountCredentials, a credentials object created from
the keyfile.
Raises:
NotImplementedError if pyOpenSSL is not installed / not the
active crypto library.
"""
if private_key_password is None:
private_key_password = _PASSWORD_DEFAULT
if crypt.Signer is not crypt.OpenSSLSigner:
raise NotImplementedError(_PKCS12_ERROR)
signer = crypt.Signer.from_string(private_key_pkcs12,
private_key_password)
credentials = cls(service_account_email, signer, scopes=scopes,
token_uri=token_uri, revoke_uri=revoke_uri)
credentials._private_key_pkcs12 = private_key_pkcs12
credentials._private_key_password = private_key_password
return credentials
def from_p12_keyfile_buffer(cls, service_account_email, file_buffer,
private_key_password=None, scopes='',
token_uri=oauth2client.GOOGLE_TOKEN_URI,
revoke_uri=oauth2client.GOOGLE_REVOKE_URI):
"""Factory constructor from JSON keyfile.
Args:
service_account_email: string, The email associated with the
service account.
file_buffer: stream, A buffer that implements ``read()``
and contains the PKCS#12 key contents.
private_key_password: string, (Optional) Password for PKCS#12
private key. Defaults to ``notasecret``.
scopes: List or string, (Optional) Scopes to use when acquiring an
access token.
token_uri: string, URI for token endpoint. For convenience defaults
to Google's endpoints but any OAuth 2.0 provider can be
used.
revoke_uri: string, URI for revoke endpoint. For convenience
defaults to Google's endpoints but any OAuth 2.0
provider can be used.
Returns:
ServiceAccountCredentials, a credentials object created from
the keyfile.
Raises:
NotImplementedError if pyOpenSSL is not installed / not the
active crypto library.
"""
private_key_pkcs12 = file_buffer.read()
return cls._from_p12_keyfile_contents(
service_account_email, private_key_pkcs12,
private_key_password=private_key_password, scopes=scopes,
token_uri=token_uri, revoke_uri=revoke_uri)