def _load_credentials(cls, keydata):
    """Build service account credentials from parsed keyfile data.

    Args:
        keydata (dict): The loaded keyfile data from a Google service account
            JSON file

    Returns:
        oauth2client.service_account.ServiceAccountCredentials: Credentials
            scoped to ``cls._SCOPES`` on success. ``False`` is returned when
            building the credentials raises ``ValueError`` or ``KeyError``;
            the failure is logged together with ``cls.type()``.
    """
    try:
        # This has the potential to raise errors. See: https://tinyurl.com/y8q5e9rm
        return ServiceAccountCredentials.from_json_keyfile_dict(
            keydata, scopes=cls._SCOPES)
    except (ValueError, KeyError):
        LOGGER.exception('Could not generate credentials from keyfile for %s',
                         cls.type())
        return False
python类from_json_keyfile_dict()的实例源码
def get_authorized_http_object(scopes):
    """Return an HTTP object authorized with JWT service account credentials.

    The keyfile dictionary is assembled from every ``settings`` entry whose
    name starts with ``JWT_NAMESPACE``; the namespace is removed from the name
    and the remainder is lower-cased to form the key.

    :param scopes: the authorization scope to be requested
    :return: httplib2.Http object, with authorized credentials
    """
    keyfile = {}
    for setting_name in settings:
        if setting_name.startswith(JWT_NAMESPACE):
            field = setting_name.replace(JWT_NAMESPACE, '').lower()
            keyfile[field] = settings[setting_name]

    # The private key is stored with literal "\n" sequences; turn them back
    # into real newlines.
    if 'private_key' in keyfile:
        keyfile['private_key'] = keyfile['private_key'].replace('\\n', '\n')

    keyfile['PROJECT_ID'] = settings['GOOGLE_API_PROJECT_ID']

    jwt_credentials = ServiceAccountCredentials.from_json_keyfile_dict(
        keyfile, scopes=scopes)
    http = httplib2.Http()
    return jwt_credentials.authorize(http)
def _get_application_default_credential_from_file(filename):
    """Build the Application Default Credentials from a JSON file.

    The file's ``type`` field selects the credential kind. An
    ``AUTHORIZED_USER`` file yields ``GoogleCredentials``; a
    ``SERVICE_ACCOUNT`` file yields ``ServiceAccountCredentials``.

    Raises:
        ApplicationDefaultCredentialsError: if ``type`` is neither
            ``AUTHORIZED_USER`` nor ``SERVICE_ACCOUNT``.

    Missing required fields are reported through
    ``_raise_exception_for_missing_fields``.
    """
    with open(filename) as handle:
        payload = json.load(handle)

    kind = payload.get('type')
    if kind == AUTHORIZED_USER:
        needed = {'client_id', 'client_secret', 'refresh_token'}
    elif kind == SERVICE_ACCOUNT:
        needed = {'client_id', 'client_email', 'private_key_id',
                  'private_key'}
    else:
        raise ApplicationDefaultCredentialsError(
            "'type' field should be defined (and have one of the '" +
            AUTHORIZED_USER + "' or '" + SERVICE_ACCOUNT + "' values)")

    absent = needed.difference(payload)
    if absent:
        _raise_exception_for_missing_fields(absent)

    if kind == AUTHORIZED_USER:
        return GoogleCredentials(
            access_token=None,
            client_id=payload['client_id'],
            client_secret=payload['client_secret'],
            refresh_token=payload['refresh_token'],
            token_expiry=None,
            token_uri=GOOGLE_TOKEN_URI,
            user_agent='Python client library')

    # Only SERVICE_ACCOUNT remains at this point.
    from oauth2client.service_account import ServiceAccountCredentials
    return ServiceAccountCredentials.from_json_keyfile_dict(payload)
def __init__(self, config):
    """Initialise the client from a configuration mapping.

    Reads the required keys ``apiKey``, ``authDomain``, ``databaseURL`` and
    ``storageBucket`` from ``config``. If ``serviceAccount`` is present and
    truthy, service account credentials are created from it: a string is
    handed to ``from_json_keyfile_name`` and a dict to
    ``from_json_keyfile_dict``. Any other type leaves ``self.credentials``
    as ``None``.

    A ``requests.Session`` is created and an adapter with ``max_retries=3``
    is mounted for both ``http://`` and ``https://``.

    :param config: mapping holding the keys described above
    :raises KeyError: if one of the required keys is missing
    """
    self.api_key = config["apiKey"]
    self.auth_domain = config["authDomain"]
    self.database_url = config["databaseURL"]
    self.storage_bucket = config["storageBucket"]
    self.credentials = None
    self.requests = requests.Session()

    service_account = config.get("serviceAccount")
    if service_account:
        scopes = [
            'https://www.googleapis.com/auth/firebase.database',
            'https://www.googleapis.com/auth/userinfo.email',
            "https://www.googleapis.com/auth/cloud-platform"
        ]
        # isinstance (rather than an exact type comparison) so that str and
        # dict subclasses, e.g. an OrderedDict produced by json.load, are not
        # silently ignored.
        if isinstance(service_account, str):
            self.credentials = ServiceAccountCredentials.from_json_keyfile_name(service_account, scopes)
        elif isinstance(service_account, dict):
            self.credentials = ServiceAccountCredentials.from_json_keyfile_dict(service_account, scopes)

    if is_appengine_sandbox():
        # Fix error in standard GAE environment
        # is related to https://github.com/kennethreitz/requests/issues/3187
        # ProtocolError('Connection aborted.', error(13, 'Permission denied'))
        adapter = appengine.AppEngineAdapter(max_retries=3)
    else:
        adapter = requests.adapters.HTTPAdapter(max_retries=3)

    for scheme in ('http://', 'https://'):
        self.requests.mount(scheme, adapter)
def authorize_service_account(json_credentials, scope, sub=None):
    """
    Create service account credentials for the given scope(s).

    ``ServiceAccountCredentials.from_json_keyfile_dict`` is used when it can
    be imported; otherwise the function falls back to
    ``SignedJwtAssertionCredentials``.

    :param json_credentials: dictionary representing a service account, in most cases created from json.load
    :param scope: scope(s) to authorize the application for
    :param sub: User ID to authorize the application for (if needed)
    :return Credentials to be used for http object
    :raises EnvironmentError: if the fallback ``SignedJwtAssertionCredentials`` cannot be imported
    """
    try:
        from oauth2client.service_account import ServiceAccountCredentials
    except ImportError:
        ServiceAccountCredentials = None

    has_keyfile_dict_factory = (
        ServiceAccountCredentials is not None
        and hasattr(ServiceAccountCredentials, "from_json_keyfile_dict")
    )
    if has_keyfile_dict_factory:
        # running oauth2client version 2.0
        credentials = ServiceAccountCredentials.from_json_keyfile_dict(json_credentials, scope)
        if sub is None:
            return credentials
        return credentials.create_delegated(sub)

    # Fallback path: the older SignedJwtAssertionCredentials class.
    try:
        from oauth2client.client import SignedJwtAssertionCredentials
    except ImportError:
        raise EnvironmentError("Service account can not be used because PyCrypto is not available. Please install PyCrypto.")

    # This branch wraps a single scope in a list before passing it on.
    scope_list = scope if isinstance(scope, (list, tuple)) else [scope]
    return SignedJwtAssertionCredentials(
        service_account_name=json_credentials['client_email'],
        private_key=json_credentials['private_key'],
        scope=scope_list,
        sub=sub)
def create_credentials():
    """Build the ``admin``/``directory_v1`` and ``calendar``/``v3`` clients.

    The service account key is read from the ``MR_CLIENT_SECRET_JSON``
    environment variable (parsed with ``ast.literal_eval``) and the
    credentials are delegated to the account named in ``MR_DELEGATED_ACCOUNT``.

    :return: tuple ``(directory, calendar)`` of the two built clients
    :raises KeyError: if either environment variable is not set
    """
    key_data = ast.literal_eval(os.environ['MR_CLIENT_SECRET_JSON'])
    service_credentials = ServiceAccountCredentials.from_json_keyfile_dict(
        key_data,
        scopes=[
            'https://www.googleapis.com/auth/admin.directory.resource.calendar',
            'https://www.googleapis.com/auth/calendar',
        ])
    delegated = service_credentials.create_delegated(
        os.environ['MR_DELEGATED_ACCOUNT'])

    # Both clients share the same authorized HTTP object.
    authorized_http = delegated.authorize(Http())
    directory_client = build('admin', 'directory_v1', http=authorized_http)
    calendar_client = build('calendar', 'v3', http=authorized_http)
    return directory_client, calendar_client
def __init__(self, credentials_dict):
    """
    Build service account credentials with the cloud-platform scope and
    create the API clients that use them.

    Sets ``self.compute`` (``compute``/``v1``), ``self.deployment_manager``
    (``deploymentmanager``/``v2``) and ``self.project_id``.

    :param credentials_dict: dict with the service account key data; it must
        also contain a ``project_id`` entry
    """
    cloud_platform_scope = 'https://www.googleapis.com/auth/cloud-platform'
    service_credentials = ServiceAccountCredentials.from_json_keyfile_dict(
        credentials_dict, scopes=cloud_platform_scope)

    # (attribute name, API name, API version) for each client to build.
    clients = (
        ('compute', 'compute', 'v1'),
        ('deployment_manager', 'deploymentmanager', 'v2'),
    )
    for attribute, api_name, api_version in clients:
        setattr(self, attribute,
                discovery.build(api_name, api_version,
                                credentials=service_credentials))

    self.project_id = credentials_dict['project_id']
def get_gcloud(ctx, version: str = 'v1'):
    """
    Get a configured Google Compute API Client instance.

    Credentials come from ``ctx.config``: ``gcloud_json_keyfile_name`` is
    tried first, and ``gcloud_json_keyfile_string`` overrides it when both
    are set. If neither yields credentials, the application default
    credentials are used.

    Note that the Google API Client is not threadsafe. Cache the instance locally
    if you want to avoid OAuth overhead between calls.

    Parameters
    ----------
    ctx
        Object whose ``config`` attribute provides the settings above
    version
        Compute API version

    Raises
    ------
    RuntimeError
        If no credentials could be obtained
    """
    compute_scope = 'https://www.googleapis.com/auth/compute'
    credentials = None

    keyfile_name = ctx.config.get('gcloud_json_keyfile_name')
    if keyfile_name:
        credentials = ServiceAccountCredentials.from_json_keyfile_name(
            keyfile_name, scopes=compute_scope)

    keyfile_string = ctx.config.get('gcloud_json_keyfile_string')
    if keyfile_string:
        credentials = ServiceAccountCredentials.from_json_keyfile_dict(
            json.loads(keyfile_string), scopes=compute_scope)

    # Fall back to the application default credentials.
    credentials = credentials or GoogleCredentials.get_application_default()
    if not credentials:
        raise RuntimeError("Auth for Google Cloud was not configured")

    # cache_discovery=False: see
    # https://github.com/google/google-api-python-client/issues/299#issuecomment-268915510
    return discovery.build(
        'compute',
        version,
        credentials=credentials,
        cache_discovery=False
    )
gbq.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def get_service_account_credentials(self):
    """Build service account credentials from ``self.private_key``.

    ``self.private_key`` is either the path of a service account JSON key
    file or the JSON contents themselves. The credentials are created with
    ``self.scope``.

    Returns:
        ``SignedJwtAssertionCredentials`` when that class can be imported
        from ``oauth2client.client``, otherwise
        ``ServiceAccountCredentials``.

    Raises:
        ImportError: if neither credentials class can be imported.
        InvalidPrivateKeyFormat: if the key cannot be read or parsed, or is
            missing required entries (any ``KeyError``, ``ValueError``,
            ``TypeError`` or ``AttributeError`` raised while building the
            credentials).
    """
    # Bug fix for https://github.com/pydata/pandas/issues/12572
    # We need to know that a supported version of oauth2client is installed
    # Test that either of the following is installed:
    # - SignedJwtAssertionCredentials from oauth2client.client
    # - ServiceAccountCredentials from oauth2client.service_account
    # SignedJwtAssertionCredentials is available in oauthclient < 2.0.0
    # ServiceAccountCredentials is available in oauthclient >= 2.0.0
    oauth2client_v1 = True
    oauth2client_v2 = True

    try:
        from oauth2client.client import SignedJwtAssertionCredentials
    except ImportError:
        oauth2client_v1 = False

    try:
        from oauth2client.service_account import ServiceAccountCredentials
    except ImportError:
        oauth2client_v2 = False

    if not oauth2client_v1 and not oauth2client_v2:
        raise ImportError("Missing oauth2client required for BigQuery "
                          "service account support")

    from os.path import isfile

    try:
        if isfile(self.private_key):
            with open(self.private_key) as f:
                json_key = json.loads(f.read())
        else:
            # The 'private_key' field may contain raw newlines, which the
            # strict JSON parser rejects. strict=False accepts control
            # characters inside strings, so the newlines are preserved
            # without rewriting any other whitespace in the key (such as
            # the spaces in the "-----BEGIN PRIVATE KEY-----" header).
            json_key = json.loads(self.private_key, strict=False)

        if compat.PY3:
            json_key['private_key'] = bytes(
                json_key['private_key'], 'UTF-8')

        if oauth2client_v1:
            return SignedJwtAssertionCredentials(
                json_key['client_email'],
                json_key['private_key'],
                self.scope,
            )
        else:
            return ServiceAccountCredentials.from_json_keyfile_dict(
                json_key,
                self.scope)
    except (KeyError, ValueError, TypeError, AttributeError):
        raise InvalidPrivateKeyFormat(
            "Private key is missing or invalid. It should be service "
            "account private key JSON (file path or string contents) "
            "with at least two keys: 'client_email' and 'private_key'. "
            "Can be obtained from: https://console.developers.google."
            "com/permissions/serviceaccounts")