def _get_credentials(self, config):
    """Return the OAuth2 credentials kept in ``config`` under ``'oauth2'``.

    Credentials are read through a ``DictionaryStorage`` wrapped around
    ``config``.  When nothing is stored, or the stored credentials are
    flagged invalid, the OAuth2 flow is run with the local web server
    disabled and ``config.save()`` is called afterwards.

    Args:
        config: configuration object; it is indexed with ``'user_id'`` and
            must offer a ``save()`` method.

    Returns:
        Credentials, the obtained credential.
    """
    storage = oauth2client.contrib.dictionary_storage.DictionaryStorage(
        config, 'oauth2')
    creds = storage.get()
    if creds and not creds.invalid:
        return creds

    print("Ask credentials for " + config['user_id'])
    oauth_flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    oauth_flow.user_agent = APPLICATION_NAME

    # Build the flags for run_flow from an empty argv so that the process'
    # real command line is never consulted.
    flag_parser = argparse.ArgumentParser(add_help=False)
    flag_parser.add_argument('--logging_level', default='ERROR')
    flag_parser.add_argument(
        '--noauth_local_webserver',
        action='store_true',
        default=True,
        help='Do not run a local web server.')
    flow_flags = flag_parser.parse_args([])

    creds = tools.run_flow(oauth_flow, storage, flow_flags)
    config.save()
    return creds
# Example source code using Python's run_flow()
def get_credentials():
    """Gets valid user credentials from storage.

    If nothing has been stored, or if the stored credentials are invalid,
    the OAuth2 flow is completed to obtain the new credentials.

    Returns:
        Credentials, the obtained credential.
    """
    # The credential file location comes straight from CREDS_FILENAME; the
    # previously computed (and never used) home directory has been dropped.
    store = Storage(CREDS_FILENAME)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
        flow.user_agent = APPLICATION_NAME
        credentials = tools.run_flow(flow, store)
    return credentials
def get_credentials(flags):
    """Load credentials from ``flags.credfile``, authorising if necessary.

    Args:
        flags: parsed command-line flags; ``flags.credfile`` names the
            credential file and the whole object is handed to ``run_flow``.

    Returns:
        Credentials, the obtained credential.
    """
    storage = Storage(flags.credfile)
    # Warnings raised while reading the credential file are silenced.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        creds = storage.get()

    if creds and not creds.invalid:
        return creds

    oauth_flow = client.OAuth2WebServerFlow(**CLIENT_CREDENTIAL)
    creds = tools.run_flow(oauth_flow, storage, flags)
    print('credential file saved at\n\t' + flags.credfile)
    return creds
def test_run_flow_webserver(
        self, webbrowser_open_mock, server_ctor_mock, logging_mock):
    """run_flow with the local web server: code arrives via query params."""
    server_ctor_mock.return_value = self.server
    self.server.query_params = {'code': 'auth_code'}

    # Successful exchange.
    result = tools.run_flow(self.flow, self.storage, flags=self.server_flags)

    self.assertEqual(self.credentials, result)
    self.assertEqual(self.flow.redirect_uri, 'http://localhost:8080/')
    self.flow.step2_exchange.assert_called_once_with('auth_code', http=None)
    self.storage.put.assert_called_once_with(self.credentials)
    self.credentials.set_store.assert_called_once_with(self.storage)
    self.assertTrue(self.server.handle_request.called)
    webbrowser_open_mock.assert_called_once_with(
        'http://example.com/auth', new=1, autoraise=True)
def test_run_flow_webserver_fallback(
        self, input_mock, server_ctor_mock, logging_mock):
    """run_flow falls back to manual code entry when the server cannot start."""
    server_ctor_mock.side_effect = socket.error()
    input_mock.return_value = 'auth_code'

    # It should catch the socket error and proceed as if
    # noauth_local_webserver was specified.
    result = tools.run_flow(self.flow, self.storage, flags=self.server_flags)

    self.assertEqual(self.credentials, result)
    self.assertEqual(self.flow.redirect_uri, client.OOB_CALLBACK_URN)
    self.flow.step2_exchange.assert_called_once_with('auth_code', http=None)
    self.assertTrue(server_ctor_mock.called)
    self.assertFalse(self.server.handle_request.called)
def auth_stored_credentials(self, client_secrets_file, scopes=None):
    """Authorize stored credentials.

    Credentials are read from ``self.credentials_file``; when they are
    missing or invalid the OAuth2 flow is run and a confirmation is printed.

    Args:
        client_secrets_file: path of the client-secrets JSON file.
        scopes: OAuth2 scopes to request; ``None`` (the default) means an
            empty list.

    Returns:
        The stored or newly obtained credentials.
    """
    # A fresh list per call instead of a shared mutable default argument.
    if scopes is None:
        scopes = []

    try:
        import argparse
    except ImportError:
        flags = None
    else:
        parser = argparse.ArgumentParser(parents=[tools.argparser])
        parser.add_argument('args', nargs=argparse.REMAINDER)
        flags = parser.parse_args()
        flags.noauth_local_webserver = True

    store = Storage(self.credentials_file)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(
            client_secrets_file,
            scopes,
        )
        flow.user_agent = self.app_name
        if flags:
            credentials = tools.run_flow(flow, store, flags)
        else:  # Needed only for compatibility with Python 2.6
            credentials = tools.run(flow, store)
        # Single-argument call form: valid on both Python 2 and Python 3.
        print('Saved credentials to ' + self.credentials_file)
    return credentials
def authorize_application(client_secret_file, scope,
                          credential_cache_file='credentials_cache.json',
                          flow_params=None):
    """
    Authorize an application to the requested scope by asking the user in a browser.

    :param client_secret_file: json file containing the client secret for an offline application
    :param scope: scope(s) to authorize the application for
    :param credential_cache_file: file in which the credentials are cached, so the
                                  user does not need to be reauthenticated
    :param flow_params: oauth2 flow parameters (command-line style strings);
                        ``None`` means no extra parameters
    :return: OAuth2Credentials object
    """
    flow = flow_from_clientsecrets(client_secret_file, scope=scope)
    storage = Storage(credential_cache_file)
    credentials = storage.get()
    if credentials is None or credentials.invalid:
        # Never pass None to parse_args(): that would make argparse read the
        # process' own sys.argv instead of "no parameters".
        flow_args = [] if flow_params is None else list(flow_params)
        flags = tools.argparser.parse_args(flow_args)

        # Remember the root logger level and restore it even when the flow
        # raises or exits.
        root_logger = logging.getLogger()
        saved_level = root_logger.level
        try:
            credentials = tools.run_flow(flow, storage, flags)
        finally:
            root_logger.setLevel(saved_level)
    return credentials
def test_run_flow_webserver(
        self, webbrowser_open_mock, server_ctor_mock, logging_mock):
    """run_flow with the local web server: code arrives via query params."""
    server_ctor_mock.return_value = self.server
    self.server.query_params = {'code': 'auth_code'}

    # Successful exchange.
    result = tools.run_flow(self.flow, self.storage, flags=self.server_flags)

    self.assertEqual(self.credentials, result)
    self.assertEqual(self.flow.redirect_uri, 'http://localhost:8080/')
    self.flow.step2_exchange.assert_called_once_with('auth_code', http=None)
    self.storage.put.assert_called_once_with(self.credentials)
    self.credentials.set_store.assert_called_once_with(self.storage)
    self.assertTrue(self.server.handle_request.called)
    webbrowser_open_mock.assert_called_once_with(
        'http://example.com/auth', new=1, autoraise=True)
def test_run_flow_webserver_fallback(
        self, input_mock, server_ctor_mock, logging_mock):
    """run_flow falls back to manual code entry when the server cannot start."""
    server_ctor_mock.side_effect = socket.error()
    input_mock.return_value = 'auth_code'

    # It should catch the socket error and proceed as if
    # noauth_local_webserver was specified.
    result = tools.run_flow(self.flow, self.storage, flags=self.server_flags)

    self.assertEqual(self.credentials, result)
    self.assertEqual(self.flow.redirect_uri, client.OOB_CALLBACK_URN)
    self.flow.step2_exchange.assert_called_once_with('auth_code', http=None)
    self.assertTrue(server_ctor_mock.called)
    self.assertFalse(self.server.handle_request.called)
def new_credentials():
    """Perform OAuth2 flow to obtain the new credentials.

    The credential file is ``ct_gdrive_creds.json`` inside ``args.creds_dir``;
    the flow only runs when that file holds no valid credentials.

    Return:
        Credentials, the obtained credential.
    """
    creds_file = os.path.join(args.creds_dir, 'ct_gdrive_creds.json')
    storage = oauth2client.file.Storage(creds_file)
    creds = storage.get()
    if creds and not creds.invalid:
        return creds

    oauth_flow = client.flow_from_clientsecrets(args.client_secret, SCOPES)
    oauth_flow.user_agent = APPLICATION_NAME
    creds = tools.run_flow(oauth_flow, storage, args)
    print('Storing credentials to ' + creds_file)
    return creds
def get_drive_service():
    '''
    Build and return the Google Drive (v3) service object.

    Credentials are loaded from the ``storage.dat`` credential path and the
    OAuth2 flow is run, with default flags, when they are missing or invalid.
    '''
    secret_path = get_credentials_path('secret.json')
    oauth_flow = client.flow_from_clientsecrets(
        secret_path, 'https://www.googleapis.com/auth/drive')
    oauth_flow.user_agent = USER_AGENT_NAME

    storage = Storage(get_credentials_path('storage.dat', False))
    creds = storage.get()
    if not creds or creds.invalid:
        # Parse an empty argv so run_flow receives its default flags.
        default_flags = tools.argparser.parse_args(args=[])
        creds = tools.run_flow(oauth_flow, storage, default_flags)

    authorized_http = creds.authorize(httplib2.Http())
    return discovery.build('drive', 'v3', http=authorized_http)
def get_credentials():
    """Return stored credentials, running the OAuth2 flow when needed.

    Both the credential file and the client-secret file are looked up in
    the directory that contains this module.
    """
    base_dir = os.path.dirname(__file__)
    creds_file = os.path.join(base_dir, CREDENTIAL_FILE)
    secret_file = os.path.join(base_dir, CLIENT_SECRET_FILE)

    storage = Storage(creds_file)
    creds = storage.get()
    if creds and not creds.invalid:
        return creds

    oauth_flow = client.flow_from_clientsecrets(secret_file, SCOPES)
    oauth_flow.user_agent = APPLICATION_NAME
    creds = tools.run_flow(oauth_flow, storage)
    # NOTE(review): the message below looks mis-encoded in the source; it is
    # reproduced unchanged because it is runtime output.
    print('credentials?{}???????'.format(creds_file))
    return creds
# File: gbq.py (file source)
# Project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
# Author: SignalMedia
# Project source
# File source
# Views: 27
# Favorites: 0
# Likes: 0
# Comments: 0
def get_user_account_credentials(self):
    """Return user-account OAuth2 credentials for ``self.scope``.

    Credentials are cached in ``bigquery_credentials.dat``.  The flow is
    run when the cache is empty or invalid, or when ``self.reauth`` is set.
    """
    from oauth2client.client import OAuth2WebServerFlow
    from oauth2client.file import Storage
    from oauth2client.tools import run_flow, argparser

    # NOTE(review): client id and secret are embedded in the source.
    app_client_id = ('495642085510-k0tmvj2m941jhre2nbqka17vqpjfddtd'
                     '.apps.googleusercontent.com')
    oauth_flow = OAuth2WebServerFlow(
        client_id=app_client_id,
        client_secret='kOc9wMptUtxkcIFbtZCcrEAc',
        scope=self.scope,
        redirect_uri='urn:ietf:wg:oauth:2.0:oob')

    token_store = Storage('bigquery_credentials.dat')
    creds = token_store.get()

    must_authorize = creds is None or creds.invalid or self.reauth
    if must_authorize:
        creds = run_flow(oauth_flow, token_store, argparser.parse_args([]))
    return creds
def initYoutube():
    """Authenticate against YouTube and return the API client object.

    Credentials are cached in ``<sys.argv[0]>-oauth2.json``; the OAuth2 flow
    is run, with flags parsed from the command line, when they are missing
    or invalid.
    """
    oauth_flow = flow_from_clientsecrets(
        constants.YOUTUBE_CLIENT_SECRETS_FILE,
        message=constants.MISSING_CLIENT_SECRETS_MESSAGE,
        scope=constants.YOUTUBE_READ_WRITE_SCOPE,
        redirect_uri='http://localhost:8080/')

    token_store = Storage("%s-oauth2.json" % sys.argv[0])
    creds = token_store.get()
    if creds is not None and not creds.invalid:
        print("Found valid oauth2 json")
    else:
        cli_flags = argparser.parse_args()
        creds = run_flow(oauth_flow, token_store, cli_flags)

    authorized_http = creds.authorize(httplib2.Http())
    youtube = build(
        constants.YOUTUBE_API_SERVICE_NAME,
        constants.YOUTUBE_API_VERSION,
        http=authorized_http)
    print("Done authentication with Youtube")
    return youtube
def get_credentials(self):
    """Return Google Calendar credentials for ``self.event``.

    The credential file is ``<event>_credentials.json`` inside the
    configuration directory; the OAuth2 flow is started when that file holds
    no valid credentials.

    Returns:
        The stored or newly obtained credentials.
    """
    conf_dir = self.get_conf_dir()
    credential_path = os.path.join(conf_dir, self.event + '_credentials.json')
    store = oauth2client.file.Storage(credential_path)
    credentials = store.get()
    if not credentials or credentials.invalid:
        # ``warning`` replaces the deprecated ``warn`` alias.
        log.warning("No current valid Google credentials. Starting authentication flow...")
        flow = client.flow_from_clientsecrets(
            os.path.join(conf_dir, 'client_secret.json'),
            'https://www.googleapis.com/auth/calendar')
        flow.user_agent = "HOTBot"
        if self.flags:
            credentials = tools.run_flow(flow, store, self.flags)
        else:  # Needed only for compatibility with Python 2.6
            credentials = tools.run(flow, store)
        # Lazy %-style arguments: the message is only built when emitted.
        log.info('Storing credentials to %s', credential_path)
    return credentials
def update_credentials(self):
    """Return valid user credentials, running the OAuth 2.0 flow if needed.

    The configured credentials directory is created when it does not exist.

    Returns:
        Credentials, the obtained credential.
    """
    creds_dir = self._config['credentials_dir']
    if not os.path.exists(creds_dir):
        os.makedirs(creds_dir)

    storage = Storage(self._credentials_file)
    creds = storage.get()
    if creds and not creds.invalid:
        self._logger.info("Credentials exist.")
        return creds

    oauth_flow = client.flow_from_clientsecrets(
        self._client_secret_file, self._config['oauth2_scopes'])
    oauth_flow.user_agent = self._config['application_name']
    creds = tools.run_flow(oauth_flow, storage, self._oauth2_flags)
    self._logger.info('Storing credentials to ' + self._credentials_file)
    return creds
def _get_credentials(self):
    """Get OAuth credentials

    Cached credentials live in ``cached_oauth_credentials/googleapis.json``
    under ``self.config['creds_cache_dir']``.  Missing or invalid
    credentials trigger the OAuth2 flow without a local web server; valid
    ones are refreshed using ``self.http``.

    :return: OAuth credentials
    :rtype: :class:`oauth2client.client.Credentials`
    """
    credential_dir = join(self.config['creds_cache_dir'], 'cached_oauth_credentials')
    if not exists(credential_dir):
        makedirs(credential_dir)
    credential_path = join(credential_dir, 'googleapis.json')

    store = Storage(credential_path)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(self.config['creds'],
                                              self.config['scope'])
        flow.user_agent = 'Iris Gmail Integration'
        credentials = tools.run_flow(
            flow,
            store,
            tools.argparser.parse_args(args=['--noauth_local_webserver']))
        # Let the logging framework do the interpolation (lazy %-args).
        logger.info('Storing credentials to %s', credential_path)
    else:
        credentials.refresh(self.http)
    return credentials
def get_credentials(self, client_secret_file, scopes, credentials_dir, credentials_file, flags=None):
    """Return stored credentials, running the OAuth2 flow when required.

    Args:
        client_secret_file: path of the client-secrets JSON file.
        scopes: OAuth2 scopes to request.
        credentials_dir: directory holding the credential file; created
            when missing.
        credentials_file: file name of the credential file.
        flags: flags handed to ``tools.run_flow``.  When ``None`` (the
            default) ``self._flags`` is used.

    Returns:
        The stored or newly obtained credentials.
    """
    # Create credentials folder, if necessary
    if not os.path.exists(credentials_dir):
        os.makedirs(credentials_dir)

    # Store for credentials file
    credential_path = os.path.join(credentials_dir, credentials_file)
    store = Storage(credential_path)
    credentials = store.get()

    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(client_secret_file, scopes)
        flow.user_agent = APPLICATION_NAME
        # Honour an explicitly supplied ``flags`` argument; it used to be
        # ignored in favour of the instance-level flags.
        run_flags = flags if flags is not None else self._flags
        if run_flags:
            credentials = tools.run_flow(flow, store, run_flags)
        else:  # Needed only for compatibility with Python 2.6
            credentials = tools.run(flow, store)
        self._logger.debug("Storing credentials to '{}'".format(credential_path))
    else:
        self._logger.debug("Got valid credential from '{}'".format(credential_path))
    return credentials
# Create a Google Calendar API service object
def get_credentials():
    """Return the user credentials kept at ``SAVED_CREDENTIALS``.

    The credential path is printed first.  When no credentials are stored,
    or the stored ones are invalid, the OAuth2 flow is completed to obtain
    new ones.

    Returns:
        Credentials, the obtained credential.
    """
    creds_file = os.path.join(SAVED_CREDENTIALS)
    print(creds_file)

    storage = oauth2client.file.Storage(creds_file)
    creds = storage.get()
    if creds and not creds.invalid:
        return creds

    oauth_flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    oauth_flow.user_agent = APPLICATION_NAME
    if flags:
        creds = tools.run_flow(oauth_flow, storage, flags)
    else:  # Needed only for compatibility with Python 2.6
        creds = tools.run(oauth_flow, storage)
    print('Storing credentials to ' + creds_file)
    return creds
def get_credentials():
    """Return the Gmail skill credentials cached under ``~/.credentials``.

    The directory is created when missing.  Credentials are read from
    ``mycroft-gmail-skill.json``; when absent or invalid the OAuth2 flow is
    run.  Progress messages are printed along the way.

    Returns:
        The stored or newly obtained credentials.
    """
    home_dir = os.path.expanduser('~')
    credential_dir = os.path.join(home_dir, '.credentials')
    print("checking for cached credentials")
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir, 'mycroft-gmail-skill.json')
    store = oauth2client.file.Storage(credential_path)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = OAuth2WebServerFlow(client_id=CID, client_secret=CIS,
                                   scope=SCOPES, user_agent=APPLICATION_NAME)
        credentials = tools.run_flow(flow, store)
        # Single-argument print(...) calls run on both Python 2 and 3.
        print('Storing credentials to ' + credential_dir)
        print('Your GMail Skill is now authenticated ')
    else:
        print('Loaded credentials for Gmail Skill from ' + credential_dir)
    return credentials
def get_credentials():
    """Return the Gmail skill credentials cached under ``~/.credentials``.

    The directory is created when missing.  Credentials are read from
    ``mycroft-gmail-skill.json``; when absent or invalid the OAuth2 flow is
    run.  Progress messages are printed along the way.

    Returns:
        The stored or newly obtained credentials.
    """
    home_dir = os.path.expanduser('~')
    credential_dir = os.path.join(home_dir, '.credentials')
    print("checking for cached credentials")
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir, 'mycroft-gmail-skill.json')
    store = oauth2client.file.Storage(credential_path)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = OAuth2WebServerFlow(client_id=CID, client_secret=CIS,
                                   scope=SCOPES, user_agent=APPLICATION_NAME)
        credentials = tools.run_flow(flow, store)
        # Single-argument print(...) calls run on both Python 2 and 3.
        print('Storing credentials to ' + credential_dir)
        print('Your GMail Skill is now authenticated ')
    else:
        print('Loaded credentials for Gmail Skill from ' + credential_dir)
    return credentials
def get_credentials():
    """Return the Gmail skill credentials cached under ``~/.credentials``.

    The directory is created when missing.  Credentials are read from
    ``mycroft-gmail-skill.json``; when absent or invalid the OAuth2 flow is
    run.  Progress messages are printed along the way.

    Returns:
        The stored or newly obtained credentials.
    """
    home_dir = os.path.expanduser('~')
    credential_dir = os.path.join(home_dir, '.credentials')
    print("checking for cached credentials")
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir, 'mycroft-gmail-skill.json')
    store = oauth2client.file.Storage(credential_path)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = OAuth2WebServerFlow(client_id=CID, client_secret=CIS,
                                   scope=SCOPES, user_agent=APPLICATION_NAME)
        credentials = tools.run_flow(flow, store)
        # Single-argument print(...) calls run on both Python 2 and 3.
        print('Storing credentials to ' + credential_dir)
        print('Your GMail Skill is now authenticated ')
    else:
        print('Loaded credentials for Gmail Skill from ' + credential_dir)
    return credentials
# File: obtain_user_auth.py (file source)
# Project: google-auth-library-python
# Author: GoogleCloudPlatform
# Project source
# File source
# Views: 21
# Favorites: 0
# Likes: 0
# Comments: 0
def main():
    """Run the OAuth2 flow and write an authorized-user JSON file.

    The flow runs with a ``NullStorage``; the refresh token obtained is
    written, with the client id and secret, to ``AUTHORIZED_USER_PATH``.
    """
    oauth_flow = client.flow_from_clientsecrets(CLIENT_SECRETS_PATH, SCOPES)

    print('Starting credentials flow...')
    creds = tools.run_flow(oauth_flow, NullStorage())

    # Save the credentials in the same format as the Cloud SDK's authorized
    # user file.
    authorized_user = {
        'type': 'authorized_user',
        'client_id': oauth_flow.client_id,
        'client_secret': oauth_flow.client_secret,
        'refresh_token': creds.refresh_token
    }

    with open(AUTHORIZED_USER_PATH, 'w') as out_file:
        json.dump(authorized_user, out_file, indent=4)

    print('Created {}.'.format(AUTHORIZED_USER_PATH))
def _get_credentials(self):
    """Get OAuth credentials

    Cached credentials live in ``cached_oauth_credentials/googleapis.json``
    under ``self.var_dir``.  Missing or invalid credentials trigger the
    OAuth2 flow without a local web server; valid ones are refreshed using
    ``self.http``.

    :return: OAuth credentials
    :rtype: :class:`oauth2client.client.Credentials`
    """
    cache_dir = join(self.var_dir, 'cached_oauth_credentials')
    if not exists(cache_dir):
        makedirs(cache_dir)
    cache_file = join(cache_dir, 'googleapis.json')

    storage = Storage(cache_file)
    creds = storage.get()
    if creds and not creds.invalid:
        creds.refresh(self.http)
        return creds

    oauth_flow = client.flow_from_clientsecrets(
        self.config['creds'], self.config['scope'])
    oauth_flow.user_agent = 'Iris Gmail Integration'
    console_flags = tools.argparser.parse_args(
        args=['--noauth_local_webserver'])
    creds = tools.run_flow(oauth_flow, storage, console_flags)
    logger.info('Storing credentials to %s', cache_file)
    return creds
def get_credentials():
    """Gets valid user credentials from storage.

    If nothing has been stored, or if the stored credentials are invalid,
    the OAuth2 flow is completed (without a local web server) to obtain the
    new credentials.

    Returns:
        Credentials, the obtained credential.
    """
    import argparse
    flags = argparse.ArgumentParser(parents=[tools.argparser]).parse_args()
    flags.noauth_local_webserver = True

    store = Storage(gmail_api.CREDENTIAL_FILE)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(gmail_api.CLIENT_SECRET_FILE, gmail_api.SCOPES)
        flow.user_agent = gmail_api.APPLICATION_NAME
        # ``flags`` is an argparse.Namespace, which is always truthy, so the
        # former ``if flags:`` guard was redundant and has been removed.
        credentials = tools.run_flow(flow, store, flags)
        # Single-argument print(...) runs on both Python 2 and 3.
        print('Storing credentials to ' + gmail_api.CREDENTIAL_FILE)
    return credentials
def get_credentials():
    """Return the Sheets credentials cached under ``~/.credentials``.

    The directory is created when missing.  When no valid credentials are
    stored, the OAuth2 flow is run and the credential path is printed.
    """
    creds_dir = path.join(path.expanduser('~'), '.credentials')
    if not path.exists(creds_dir):
        makedirs(creds_dir)
    creds_file = path.join(
        creds_dir, 'sheets.googleapis.com-python-quickstart.json')

    storage = file.Storage(creds_file)
    creds = storage.get()
    if creds and not creds.invalid:
        return creds

    oauth_flow = client.flow_from_clientsecrets(client_secret_path, SCOPES)
    oauth_flow.user_agent = APPLICATION_NAME
    if flags:
        creds = tools.run_flow(oauth_flow, storage, flags)
    else:  # Needed only for compatibility with Python 2.6
        creds = tools.run(oauth_flow, storage)
    print('Storing credentials to ' + creds_file)
    return creds
def get_credentials(self, cred_file='client_secret.json'):
    """Return Sheets/Drive credentials cached under ``~/.credentials``.

    Args:
        cred_file: path of the client-secrets JSON file.

    Returns:
        The stored or newly obtained credentials.
    """
    import os
    from oauth2client.file import Storage
    from oauth2client import client

    home_dir = os.path.expanduser('~')
    credential_dir = os.path.join(home_dir, '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir, 'python-quickstart.json')

    store = Storage(credential_path)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(
            cred_file,
            scope=['https://www.googleapis.com/auth/spreadsheets',
                   'https://www.googleapis.com/auth/drive'])
        flow.user_agent = self.app_name
        # ``except Exception`` instead of a bare ``except`` so that
        # KeyboardInterrupt and SystemExit are no longer swallowed and
        # turned into a retry of the flow.
        try:
            credentials = tools.run_flow(flow, store, self.flags)
        except Exception:  # Needed only for compatibility with Python 2.6
            try:
                credentials = tools.run_flow(flow, store)
            except Exception:
                credentials = tools.run(flow, store)
        print('Storing credentials to ' + credential_path)
    return credentials
def get_credentials():
    """Return the Google Calendar skill credentials cached under ``~/.credentials``.

    The directory is created when missing.  Credentials are read from
    ``mycroft-googlecalendar-skill.json``; when absent or invalid the OAuth2
    flow is run.  Progress messages are printed along the way.

    Returns:
        The stored or newly obtained credentials.
    """
    home_dir = os.path.expanduser('~')
    credential_dir = os.path.join(home_dir, '.credentials')
    print("checking for cached credentials")
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir, 'mycroft-googlecalendar-skill.json')
    store = oauth2client.file.Storage(credential_path)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = OAuth2WebServerFlow(client_id=CID, client_secret=CIS,
                                   scope=SCOPES, user_agent=APPLICATION_NAME)
        credentials = tools.run_flow(flow, store)
        # Single-argument print(...) calls run on both Python 2 and 3.
        print('Storing credentials to ' + credential_dir)
        print('Your Google Calendar Skill is now authenticated ')
    else:
        print('Loaded credentials for Google Calendar Skill from ' + credential_dir)
    return credentials
def get_credentials():
    """Gets valid user credentials from storage.

    The credential file is ``.credentials/drive-to-photos.json`` next to
    this module; the directory is created when missing.  If nothing has
    been stored, or if the stored credentials are invalid, the OAuth2 flow
    is completed to obtain the new credentials.

    Returns:
        Credentials, the obtained credential.
    """
    credential_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir, 'drive-to-photos.json')

    store = oauth2client.file.Storage(credential_path)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
        flow.user_agent = APPLICATION_NAME
        if flags:
            credentials = tools.run_flow(flow, store, flags)
        else:  # Needed only for compatibility with Python 2.6
            credentials = tools.run(flow, store)
        # Single-argument print(...) runs on both Python 2 and 3.
        print('Storing credentials to ' + credential_path)
    return credentials
def get_credentials():
    """Load (or obtain) the credentials and publish them module-wide.

    The result is bound to the module-level name ``credentials`` and also
    returned.  The credential file is ``~/.credentials/pi_remind.json``; the
    directory is created, with a notice, when missing.
    """
    # taken from https://developers.google.com/google-apps/calendar/quickstart/python
    global credentials

    creds_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(creds_dir):
        print('Creating', creds_dir)
        os.makedirs(creds_dir)
    creds_file = os.path.join(creds_dir, 'pi_remind.json')

    storage = oauth2client.file.Storage(creds_file)
    credentials = storage.get()
    if credentials and not credentials.invalid:
        return credentials

    oauth_flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    oauth_flow.user_agent = APPLICATION_NAME
    if flags:
        credentials = tools.run_flow(oauth_flow, storage, flags)
    else:  # Needed only for compatibility with Python 2.6
        credentials = tools.run(oauth_flow, storage)
    print('Storing credentials to', creds_file)
    return credentials