def getSvcAcctCredentials(scopes, act_as):
try:
if not GM.Globals[GM.OAUTH2SERVICE_JSON_DATA]:
json_string = readFile(GC.Values[GC.OAUTH2SERVICE_JSON], continueOnError=True, displayError=True)
if not json_string:
invalidOauth2serviceJsonExit()
GM.Globals[GM.OAUTH2SERVICE_JSON_DATA] = json.loads(json_string)
credentials = oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_dict(GM.Globals[GM.OAUTH2SERVICE_JSON_DATA], scopes)
credentials = credentials.create_delegated(act_as)
credentials.user_agent = GAM_INFO
serialization_data = credentials.serialization_data
GM.Globals[GM.ADMIN] = serialization_data[u'client_email']
GM.Globals[GM.OAUTH2_CLIENT_ID] = serialization_data[u'client_id']
return credentials
except (ValueError, IndexError, KeyError):
invalidOauth2serviceJsonExit()
python类service_account()的实例源码
def getSvcAcctCredentials(scopes, act_as):
try:
if not GM.Globals[GM.OAUTH2SERVICE_JSON_DATA]:
json_string = readFile(GC.Values[GC.OAUTH2SERVICE_JSON], continueOnError=True, displayError=True)
if not json_string:
invalidOauth2serviceJsonExit()
GM.Globals[GM.OAUTH2SERVICE_JSON_DATA] = json.loads(json_string)
credentials = oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_dict(GM.Globals[GM.OAUTH2SERVICE_JSON_DATA], scopes)
credentials = credentials.create_delegated(act_as)
credentials.user_agent = GAM_INFO
serialization_data = credentials.serialization_data
GM.Globals[GM.ADMIN] = serialization_data[u'client_email']
GM.Globals[GM.OAUTH2_CLIENT_ID] = serialization_data[u'client_id']
return credentials
except (ValueError, IndexError, KeyError):
invalidOauth2serviceJsonExit()
def _ProcessJsonArg(args):
"""Get client_info and service_account_json_keyfile from args.
This just reads args.json, and decides (based on contents) whether
it's a client_secrets or a service_account key, and returns as
appropriate.
"""
filename = os.path.expanduser(args.json)
if not filename:
return '', ''
with open(filename, 'rU') as f:
try:
contents = json.load(f)
except ValueError:
raise ValueError('Invalid JSON file: {}'.format(args.json))
if contents.get('type', '') == 'service_account':
return '', filename
else:
return filename, ''
def checkServiceAccount(users):
checkForExtraneousArguments()
all_scopes_pass = True
all_scopes, jcount = API.getSortedSvcAcctScopesList()
i, count, users = getEntityArgument(users)
for user in users:
i += 1
user = convertUIDtoEmailAddress(user)
entityPerformActionNumItems([Ent.USER, user], jcount, Ent.SCOPE, i, count)
Ind.Increment()
j = 0
for scope in all_scopes:
j += 1
try:
credentials = getSvcAcctCredentials(scope, user)
credentials.refresh(httplib2.Http(disable_ssl_certificate_validation=GC.Values[GC.NO_VERIFY_SSL]))
result = u'PASS'
except httplib2.ServerNotFoundError as e:
systemErrorExit(NETWORK_ERROR_RC, str(e))
except oauth2client.client.HttpAccessTokenRefreshError:
result = u'FAIL'
all_scopes_pass = False
entityActionPerformedMessage([Ent.SCOPE, u'{0:60}'.format(scope)], result, j, jcount)
Ind.Decrement()
service_account = credentials.serialization_data[u'client_id']
_, _, user_domain = splitEmailAddressOrUID(user)
printBlankLine()
if all_scopes_pass:
printLine(Msg.SCOPE_AUTHORIZATION_PASSED.format(service_account))
else:
printErrorMessage(SCOPES_NOT_AUTHORIZED, Msg.SCOPE_AUTHORIZATION_FAILED.format(user_domain, service_account, u',\n'.join(all_scopes)))
def checkServiceAccount(users):
checkForExtraneousArguments()
all_scopes_pass = True
all_scopes, jcount = API.getSortedSvcAcctScopesList()
i, count, users = getEntityArgument(users)
for user in users:
i += 1
user = convertUIDtoEmailAddress(user)
entityPerformActionNumItems([Ent.USER, user], jcount, Ent.SCOPE, i, count)
Ind.Increment()
j = 0
for scope in all_scopes:
j += 1
try:
credentials = getSvcAcctCredentials(scope, user)
credentials.refresh(httplib2.Http(disable_ssl_certificate_validation=GC.Values[GC.NO_VERIFY_SSL]))
result = u'PASS'
except httplib2.ServerNotFoundError as e:
systemErrorExit(NETWORK_ERROR_RC, str(e))
except oauth2client.client.HttpAccessTokenRefreshError:
result = u'FAIL'
all_scopes_pass = False
entityActionPerformedMessage([Ent.SCOPE, u'{0:60}'.format(scope)], result, j, jcount)
Ind.Decrement()
service_account = credentials.serialization_data[u'client_id']
_, _, user_domain = splitEmailAddressOrUID(user)
printBlankLine()
if all_scopes_pass:
printLine(Msg.SCOPE_AUTHORIZATION_PASSED.format(service_account))
else:
printErrorMessage(SCOPES_NOT_AUTHORIZED, Msg.SCOPE_AUTHORIZATION_FAILED.format(user_domain, service_account, u',\n'.join(all_scopes)))
def _GetCredentialForServiceAccount(json_keyfile, scopes,
credentials_filename=None):
with open(json_keyfile, 'r') as json_keyfile_obj:
client_credentials = json.load(json_keyfile_obj)
credential_store = _GetCredentialStore(credentials_filename,
client_credentials['private_key_id'],
' '.join(sorted(scopes)))
credentials = credential_store.get()
if credentials is None or credentials.invalid:
credentials = (
service_account.ServiceAccountCredentials.from_json_keyfile_dict(
client_credentials, scopes=scopes))
credential_store.put(credentials)
credentials.set_store(credential_store)
return credentials