def test_get_adc_from_env_var_malformed_file(self, *stubs):
# Set up stubs.
get_well_known, get_env_file, in_gae, in_gce = stubs
get_env_file.return_value = datafile(
os.path.join('gcloud',
'application_default_credentials_malformed_3.json'))
expected_err = client.ApplicationDefaultCredentialsError
with self.assertRaises(expected_err) as exc_manager:
client.GoogleCredentials.get_application_default()
self.assertTrue(str(exc_manager.exception).startswith(
'An error was encountered while reading json file: ' +
get_env_file.return_value + ' (pointed to by ' +
client.GOOGLE_APPLICATION_CREDENTIALS + ' environment variable):'))
get_env_file.assert_called_once_with()
get_well_known.assert_not_called()
in_gae.assert_not_called()
in_gce.assert_not_called()
python类GoogleCredentials()的实例源码
def test_get_adc_env_from_well_known(self, *stubs):
# Unpack stubs.
get_well_known, get_env_file, in_gae, in_gce = stubs
# Make sure the well-known file is an actual file.
get_well_known.return_value = __file__
# Make sure the well-known file actually doesn't exist.
self.assertTrue(os.path.exists(get_well_known.return_value))
method_name = (
'oauth2client.client.'
'_get_application_default_credential_from_file')
result_creds = object()
with mock.patch(method_name,
return_value=result_creds) as get_from_file:
result = client.GoogleCredentials.get_application_default()
self.assertEqual(result, result_creds)
get_from_file.assert_called_once_with(__file__)
get_env_file.assert_called_once_with()
get_well_known.assert_called_once_with()
in_gae.assert_not_called()
in_gce.assert_not_called()
def test_get_adc_from_env_var_malformed_file(self, *stubs):
# Set up stubs.
get_well_known, get_env_file, in_gae, in_gce = stubs
get_env_file.return_value = datafile(
os.path.join('gcloud',
'application_default_credentials_malformed_3.json'))
expected_err = client.ApplicationDefaultCredentialsError
with self.assertRaises(expected_err) as exc_manager:
client.GoogleCredentials.get_application_default()
self.assertTrue(str(exc_manager.exception).startswith(
'An error was encountered while reading json file: ' +
get_env_file.return_value + ' (pointed to by ' +
client.GOOGLE_APPLICATION_CREDENTIALS + ' environment variable):'))
get_env_file.assert_called_once_with()
get_well_known.assert_not_called()
in_gae.assert_not_called()
in_gce.assert_not_called()
def test_get_adc_env_from_well_known(self, *stubs):
# Unpack stubs.
get_well_known, get_env_file, in_gae, in_gce = stubs
# Make sure the well-known file is an actual file.
get_well_known.return_value = __file__
# Make sure the well-known file actually doesn't exist.
self.assertTrue(os.path.exists(get_well_known.return_value))
method_name = (
'oauth2client.client.'
'_get_application_default_credential_from_file')
result_creds = object()
with mock.patch(method_name,
return_value=result_creds) as get_from_file:
result = client.GoogleCredentials.get_application_default()
self.assertEqual(result, result_creds)
get_from_file.assert_called_once_with(__file__)
get_env_file.assert_called_once_with()
get_well_known.assert_called_once_with()
in_gae.assert_not_called()
in_gce.assert_not_called()
def _set_ua_and_scopes(credentials):
"""Set custom Forseti user agent and add cloud scopes on credential object.
Args:
credentials (client.OAuth2Credentials): The credentials object used to
authenticate all http requests.
Returns:
client.OAuth2Credentials: The credentials object with the user agent
attribute set or updated.
"""
if isinstance(credentials, client.OAuth2Credentials):
user_agent = credentials.user_agent
if (not user_agent or
forseti_security.__package_name__ not in user_agent):
credentials.user_agent = (
'Python-httplib2/{} (gzip), {}/{}'.format(
httplib2.__version__,
forseti_security.__package_name__,
forseti_security.__version__))
if (isinstance(credentials, client.GoogleCredentials) and
credentials.create_scoped_required()):
credentials = credentials.create_scoped(list(CLOUD_SCOPES))
return credentials
def run_user_json():
with open(USER_KEY_PATH, 'r') as file_object:
client_credentials = json.load(file_object)
credentials = client.GoogleCredentials(
access_token=None,
client_id=client_credentials['client_id'],
client_secret=client_credentials['client_secret'],
refresh_token=client_credentials['refresh_token'],
token_expiry=None,
token_uri=oauth2client.GOOGLE_TOKEN_URI,
user_agent='Python client library',
)
_check_user_info(credentials, USER_KEY_EMAIL)
def validate_google_credentials(self, credentials):
self.assertIsInstance(credentials, client.GoogleCredentials)
self.assertEqual(None, credentials.access_token)
self.assertEqual('123', credentials.client_id)
self.assertEqual('secret', credentials.client_secret)
self.assertEqual('alabalaportocala', credentials.refresh_token)
self.assertEqual(None, credentials.token_expiry)
self.assertEqual(oauth2client.GOOGLE_TOKEN_URI, credentials.token_uri)
self.assertEqual('Python client library', credentials.user_agent)
def get_a_google_credentials_object(self):
return client.GoogleCredentials(None, None, None, None,
None, None, None, None)
def test_get_application_default_in_gae(self, gae_adc, in_gae,
from_gce, from_files):
credentials = client.GoogleCredentials.get_application_default()
self.assertEqual(credentials, gae_adc.return_value)
from_files.assert_called_once_with()
in_gae.assert_called_once_with()
from_gce.assert_not_called()
def test_get_application_default_in_gce(self, gce_adc, in_gce,
from_files, from_gae):
credentials = client.GoogleCredentials.get_application_default()
self.assertEqual(credentials, gce_adc.return_value)
in_gce.assert_called_once_with()
from_gae.assert_called_once_with()
from_files.assert_called_once_with()
def test_get_adc_from_env_var_authorized_user(self, *stubs):
# Set up stubs.
get_well_known, get_env_file, in_gae, in_gce = stubs
get_env_file.return_value = datafile(os.path.join(
'gcloud',
'application_default_credentials_authorized_user.json'))
credentials = client.GoogleCredentials.get_application_default()
self.validate_google_credentials(credentials)
get_env_file.assert_called_once_with()
get_well_known.assert_not_called()
in_gae.assert_not_called()
in_gce.assert_not_called()
def test_get_adc_env_not_set_up(self, *stubs):
# Unpack stubs.
get_well_known, get_env_file, in_gae, in_gce = stubs
# Make sure the well-known file actually doesn't exist.
self.assertFalse(os.path.exists(get_well_known.return_value))
expected_err = client.ApplicationDefaultCredentialsError
with self.assertRaises(expected_err) as exc_manager:
client.GoogleCredentials.get_application_default()
self.assertEqual(client.ADC_HELP_MSG, str(exc_manager.exception))
get_env_file.assert_called_once_with()
get_well_known.assert_called_once_with()
in_gae.assert_called_once_with()
in_gce.assert_called_once_with()
def test_to_from_json_authorized_user(self):
filename = 'application_default_credentials_authorized_user.json'
credentials_file = datafile(os.path.join('gcloud', filename))
creds = client.GoogleCredentials.from_stream(credentials_file)
json = creds.to_json()
creds2 = client.GoogleCredentials.from_json(json)
self.assertEqual(creds.__dict__, creds2.__dict__)
def test_to_from_json_service_account_scoped(self):
credentials_file = datafile(
os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
creds1 = client.GoogleCredentials.from_stream(credentials_file)
creds1 = creds1.create_scoped(['dummy_scope'])
# Convert to and then back from json.
creds2 = client.GoogleCredentials.from_json(creds1.to_json())
creds1_vals = creds1.__dict__
creds1_vals.pop('_signer')
creds2_vals = creds2.__dict__
creds2_vals.pop('_signer')
self.assertEqual(creds1_vals, creds2_vals)
def validate_google_credentials(self, credentials):
self.assertIsInstance(credentials, client.GoogleCredentials)
self.assertEqual(None, credentials.access_token)
self.assertEqual('123', credentials.client_id)
self.assertEqual('secret', credentials.client_secret)
self.assertEqual('alabalaportocala', credentials.refresh_token)
self.assertEqual(None, credentials.token_expiry)
self.assertEqual(oauth2client.GOOGLE_TOKEN_URI, credentials.token_uri)
self.assertEqual('Python client library', credentials.user_agent)
def get_a_google_credentials_object(self):
return client.GoogleCredentials(None, None, None, None,
None, None, None, None)
def test_get_application_default_in_gae(self, gae_adc, in_gae,
from_gce, from_files):
credentials = client.GoogleCredentials.get_application_default()
self.assertEqual(credentials, gae_adc.return_value)
from_files.assert_called_once_with()
in_gae.assert_called_once_with()
from_gce.assert_not_called()
def test_get_application_default_in_gce(self, gce_adc, in_gce,
from_files, from_gae):
credentials = client.GoogleCredentials.get_application_default()
self.assertEqual(credentials, gce_adc.return_value)
in_gce.assert_called_once_with()
from_gae.assert_called_once_with()
from_files.assert_called_once_with()
def test_get_adc_from_env_var_service_account(self, *stubs):
# Set up stubs.
get_well_known, get_env_file, in_gae, in_gce = stubs
get_env_file.return_value = datafile(
os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
credentials = client.GoogleCredentials.get_application_default()
self.validate_service_account_credentials(credentials)
get_env_file.assert_called_once_with()
get_well_known.assert_not_called()
in_gae.assert_not_called()
in_gce.assert_not_called()
def test_get_adc_env_not_set_up(self, *stubs):
# Unpack stubs.
get_well_known, get_env_file, in_gae, in_gce = stubs
# Make sure the well-known file actually doesn't exist.
self.assertFalse(os.path.exists(get_well_known.return_value))
expected_err = client.ApplicationDefaultCredentialsError
with self.assertRaises(expected_err) as exc_manager:
client.GoogleCredentials.get_application_default()
self.assertEqual(client.ADC_HELP_MSG, str(exc_manager.exception))
get_env_file.assert_called_once_with()
get_well_known.assert_called_once_with()
in_gae.assert_called_once_with()
in_gce.assert_called_once_with()
def test_to_from_json_authorized_user(self):
filename = 'application_default_credentials_authorized_user.json'
credentials_file = datafile(os.path.join('gcloud', filename))
creds = client.GoogleCredentials.from_stream(credentials_file)
json = creds.to_json()
creds2 = client.GoogleCredentials.from_json(json)
self.assertEqual(creds.__dict__, creds2.__dict__)
def test_to_from_json_service_account(self):
credentials_file = datafile(
os.path.join('gcloud', client._WELL_KNOWN_CREDENTIALS_FILE))
creds1 = client.GoogleCredentials.from_stream(credentials_file)
# Convert to and then back from json.
creds2 = client.GoogleCredentials.from_json(creds1.to_json())
creds1_vals = creds1.__dict__
creds1_vals.pop('_signer')
creds2_vals = creds2.__dict__
creds2_vals.pop('_signer')
self.assertEqual(creds1_vals, creds2_vals)