def get_credentials():
    """Load cached Google Calendar skill credentials, authorizing if needed.

    The cache file is ``~/.credentials/mycroft-googlecalendar-skill.json``;
    the ``~/.credentials`` directory is created when it does not exist yet.
    When the store yields nothing, or yields credentials flagged ``invalid``,
    an ``OAuth2WebServerFlow`` built from ``CID``, ``CIS``, ``SCOPES`` and
    ``APPLICATION_NAME`` is run through ``tools.run_flow`` against the same
    store and its result is used instead.

    Returns:
        The credentials read from the store or returned by the flow.
    """
    home_dir = os.path.expanduser('~')
    credential_dir = os.path.join(home_dir, '.credentials')
    print("checking for cached credentials")
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(
        credential_dir, 'mycroft-googlecalendar-skill.json')
    store = oauth2client.file.Storage(credential_path)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = OAuth2WebServerFlow(
            client_id=CID,
            client_secret=CIS,
            scope=SCOPES,
            user_agent=APPLICATION_NAME)
        credentials = tools.run_flow(flow, store)
        # print(...) with a single parenthesised argument produces the same
        # output on Python 2 and Python 3, unlike the bare print statement.
        print('Storing credentials to ' + credential_dir)
        print('Your Google Calendar Skill is now authenticated ')
    else:
        print('Loaded credentials for Google Calendar Skill from '
              + credential_dir)
    return credentials
python类run_flow()的实例源码
def get_authenticated_service():
    """Return an API client whose HTTP transport carries OAuth2 credentials.

    Credentials are looked up in a ``Storage`` file named after the running
    script (``sys.argv[0]`` plus ``-oauth2.json``). If none are stored, or
    the stored ones are flagged ``invalid``, ``run_flow`` is executed with a
    flow built from ``CLIENT_SECRETS_FILE`` to obtain new ones.

    Returns:
        Whatever ``build(API_SERVICE_NAME, API_VERSION, http=...)`` returns.
    """
    oauth_flow = flow_from_clientsecrets(
        CLIENT_SECRETS_FILE,
        scope=YOUTUBE_READ_WRITE_SSL_SCOPE,
        message=MISSING_CLIENT_SECRETS_MESSAGE,
    )
    token_store = Storage("%s-oauth2.json" % sys.argv[0])

    creds = token_store.get()
    needs_login = creds is None or creds.invalid
    if needs_login:
        creds = run_flow(oauth_flow, token_store)

    # Trusted testers can download this discovery document from the
    # developers page and it should be in the same directory with the code.
    authorized_http = creds.authorize(httplib2.Http())
    return build(API_SERVICE_NAME, API_VERSION, http=authorized_http)
def get_authenticated_service():
    """Authorize against YouTube and hand back the built service object.

    A flow is created from ``CLIENT_SECRETS_FILE``; credentials are read from
    ``"<sys.argv[0]>-oauth2.json"`` and, when absent or flagged ``invalid``,
    replaced by the result of ``run_flow`` using that same storage.
    """
    yt_flow = flow_from_clientsecrets(
        CLIENT_SECRETS_FILE,
        scope=YOUTUBE_READ_WRITE_SSL_SCOPE,
        message=MISSING_CLIENT_SECRETS_MESSAGE)
    storage_file = "%s-oauth2.json" % sys.argv[0]
    yt_storage = Storage(storage_file)
    yt_credentials = yt_storage.get()
    if yt_credentials is None:
        yt_credentials = run_flow(yt_flow, yt_storage)
    elif yt_credentials.invalid:
        yt_credentials = run_flow(yt_flow, yt_storage)
    # Trusted testers can download this discovery document from the
    # developers page and it should be in the same directory with the code.
    return build(
        API_SERVICE_NAME,
        API_VERSION,
        http=yt_credentials.authorize(httplib2.Http()),
    )
def initClientSecret(self, path='client_secrets.json'):
    """Populate ``self._credentials`` from storage, authorizing if needed.

    Credentials are read from the store at ``self.secretsFilePath``. If that
    file does not exist, or the store yields nothing or credentials flagged
    ``invalid``, an OAuth2 flow built from the client secrets file at
    ``path`` is run and its result is kept in ``self._credentials``.

    The method does not return the credentials; it only sets the attribute.

    Args:
        path: Location of the Google client secrets file.

    Raises:
        j.exceptions.Input: If ``path`` does not exist.
    """
    if not j.sal.fs.exists(path, followlinks=True):
        raise j.exceptions.Input(
            message="Could not find google secrets file in %s, please download" % path,
            level=1, source="", tags="", msgpub="")
    store = Storage(self.secretsFilePath)
    self._credentials = store.get()
    if (not j.sal.fs.exists(self.secretsFilePath)
            or not self._credentials
            or self._credentials.invalid):
        flow = client.flow_from_clientsecrets(path, SCOPES)
        flow.user_agent = APPLICATION_NAME
        self._credentials = tools.run_flow(flow, store)
        self.logger.info('Storing credentials to ' + self.secretsFilePath)
def get_credentials():
    """Return usable OAuth2 credentials, running the consent flow if required.

    Credentials are cached in ``~/.credentials/sendEmail.json``; the
    ``~/.credentials`` directory is created when absent. When the cache is
    empty or the cached credentials are flagged ``invalid``, a flow is built
    from ``CLIENT_SECRET_FILE`` and ``SCOPES`` and run against the same store.

    Returns:
        Credentials, either loaded from the store or newly obtained.
    """
    cache_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(cache_dir):
        os.makedirs(cache_dir)
    cache_file = os.path.join(cache_dir, 'sendEmail.json')

    token_store = oauth2client.file.Storage(cache_file)
    cached = token_store.get()
    if cached and not cached.invalid:
        # Cached credentials are good; nothing to obtain or announce.
        return cached

    oauth_flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    oauth_flow.user_agent = APPLICATION_NAME
    if flags:
        fresh = tools.run_flow(oauth_flow, token_store, flags)
    else:
        # Needed only for compatibility with Python 2.6
        fresh = tools.run(oauth_flow, token_store)
    print('Storing credentials to ' + cache_file)
    return fresh
def get_credentials():
    """Fetch stored user credentials, completing the OAuth2 flow when needed.

    The store lives at
    ``~/.credentials/admin-reports_v1-python-quickstart.json``. If it holds
    nothing, or holds credentials flagged ``invalid``, new credentials are
    obtained via ``tools.run_flow`` (when ``flags`` is truthy) or
    ``tools.run`` (otherwise).

    Returns:
        Credentials, the obtained credential.
    """
    user_home = os.path.expanduser('~')
    creds_folder = os.path.join(user_home, '.credentials')
    if not os.path.exists(creds_folder):
        os.makedirs(creds_folder)
    creds_file = os.path.join(
        creds_folder, 'admin-reports_v1-python-quickstart.json')

    store = Storage(creds_file)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
        flow.user_agent = APPLICATION_NAME
        # tools.run is only looked up when flags is falsy; the original notes
        # that branch is needed only for compatibility with Python 2.6.
        credentials = (
            tools.run_flow(flow, store, flags)
            if flags
            else tools.run(flow, store)
        )
        print('Storing credentials to ' + creds_file)
    return credentials
def get_credentials():
    """Obtain valid user credentials, from disk if possible.

    Reads ``~/.credentials/drive-python-quickstart.json`` through
    ``oauth2client.file.Storage``. When that yields nothing or credentials
    flagged ``invalid``, the OAuth2 flow is run and its result is returned.

    Returns:
        Credentials, the obtained credential.
    """
    storage_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(storage_dir):
        os.makedirs(storage_dir)
    storage_path = os.path.join(storage_dir, 'drive-python-quickstart.json')

    drive_store = oauth2client.file.Storage(storage_path)
    drive_creds = drive_store.get()

    must_authorize = not drive_creds or drive_creds.invalid
    if must_authorize:
        drive_flow = client.flow_from_clientsecrets(
            CLIENT_SECRET_FILE, SCOPES)
        drive_flow.user_agent = APPLICATION_NAME
        if not flags:
            # Needed only for compatibility with Python 2.6
            drive_creds = tools.run(drive_flow, drive_store)
        else:
            drive_creds = tools.run_flow(drive_flow, drive_store, flags)
        print('Storing credentials to ' + storage_path)
    return drive_creds
MyYoutubeLikedMusicVideosDownloader.py 文件源码
项目:FunUtils
作者: HoussemCharf
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def get_authenticated_service(args):
    """Return a built API service authorized with OAuth2 credentials.

    Credentials come from the ``youtube-api-snippets-oauth2.json`` storage
    file. If none are stored or they are flagged ``invalid``, ``run_flow``
    is called with the client-secrets flow, the storage and ``args``.

    Args:
        args: Passed through unchanged as the third argument of ``run_flow``.

    Returns:
        The result of ``build(API_SERVICE_NAME, API_VERSION, http=...)``.
    """
    secrets_flow = flow_from_clientsecrets(
        CLIENT_SECRETS_FILE,
        scope=YOUTUBE_READ_WRITE_SSL_SCOPE,
        message=MISSING_CLIENT_SECRETS_MESSAGE,
    )
    snippet_storage = Storage("youtube-api-snippets-oauth2.json")
    creds = snippet_storage.get()
    if creds is None or creds.invalid:
        creds = run_flow(secrets_flow, snippet_storage, args)

    # Trusted testers can download this discovery document from the
    # developers page and it should be in the same directory with the code.
    http_client = creds.authorize(httplib2.Http())
    return build(API_SERVICE_NAME, API_VERSION, http=http_client)
def create_credentials(client_secret_path):
    """Run the OAuth2 flow and store the result at ``CREDENTIALS_PATH``.

    The flow is built from the given client secrets file with all ``SCOPES``
    joined by spaces, requests ``access_type=offline`` with
    ``approval_prompt=force``, and is run with the
    ``--noauth_local_webserver`` flag. Nothing is returned.

    Args:
        client_secret_path (str): Path to the client_secret.json file.
    """
    scope_string = ' '.join(SCOPES)
    oauth_flow = client.flow_from_clientsecrets(
        client_secret_path, scope_string)
    oauth_flow.user_agent = 'Clouseau'
    oauth_flow.params['access_type'] = 'offline'
    oauth_flow.params['approval_prompt'] = 'force'

    credential_store = oauth2client.file.Storage(CREDENTIALS_PATH)

    flag_parser = argparse.ArgumentParser(parents=[tools.argparser])
    cli_flags = flag_parser.parse_args(['--noauth_local_webserver'])
    tools.run_flow(oauth_flow, credential_store, cli_flags)
def get_credentials():
    """Load credentials from ``~/.credentials/quick-add.json`` or create them.

    If the store is empty or its credentials are flagged ``invalid``, the
    OAuth2 flow built from ``CLIENT_SECRET_FILE`` and ``SCOPES`` is completed
    to obtain new credentials, and the storage path is printed.

    Returns:
        Credentials, the obtained credential.
    """
    base_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(base_dir):
        os.makedirs(base_dir)
    json_path = os.path.join(base_dir, 'quick-add.json')

    store = Storage(json_path)
    existing = store.get()
    if existing and not existing.invalid:
        return existing

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    # The tools.run branch is needed only for compatibility with Python 2.6.
    obtained = (tools.run_flow(flow, store, flags) if flags
                else tools.run(flow, store))
    print('Storing credentials to ' + json_path)
    return obtained
def get_credentials():
    """Return credentials for the Drive quickstart, authorizing on demand.

    Uses ``Storage`` on ``~/.credentials/drive-python-quickstart.json``. The
    OAuth2 flow runs only when the store yields nothing or yields
    credentials flagged ``invalid``.

    Returns:
        Credentials, the obtained credential.
    """
    home = os.path.expanduser('~')
    folder = os.path.join(home, '.credentials')
    if not os.path.exists(folder):
        os.makedirs(folder)
    target = os.path.join(folder, 'drive-python-quickstart.json')

    store = Storage(target)
    credentials = store.get()
    usable = bool(credentials) and not credentials.invalid
    if not usable:
        flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
        flow.user_agent = APPLICATION_NAME
        if flags:
            credentials = tools.run_flow(flow, store, flags)
        else:
            # Needed only for compatibility with Python 2.6
            credentials = tools.run(flow, store)
        print('Storing credentials to ' + target)
    return credentials
def get_credentials():
    """Get user credentials cached in ``~/.credentials/academiadasapostas.json``.

    When nothing has been stored, or the stored credentials are flagged
    ``invalid``, the OAuth2 flow is completed to obtain new ones.

    Returns:
        Credentials, the obtained credential.
    """
    creds_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(creds_dir):
        os.makedirs(creds_dir)
    creds_path = os.path.join(creds_dir, 'academiadasapostas.json')

    store = Storage(creds_path)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
        flow.user_agent = APPLICATION_NAME
        # Choose the runner first, then make a single call. tools.run is
        # only looked up when flags is falsy (Python 2.6 compatibility path).
        if flags:
            runner, runner_args = tools.run_flow, (flow, store, flags)
        else:
            runner, runner_args = tools.run, (flow, store)
        credentials = runner(*runner_args)
        print('Storing credentials to ' + creds_path)
    return credentials
def get_credentials():
    """Return Sheets quickstart credentials, authorizing if none are usable.

    The store is
    ``~/.credentials/sheets.googleapis.com-python-quickstart.json``. Stored
    credentials are returned as-is unless missing or flagged ``invalid``, in
    which case the OAuth2 flow is run and the storage path is printed.

    Returns:
        Credentials, the obtained credential.
    """
    dot_credentials = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(dot_credentials):
        os.makedirs(dot_credentials)
    sheet_token_path = os.path.join(
        dot_credentials, 'sheets.googleapis.com-python-quickstart.json')

    sheet_store = Storage(sheet_token_path)
    stored = sheet_store.get()
    if stored and not stored.invalid:
        return stored

    sheet_flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    sheet_flow.user_agent = APPLICATION_NAME
    if flags:
        renewed = tools.run_flow(sheet_flow, sheet_store, flags)
    else:
        # Needed only for compatibility with Python 2.6
        renewed = tools.run(sheet_flow, sheet_store)
    print('Storing credentials to ' + sheet_token_path)
    return renewed
def test_run_flow_no_webserver(self, input_mock, logging_mock):
    # run_flow called without flags: the code supplied by input_mock must be
    # exchanged, and the resulting credentials stored and bound to the storage.
    input_mock.return_value = 'auth_code'

    # Successful exchange.
    result = tools.run_flow(self.flow, self.storage)

    self.assertEqual(self.credentials, result)
    self.assertEqual(self.flow.redirect_uri, client.OOB_CALLBACK_URN)
    self.flow.step2_exchange.assert_called_once_with('auth_code', http=None)
    self.storage.put.assert_called_once_with(self.credentials)
    self.credentials.set_store.assert_called_once_with(self.storage)
def test_run_flow_no_webserver_explicit_flags(
        self, input_mock, logging_mock):
    # Same scenario as the flag-less call, but self.flags is passed to
    # run_flow explicitly.
    input_mock.return_value = 'auth_code'

    # Successful exchange.
    result = tools.run_flow(self.flow, self.storage, flags=self.flags)

    self.assertEqual(self.credentials, result)
    self.assertEqual(self.flow.redirect_uri, client.OOB_CALLBACK_URN)
    self.flow.step2_exchange.assert_called_once_with('auth_code', http=None)
def test_run_flow_no_webserver_exchange_error(
        self, input_mock, logging_mock):
    # When the code exchange raises FlowExchangeError, run_flow is expected
    # to exit (SystemExit) after attempting the exchange exactly once.
    input_mock.return_value = 'auth_code'
    self.flow.step2_exchange.side_effect = client.FlowExchangeError()

    # Error while exchanging.
    with self.assertRaises(SystemExit):
        tools.run_flow(self.flow, self.storage, flags=self.flags)

    self.flow.step2_exchange.assert_called_once_with('auth_code', http=None)
def test_run_flow_webserver_exchange_error(
        self, webbrowser_open_mock, server_ctor_mock, logging_mock):
    # The server constructor is stubbed to return self.server, whose query
    # parameters carry an 'error' entry; run_flow must exit after the server
    # has handled a request.
    server_ctor_mock.return_value = self.server
    self.server.query_params = {'error': 'any error'}

    # Exchange returned an error code.
    with self.assertRaises(SystemExit):
        tools.run_flow(
            self.flow, self.storage, flags=self.server_flags)

    self.assertTrue(self.server.handle_request.called)
def test_run_flow_no_webserver(self, input_mock, logging_mock):
    # Flag-less run_flow: verifies the returned credentials, the out-of-band
    # redirect URI, the single code exchange, and that the credentials were
    # put into the storage and had the storage set as their store.
    input_mock.return_value = 'auth_code'

    # Successful exchange.
    obtained = tools.run_flow(self.flow, self.storage)

    self.assertEqual(self.credentials, obtained)
    self.assertEqual(self.flow.redirect_uri, client.OOB_CALLBACK_URN)
    self.flow.step2_exchange.assert_called_once_with(
        'auth_code',
        http=None,
    )
    self.storage.put.assert_called_once_with(self.credentials)
    self.credentials.set_store.assert_called_once_with(self.storage)
def test_run_flow_no_webserver_explicit_flags(
        self, input_mock, logging_mock):
    # run_flow with flags=self.flags: checks the returned credentials, the
    # out-of-band redirect URI and the single code exchange.
    input_mock.return_value = 'auth_code'

    # Successful exchange.
    obtained = tools.run_flow(self.flow, self.storage, flags=self.flags)

    self.assertEqual(self.credentials, obtained)
    self.assertEqual(self.flow.redirect_uri, client.OOB_CALLBACK_URN)
    self.flow.step2_exchange.assert_called_once_with(
        'auth_code',
        http=None,
    )