def _get_gcp_libcloud_credentials(module, service_account_email=None, credentials_file=None, project_id=None):
    """
    Helper to look for libcloud secrets.py file.

    Note: This has an 'additive' effect right now, filling in
    vars not specified elsewhere, in order to keep legacy functionality.
    This method of specifying credentials will be deprecated, otherwise
    we'd look to make it more restrictive with an all-vars-or-nothing approach.

    The ``secrets`` module is only consulted when ``service_account_email``
    or ``credentials_file`` is ``None``; otherwise all three values are
    returned untouched.

    :param module: object whose ``deprecate(msg=..., version=...)`` method is
                   called when legacy GCE settings are found in ``secrets``
    :param service_account_email: GCP service account email used to make requests
    :type service_account_email: ``str`` or None
    :param credentials_file: Path on disk to credentials file
    :type credentials_file: ``str`` or None
    :param project_id: GCP project ID.
    :type project_id: ``str`` or None
    :return: tuple of (service_account_email, credentials_file, project_id)
    :rtype: ``tuple`` of ``str``
    """
    if service_account_email is not None and credentials_file is not None:
        # Nothing is missing, so the legacy secrets file is not needed.
        return (service_account_email, credentials_file, project_id)

    try:
        import secrets
    except ImportError:
        secrets = None

    # Python 3.6+ ships a standard-library module that is also called
    # ``secrets``; a successful import therefore does not prove that a
    # libcloud secrets file exists.  Only warn when the imported module
    # really carries GCE settings.
    if hasattr(secrets, 'GCE_PARAMS') or hasattr(secrets, 'GCE_KEYWORD_PARAMS'):
        secrets_location = getattr(secrets, '__file__', '<unknown>')
        module.deprecate(msg=("secrets file found at '%s'. This method of specifying "
                              "credentials is deprecated. Please use env vars or "
                              "Ansible YAML files instead" % (secrets_location)), version=2.5)

    if hasattr(secrets, 'GCE_PARAMS'):
        # GCE_PARAMS is read positionally: (service account email, credentials file).
        if not service_account_email:
            service_account_email = secrets.GCE_PARAMS[0]
        if not credentials_file:
            credentials_file = secrets.GCE_PARAMS[1]

    keyword_params = getattr(secrets, 'GCE_KEYWORD_PARAMS', {})
    if not project_id:
        project_id = keyword_params.get('project', None)

    return (service_account_email, credentials_file, project_id)
python类GCE_PARAMS的实例源码
def get_gce_driver(self):
    """Determine the GCE authorization settings and return a
    libcloud driver.

    Settings are resolved in this order:

    1. an importable ``secrets`` module that defines both ``GCE_PARAMS``
       and ``GCE_KEYWORD_PARAMS``;
    2. the ``secrets.py`` named by the ``libcloud_secrets`` option in the
       ``gce`` section of ``self.config`` (its directory is appended to
       ``sys.path`` before importing);
    3. the ``gce_service_account_email_address``,
       ``gce_service_account_pem_file_path``, ``gce_project_id`` and
       ``gce_zone`` options in the ``gce`` section of ``self.config``.

    The ``GCE_EMAIL``, ``GCE_PEM_FILE_PATH``, ``GCE_CREDENTIALS_FILE_PATH``,
    ``GCE_PROJECT`` and ``GCE_ZONE`` environment variables, when set,
    override whatever was found above.

    Exits the process via ``sys.exit`` when ``libcloud_secrets`` is needed
    but does not end in ``secrets.py``.

    :return: the object built by ``get_driver(Provider.GCE)``, with the
             product/version user agent appended to its connection
    """
    # Attempt to get GCE params from a configuration file, if one
    # exists.
    secrets_path = self.config.get('gce', 'libcloud_secrets')
    secrets_found = False
    args = []
    kwargs = {}

    try:
        import secrets
        args = list(secrets.GCE_PARAMS)
        # Copy so the environment overrides below never mutate the
        # dict owned by the secrets module.
        kwargs = dict(secrets.GCE_KEYWORD_PARAMS)
        # Both positional credentials are indexed below; anything shorter
        # is treated as "no secrets found".
        secrets_found = len(args) >= 2
    except Exception:
        # Best effort: a missing, incomplete or broken secrets module just
        # means the next source is tried.
        pass

    if not secrets_found and secrets_path:
        if not secrets_path.endswith('secrets.py'):
            err = ("Must specify libcloud secrets file as "
                   "/absolute/path/to/secrets.py")
            sys.exit(err)
        # NOTE(review): the directory is appended, not prepended, so on
        # Python 3.6+ the standard-library ``secrets`` module still wins
        # (and may already be cached in sys.modules).  In that case no GCE
        # settings are found here and the config values below are used.
        sys.path.append(os.path.dirname(secrets_path))
        try:
            import secrets
            args = list(getattr(secrets, 'GCE_PARAMS', []))
            kwargs = dict(getattr(secrets, 'GCE_KEYWORD_PARAMS', {}))
            secrets_found = len(args) >= 2
        except Exception:
            pass

    if not secrets_found:
        args = [
            self.config.get('gce', 'gce_service_account_email_address'),
            self.config.get('gce', 'gce_service_account_pem_file_path'),
        ]
        kwargs = {'project': self.config.get('gce', 'gce_project_id'),
                  'datacenter': self.config.get('gce', 'gce_zone')}

    # If the appropriate environment variables are set, they override
    # other configuration; process those into our args and kwargs.
    args[0] = os.environ.get('GCE_EMAIL', args[0])
    args[1] = os.environ.get('GCE_PEM_FILE_PATH', args[1])
    args[1] = os.environ.get('GCE_CREDENTIALS_FILE_PATH', args[1])
    # .get() because a secrets file is not required to define these keys.
    kwargs['project'] = os.environ.get('GCE_PROJECT', kwargs.get('project'))
    kwargs['datacenter'] = os.environ.get('GCE_ZONE', kwargs.get('datacenter'))

    # Retrieve and return the GCE driver.
    gce = get_driver(Provider.GCE)(*args, **kwargs)
    gce.connection.user_agent_append(
        '%s/%s' % (USER_AGENT_PRODUCT, USER_AGENT_VERSION),
    )
    return gce
def get_gce_driver(self):
    """Determine the GCE authorization settings and return a
    libcloud driver.

    Settings are resolved in this order:

    1. an importable ``secrets`` module that defines both ``GCE_PARAMS``
       and ``GCE_KEYWORD_PARAMS``;
    2. the ``secrets.py`` named by the ``libcloud_secrets`` option in the
       ``gce`` section of ``self.config`` (its directory is appended to
       ``sys.path`` before importing);
    3. the ``gce_service_account_email_address``,
       ``gce_service_account_pem_file_path``, ``gce_project_id`` and
       ``gce_zone`` options in the ``gce`` section of ``self.config``.

    The ``GCE_EMAIL``, ``GCE_PEM_FILE_PATH``, ``GCE_CREDENTIALS_FILE_PATH``,
    ``GCE_PROJECT`` and ``GCE_ZONE`` environment variables, when set,
    override whatever was found above.

    Exits the process via ``sys.exit`` when ``libcloud_secrets`` is needed
    but does not end in ``secrets.py``.

    :return: the object built by ``get_driver(Provider.GCE)``, with the
             product/version user agent appended to its connection
    """
    # Attempt to get GCE params from a configuration file, if one
    # exists.
    secrets_path = self.config.get('gce', 'libcloud_secrets')
    secrets_found = False
    args = []
    kwargs = {}

    try:
        import secrets
        args = list(secrets.GCE_PARAMS)
        # Copy so the environment overrides below never mutate the
        # dict owned by the secrets module.
        kwargs = dict(secrets.GCE_KEYWORD_PARAMS)
        # Both positional credentials are indexed below; anything shorter
        # is treated as "no secrets found".
        secrets_found = len(args) >= 2
    except Exception:
        # Best effort: a missing, incomplete or broken secrets module just
        # means the next source is tried.
        pass

    if not secrets_found and secrets_path:
        if not secrets_path.endswith('secrets.py'):
            err = ("Must specify libcloud secrets file as "
                   "/absolute/path/to/secrets.py")
            sys.exit(err)
        # NOTE(review): the directory is appended, not prepended, so on
        # Python 3.6+ the standard-library ``secrets`` module still wins
        # (and may already be cached in sys.modules).  In that case no GCE
        # settings are found here and the config values below are used.
        sys.path.append(os.path.dirname(secrets_path))
        try:
            import secrets
            args = list(getattr(secrets, 'GCE_PARAMS', []))
            kwargs = dict(getattr(secrets, 'GCE_KEYWORD_PARAMS', {}))
            secrets_found = len(args) >= 2
        except Exception:
            pass

    if not secrets_found:
        args = [
            self.config.get('gce', 'gce_service_account_email_address'),
            self.config.get('gce', 'gce_service_account_pem_file_path'),
        ]
        kwargs = {'project': self.config.get('gce', 'gce_project_id'),
                  'datacenter': self.config.get('gce', 'gce_zone')}

    # If the appropriate environment variables are set, they override
    # other configuration; process those into our args and kwargs.
    args[0] = os.environ.get('GCE_EMAIL', args[0])
    args[1] = os.environ.get('GCE_PEM_FILE_PATH', args[1])
    args[1] = os.environ.get('GCE_CREDENTIALS_FILE_PATH', args[1])
    # .get() because a secrets file is not required to define these keys.
    kwargs['project'] = os.environ.get('GCE_PROJECT', kwargs.get('project'))
    kwargs['datacenter'] = os.environ.get('GCE_ZONE', kwargs.get('datacenter'))

    # Retrieve and return the GCE driver.
    gce = get_driver(Provider.GCE)(*args, **kwargs)
    gce.connection.user_agent_append(
        '%s/%s' % (USER_AGENT_PRODUCT, USER_AGENT_VERSION),
    )
    return gce
def get_gce_driver(self):
    """Determine the GCE authorization settings and return a
    libcloud driver.

    Settings are resolved in this order:

    1. an importable ``secrets`` module that defines both ``GCE_PARAMS``
       and ``GCE_KEYWORD_PARAMS``;
    2. the ``secrets.py`` named by the ``libcloud_secrets`` option in the
       ``gce`` section of ``self.config`` (its directory is appended to
       ``sys.path`` before importing);
    3. the ``gce_service_account_email_address``,
       ``gce_service_account_pem_file_path``, ``gce_project_id`` and
       ``gce_zone`` options in the ``gce`` section of ``self.config``.

    The ``GCE_EMAIL``, ``GCE_PEM_FILE_PATH``, ``GCE_CREDENTIALS_FILE_PATH``,
    ``GCE_PROJECT`` and ``GCE_ZONE`` environment variables, when set,
    override whatever was found above.

    Exits the process via ``sys.exit`` when ``libcloud_secrets`` is needed
    but does not end in ``secrets.py``.

    :return: the object built by ``get_driver(Provider.GCE)``, with the
             product/version user agent appended to its connection
    """
    # Attempt to get GCE params from a configuration file, if one
    # exists.
    secrets_path = self.config.get('gce', 'libcloud_secrets')
    secrets_found = False
    args = []
    kwargs = {}

    try:
        import secrets
        args = list(secrets.GCE_PARAMS)
        # Copy so the environment overrides below never mutate the
        # dict owned by the secrets module.
        kwargs = dict(secrets.GCE_KEYWORD_PARAMS)
        # Both positional credentials are indexed below; anything shorter
        # is treated as "no secrets found".
        secrets_found = len(args) >= 2
    except Exception:
        # Best effort: a missing, incomplete or broken secrets module just
        # means the next source is tried.
        pass

    if not secrets_found and secrets_path:
        if not secrets_path.endswith('secrets.py'):
            err = ("Must specify libcloud secrets file as "
                   "/absolute/path/to/secrets.py")
            sys.exit(err)
        # NOTE(review): the directory is appended, not prepended, so on
        # Python 3.6+ the standard-library ``secrets`` module still wins
        # (and may already be cached in sys.modules).  In that case no GCE
        # settings are found here and the config values below are used.
        sys.path.append(os.path.dirname(secrets_path))
        try:
            import secrets
            args = list(getattr(secrets, 'GCE_PARAMS', []))
            kwargs = dict(getattr(secrets, 'GCE_KEYWORD_PARAMS', {}))
            secrets_found = len(args) >= 2
        except Exception:
            pass

    if not secrets_found:
        args = [
            self.config.get('gce', 'gce_service_account_email_address'),
            self.config.get('gce', 'gce_service_account_pem_file_path'),
        ]
        kwargs = {'project': self.config.get('gce', 'gce_project_id'),
                  'datacenter': self.config.get('gce', 'gce_zone')}

    # If the appropriate environment variables are set, they override
    # other configuration; process those into our args and kwargs.
    args[0] = os.environ.get('GCE_EMAIL', args[0])
    args[1] = os.environ.get('GCE_PEM_FILE_PATH', args[1])
    args[1] = os.environ.get('GCE_CREDENTIALS_FILE_PATH', args[1])
    # .get() because a secrets file is not required to define these keys.
    kwargs['project'] = os.environ.get('GCE_PROJECT', kwargs.get('project'))
    kwargs['datacenter'] = os.environ.get('GCE_ZONE', kwargs.get('datacenter'))

    # Retrieve and return the GCE driver.
    gce = get_driver(Provider.GCE)(*args, **kwargs)
    gce.connection.user_agent_append(
        '%s/%s' % (USER_AGENT_PRODUCT, USER_AGENT_VERSION),
    )
    return gce