def authcheck():
    """Return the e-mail address reported for the default credentials.

    Fetches the application default credentials, scopes them to the
    ``userinfo.email`` scope when ``create_scoped_required()`` says so,
    authorizes an ``httplib2.Http`` object with them and asks the
    ``oauth2`` v2 ``userinfo`` endpoint for the account details.

    Returns:
        The ``'email'`` entry of the userinfo response, or the string
        ``"Unable to acquire application default credentials"`` when an
        ``ApplicationDefaultCredentialsError`` is raised while obtaining
        or scoping the credentials.
    """
    email_scope = 'https://www.googleapis.com/auth/userinfo.email'
    try:
        creds = GoogleCredentials.get_application_default()
        if creds.create_scoped_required():
            creds = creds.create_scoped(email_scope)
    except ApplicationDefaultCredentialsError:
        # Callers receive a message string instead of an exception.
        return "Unable to acquire application default credentials"

    transport = httplib2.Http()
    creds.authorize(transport)
    oauth2_service = build(serviceName='oauth2', version='v2',
                           http=transport)
    userinfo = oauth2_service.userinfo().get().execute()
    return userinfo['email']
# Example source code for the Python class ApplicationDefaultCredentialsError()
def _GetApplicationDefaultCredentials(scopes):
    """Return application default credentials usable for ``scopes``.

    Args:
        scopes: iterable of the scope strings being requested.

    Returns:
        The credentials unchanged when they do not require scoping, the
        result of ``create_scoped(scopes)`` when the requested scopes are
        a subset of ``_GCLOUD_SCOPES``, and ``None`` in every other case
        (including when loading the credentials raises
        ``ApplicationDefaultCredentialsError``).
    """
    # There's a complication here: if the application default
    # credential returned to us is the token from the cloud SDK,
    # we need to ensure that our scopes are a subset of those
    # requested. In principle, we're a bit too strict here: eg if
    # a user requests just "bigquery", then the cloud-platform
    # scope suffices, but there's no programmatic way to check
    # this -- so we instead fail here.
    try:
        adc = client.GoogleCredentials.get_application_default()
    except client.ApplicationDefaultCredentialsError:
        return None

    if adc is None:
        return None
    if not adc.create_scoped_required():
        return adc
    if set(scopes) <= _GCLOUD_SCOPES:
        return adc.create_scoped(scopes)
    # Requested scopes are not covered: report "no usable credentials".
    return None
def test_get_environment_variable_file_error(self):
    """A nonexistent file named by the env variable raises an ADC error.

    Points the ``client.GOOGLE_APPLICATION_CREDENTIALS`` environment
    variable at the ``'nonexistent'`` data file and checks that
    ``client._get_environment_variable_file()`` raises
    ``ApplicationDefaultCredentialsError`` with the expected message.
    """
    import re  # local import: only needed to escape the path below

    nonexistent_file = datafile('nonexistent')
    os.environ[client.GOOGLE_APPLICATION_CREDENTIALS] = nonexistent_file
    # The expected message is used as a regular expression, so the file
    # path is escaped (it may contain regex metacharacters such as '.'
    # or, on Windows, backslashes). Raw literals keep '\(' from being an
    # invalid string escape.
    expected_err_msg = (
        r'File {0} \(pointed by {1} environment variable\) does not '
        r'exist!'.format(
            re.escape(nonexistent_file),
            client.GOOGLE_APPLICATION_CREDENTIALS))
    with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                 expected_err_msg):
        client._get_environment_variable_file()
def test_get_application_default_credential_from_malformed_file_1(self):
    """Loading ``..._malformed_1.json`` reports the bad 'type' field.

    Checks that ``client._get_application_default_credential_from_file``
    raises ``ApplicationDefaultCredentialsError`` with a message naming
    the two accepted ``type`` values.
    """
    credentials_file = datafile(
        os.path.join('gcloud',
                     'application_default_credentials_malformed_1.json'))
    # Raw literals: the pattern is a regex and '\(' is not a valid
    # escape in an ordinary string literal.
    expected_err_msg = (
        r"'type' field should be defined \(and have one of the '{0}' or "
        r"'{1}' values\)".format(client.AUTHORIZED_USER,
                                 client.SERVICE_ACCOUNT))
    with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                 expected_err_msg):
        client._get_application_default_credential_from_file(
            credentials_file)
def test_get_application_default_credential_from_malformed_file_2(self):
    """Loading ``..._malformed_2.json`` reports the missing field.

    Checks that ``client._get_application_default_credential_from_file``
    raises ``ApplicationDefaultCredentialsError`` whose message says
    ``private_key_id`` must be defined.
    """
    credentials_file = datafile(
        os.path.join('gcloud',
                     'application_default_credentials_malformed_2.json'))
    # Raw literal: the pattern is a regex and '\(' is not a valid escape
    # in an ordinary string literal.
    expected_err_msg = (
        r'The following field\(s\) must be defined: private_key_id')
    with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                 expected_err_msg):
        client._get_application_default_credential_from_file(
            credentials_file)
def test_raise_exception_for_missing_fields(self):
    """``_raise_exception_for_missing_fields`` lists every missing field.

    Passes three field names and checks that the raised
    ``ApplicationDefaultCredentialsError`` message contains them joined
    by ``', '``.
    """
    missing_fields = ['first', 'second', 'third']
    # Raw literal: the pattern is a regex and '\(' is not a valid escape
    # in an ordinary string literal.
    expected_err_msg = (r'The following field\(s\) must be defined: ' +
                        ', '.join(missing_fields))
    with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                 expected_err_msg):
        client._raise_exception_for_missing_fields(missing_fields)
def test_raise_exception_for_reading_json(self):
    """``_raise_exception_for_reading_json`` wraps the original error.

    Checks that the raised ``ApplicationDefaultCredentialsError`` message
    is built from the file name, the extra help text and the text of the
    error that was passed in.
    """
    credential_file = 'any_file'
    extra_help = ' be good'
    error = client.ApplicationDefaultCredentialsError('stuff happens')
    message_parts = [
        'An error was encountered while reading ',
        'json file: ',
        credential_file,
        extra_help,
        ': ',
        str(error),
    ]
    expected_err_msg = ''.join(message_parts)
    with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                 expected_err_msg):
        client._raise_exception_for_reading_json(
            credential_file, extra_help, error)
def test_get_adc_env_not_set_up(self, *stubs):
    """``get_application_default`` fails with the ADC help message.

    ``stubs`` must hold exactly four stub objects, in the order
    well-known-file, environment-file, in-GAE and in-GCE. The test
    checks that the error text equals ``client.ADC_HELP_MSG`` and that
    each stub was called exactly once with no arguments.
    """
    # Unpack stubs.
    get_well_known, get_env_file, in_gae, in_gce = stubs
    # Make sure the well-known file actually doesn't exist.
    well_known_path = get_well_known.return_value
    self.assertFalse(os.path.exists(well_known_path))

    with self.assertRaises(
            client.ApplicationDefaultCredentialsError) as caught:
        client.GoogleCredentials.get_application_default()

    self.assertEqual(client.ADC_HELP_MSG, str(caught.exception))
    for stub in (get_env_file, get_well_known, in_gae, in_gce):
        stub.assert_called_once_with()
def test_from_stream_missing_file(self):
    """``from_stream(None)`` raises an error asking for a file."""
    missing_filename = None
    error_cls = client.ApplicationDefaultCredentialsError
    expected_pattern = (r'The parameter passed to the from_stream\(\) method '
                        r'should point to a file.')
    with self.assertRaisesRegexp(error_cls, expected_pattern):
        # The credentials object is created inside the context, as before.
        self.get_a_google_credentials_object().from_stream(missing_filename)
def test_from_stream_malformed_file_1(self):
    """``from_stream`` on ``..._malformed_1.json`` reports the bad 'type'.

    Checks that the raised ``ApplicationDefaultCredentialsError`` message
    names the file, states it was given to ``from_stream()`` and lists
    the two accepted ``type`` values.
    """
    import re  # local import: only needed to escape the path below

    credentials_file = datafile(
        os.path.join('gcloud',
                     'application_default_credentials_malformed_1.json'))
    # The expected message is used as a regular expression, so the file
    # path is escaped (it contains '.' and may contain other regex
    # metacharacters). Raw literals keep '\(' from being an invalid
    # string escape.
    expected_err_msg = (
        'An error was encountered while reading json file: ' +
        re.escape(credentials_file) +
        r' \(provided as parameter to the from_stream\(\) method\): ' +
        r"'type' field should be defined \(and have one of the '" +
        client.AUTHORIZED_USER + "' or '" + client.SERVICE_ACCOUNT +
        r"' values\)")
    with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                 expected_err_msg):
        self.get_a_google_credentials_object().from_stream(
            credentials_file)
def test_from_stream_malformed_file_2(self):
    """``from_stream`` on ``..._malformed_2.json`` reports the missing field.

    Checks that the raised ``ApplicationDefaultCredentialsError`` message
    names the file, states it was given to ``from_stream()`` and says
    ``private_key_id`` must be defined.
    """
    import re  # local import: only needed to escape the path below

    credentials_file = datafile(
        os.path.join('gcloud',
                     'application_default_credentials_malformed_2.json'))
    # The expected message is used as a regular expression, so the file
    # path is escaped (it contains '.' and may contain other regex
    # metacharacters). Raw literals keep '\(' from being an invalid
    # string escape.
    expected_err_msg = (
        'An error was encountered while reading json file: ' +
        re.escape(credentials_file) +
        r' \(provided as parameter to the from_stream\(\) method\): '
        r'The following field\(s\) must be defined: '
        r'private_key_id')
    with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                 expected_err_msg):
        self.get_a_google_credentials_object().from_stream(
            credentials_file)
def test_from_stream_malformed_file_3(self):
    """``from_stream`` on ``..._malformed_3.json`` raises an ADC error."""
    relative_path = os.path.join(
        'gcloud', 'application_default_credentials_malformed_3.json')
    credentials_file = datafile(relative_path)
    error_cls = client.ApplicationDefaultCredentialsError
    with self.assertRaises(error_cls):
        self.get_a_google_credentials_object().from_stream(credentials_file)
def test_get_environment_variable_file_error(self):
    """A nonexistent file named by the env variable raises an ADC error.

    Points the ``client.GOOGLE_APPLICATION_CREDENTIALS`` environment
    variable at the ``'nonexistent'`` data file and checks that
    ``client._get_environment_variable_file()`` raises
    ``ApplicationDefaultCredentialsError`` with the expected message.
    """
    import re  # local import: only needed to escape the path below

    nonexistent_file = datafile('nonexistent')
    os.environ[client.GOOGLE_APPLICATION_CREDENTIALS] = nonexistent_file
    # The expected message is used as a regular expression, so the file
    # path is escaped (it may contain regex metacharacters such as '.'
    # or, on Windows, backslashes). Raw literals keep '\(' from being an
    # invalid string escape.
    expected_err_msg = (
        r'File {0} \(pointed by {1} environment variable\) does not '
        r'exist!'.format(
            re.escape(nonexistent_file),
            client.GOOGLE_APPLICATION_CREDENTIALS))
    with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                 expected_err_msg):
        client._get_environment_variable_file()
def test_get_application_default_credential_from_malformed_file_1(self):
    """Loading ``..._malformed_1.json`` reports the bad 'type' field.

    Checks that ``client._get_application_default_credential_from_file``
    raises ``ApplicationDefaultCredentialsError`` with a message naming
    the two accepted ``type`` values.
    """
    credentials_file = datafile(
        os.path.join('gcloud',
                     'application_default_credentials_malformed_1.json'))
    # Raw literals: the pattern is a regex and '\(' is not a valid
    # escape in an ordinary string literal.
    expected_err_msg = (
        r"'type' field should be defined \(and have one of the '{0}' or "
        r"'{1}' values\)".format(client.AUTHORIZED_USER,
                                 client.SERVICE_ACCOUNT))
    with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                 expected_err_msg):
        client._get_application_default_credential_from_file(
            credentials_file)
def test_get_application_default_credential_from_malformed_file_2(self):
    """Loading ``..._malformed_2.json`` reports the missing field.

    Checks that ``client._get_application_default_credential_from_file``
    raises ``ApplicationDefaultCredentialsError`` whose message says
    ``private_key_id`` must be defined.
    """
    credentials_file = datafile(
        os.path.join('gcloud',
                     'application_default_credentials_malformed_2.json'))
    # Raw literal: the pattern is a regex and '\(' is not a valid escape
    # in an ordinary string literal.
    expected_err_msg = (
        r'The following field\(s\) must be defined: private_key_id')
    with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                 expected_err_msg):
        client._get_application_default_credential_from_file(
            credentials_file)
def test_raise_exception_for_missing_fields(self):
    """``_raise_exception_for_missing_fields`` lists every missing field.

    Passes three field names and checks that the raised
    ``ApplicationDefaultCredentialsError`` message contains them joined
    by ``', '``.
    """
    missing_fields = ['first', 'second', 'third']
    # Raw literal: the pattern is a regex and '\(' is not a valid escape
    # in an ordinary string literal.
    expected_err_msg = (r'The following field\(s\) must be defined: ' +
                        ', '.join(missing_fields))
    with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                 expected_err_msg):
        client._raise_exception_for_missing_fields(missing_fields)
def test_raise_exception_for_reading_json(self):
    """``_raise_exception_for_reading_json`` wraps the original error.

    Checks that the raised ``ApplicationDefaultCredentialsError`` message
    is built from the file name, the extra help text and the text of the
    error that was passed in.
    """
    credential_file = 'any_file'
    extra_help = ' be good'
    error = client.ApplicationDefaultCredentialsError('stuff happens')
    message_parts = [
        'An error was encountered while reading ',
        'json file: ',
        credential_file,
        extra_help,
        ': ',
        str(error),
    ]
    expected_err_msg = ''.join(message_parts)
    with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                 expected_err_msg):
        client._raise_exception_for_reading_json(
            credential_file, extra_help, error)
def test_get_adc_env_not_set_up(self, *stubs):
    """``get_application_default`` fails with the ADC help message.

    ``stubs`` must hold exactly four stub objects, in the order
    well-known-file, environment-file, in-GAE and in-GCE. The test
    checks that the error text equals ``client.ADC_HELP_MSG`` and that
    each stub was called exactly once with no arguments.
    """
    # Unpack stubs.
    get_well_known, get_env_file, in_gae, in_gce = stubs
    # Make sure the well-known file actually doesn't exist.
    well_known_path = get_well_known.return_value
    self.assertFalse(os.path.exists(well_known_path))

    with self.assertRaises(
            client.ApplicationDefaultCredentialsError) as caught:
        client.GoogleCredentials.get_application_default()

    self.assertEqual(client.ADC_HELP_MSG, str(caught.exception))
    for stub in (get_env_file, get_well_known, in_gae, in_gce):
        stub.assert_called_once_with()
def test_from_stream_missing_file(self):
    """``from_stream(None)`` raises an error asking for a file."""
    missing_filename = None
    error_cls = client.ApplicationDefaultCredentialsError
    expected_pattern = (r'The parameter passed to the from_stream\(\) method '
                        r'should point to a file.')
    with self.assertRaisesRegexp(error_cls, expected_pattern):
        # The credentials object is created inside the context, as before.
        self.get_a_google_credentials_object().from_stream(missing_filename)
def test_from_stream_malformed_file_1(self):
    """``from_stream`` on ``..._malformed_1.json`` reports the bad 'type'.

    Checks that the raised ``ApplicationDefaultCredentialsError`` message
    names the file, states it was given to ``from_stream()`` and lists
    the two accepted ``type`` values.
    """
    import re  # local import: only needed to escape the path below

    credentials_file = datafile(
        os.path.join('gcloud',
                     'application_default_credentials_malformed_1.json'))
    # The expected message is used as a regular expression, so the file
    # path is escaped (it contains '.' and may contain other regex
    # metacharacters). Raw literals keep '\(' from being an invalid
    # string escape.
    expected_err_msg = (
        'An error was encountered while reading json file: ' +
        re.escape(credentials_file) +
        r' \(provided as parameter to the from_stream\(\) method\): ' +
        r"'type' field should be defined \(and have one of the '" +
        client.AUTHORIZED_USER + "' or '" + client.SERVICE_ACCOUNT +
        r"' values\)")
    with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                 expected_err_msg):
        self.get_a_google_credentials_object().from_stream(
            credentials_file)
def test_from_stream_malformed_file_2(self):
    """``from_stream`` on ``..._malformed_2.json`` reports the missing field.

    Checks that the raised ``ApplicationDefaultCredentialsError`` message
    names the file, states it was given to ``from_stream()`` and says
    ``private_key_id`` must be defined.
    """
    import re  # local import: only needed to escape the path below

    credentials_file = datafile(
        os.path.join('gcloud',
                     'application_default_credentials_malformed_2.json'))
    # The expected message is used as a regular expression, so the file
    # path is escaped (it contains '.' and may contain other regex
    # metacharacters). Raw literals keep '\(' from being an invalid
    # string escape.
    expected_err_msg = (
        'An error was encountered while reading json file: ' +
        re.escape(credentials_file) +
        r' \(provided as parameter to the from_stream\(\) method\): '
        r'The following field\(s\) must be defined: '
        r'private_key_id')
    with self.assertRaisesRegexp(client.ApplicationDefaultCredentialsError,
                                 expected_err_msg):
        self.get_a_google_credentials_object().from_stream(
            credentials_file)
def test_from_stream_malformed_file_3(self):
    """``from_stream`` on ``..._malformed_3.json`` raises an ADC error."""
    relative_path = os.path.join(
        'gcloud', 'application_default_credentials_malformed_3.json')
    credentials_file = datafile(relative_path)
    error_cls = client.ApplicationDefaultCredentialsError
    with self.assertRaises(error_cls):
        self.get_a_google_credentials_object().from_stream(credentials_file)
def Create(email_address=None, private_key_path=None, oauth_url=None):
    """Build the app identity service stub matching the given arguments.

    Args:
        email_address: when truthy, a key-based stub is created with it.
        private_key_path: forwarded to the key-based stub.
        oauth_url: forwarded to the key-based stub.

    Returns:
        A ``KeyBasedAppIdentityServiceStub`` when ``email_address`` is
        truthy. Otherwise, on Python 2.6 or newer, a
        ``DefaultCredentialsBasedAppIdentityServiceStub`` if constructing
        it does not raise ``ApplicationDefaultCredentialsError``, else a
        plain ``AppIdentityServiceStub``. On older interpreters a plain
        ``AppIdentityServiceStub`` is returned.
    """
    if email_address:
        from google.appengine.api.app_identity import app_identity_keybased_stub

        logging.debug('Using the KeyBasedAppIdentityServiceStub.')
        return app_identity_keybased_stub.KeyBasedAppIdentityServiceStub(
            email_address=email_address,
            private_key_path=private_key_path,
            oauth_url=oauth_url)
    elif sys.version_info >= (2, 6):
        import six
        # Register six's importer on sys.meta_path once, before
        # oauth2client is imported.
        if six._importer not in sys.meta_path:
            sys.meta_path.append(six._importer)
        from oauth2client import client
        from google.appengine.api.app_identity import app_identity_defaultcredentialsbased_stub as ai_stub
        try:
            dc = ai_stub.DefaultCredentialsBasedAppIdentityServiceStub()
            logging.debug('Successfully loaded Application Default Credentials.')
            return dc
        # ``except X as name`` replaces the Python 2-only ``except X, name``
        # form, which is a syntax error on Python 3.
        # NOTE(review): the ``as`` form needs Python 2.6+ to parse, so this
        # module no longer loads on Python 2.5 -- confirm that is acceptable.
        except client.ApplicationDefaultCredentialsError as error:
            # Credentials that are simply unavailable are not worth a
            # warning; any other failure is logged before falling back.
            if not str(error).startswith('The Application Default Credentials '
                                         'are not available.'):
                logging.warning('An exception has been encountered when attempting '
                                'to use Application Default Credentials: %s'
                                '. Falling back on dummy AppIdentityServiceStub.',
                                str(error))
            return AppIdentityServiceStub()
    else:
        logging.debug('Running under Python 2.5 uses dummy '
                      'AppIdentityServiceStub.')
        return AppIdentityServiceStub()