def add_login_begin(self):
    """Begin the login by producing an authorization URL.

    Builds an OAuth2 flow from ``googledrive.json`` for the Drive
    ``drive.file`` scope, keeps it on ``self._flow``, configures it for the
    out-of-band callback, and asks it for the step-1 authorization URL.

    Returns:
        dict: a single entry, ``{'Login URL': <authorization url>}``.
    """
    oauth_flow = client.flow_from_clientsecrets(
        'googledrive.json',
        'https://www.googleapis.com/auth/drive.file')
    # The flow is retained on the instance as well as used locally.
    self._flow = oauth_flow
    oauth_flow.user_agent = const.APP_USERAGENT
    oauth_flow.redirect_uri = client.OOB_CALLBACK_URN
    return {'Login URL': oauth_flow.step1_get_authorize_url()}
python类flow_from_clientsecrets()的实例源码
def get_credentials():
    """Return user credentials, running the OAuth2 flow when needed.

    Credentials are looked up in ``~/.credentials/sendEmail.json`` (the
    directory is created if missing). When nothing usable is stored, a flow
    built from ``CLIENT_SECRET_FILE`` and ``SCOPES`` is run with the same
    store and its result is returned.

    Returns:
        Credentials, the obtained credential.
    """
    cred_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(cred_dir):
        os.makedirs(cred_dir)
    credential_path = os.path.join(cred_dir, 'sendEmail.json')

    store = oauth2client.file.Storage(credential_path)
    credentials = store.get()
    if credentials and not credentials.invalid:
        # Stored credentials are usable as-is; no flow required.
        return credentials

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    if flags:
        credentials = tools.run_flow(flow, store, flags)
    else:  # Needed only for compatibility with Python 2.6
        credentials = tools.run(flow, store)
    print('Storing credentials to ' + credential_path)
    return credentials
def get_credentials():
    """Fetch stored user credentials, or obtain new ones via OAuth2.

    The credential file is
    ``~/.credentials/admin-reports_v1-python-quickstart.json``; its parent
    directory is created when absent. If the store yields nothing, or
    yields credentials marked invalid, a new flow is run against that store.

    Returns:
        Credentials, the obtained credential.
    """
    user_home = os.path.expanduser('~')
    storage_dir = os.path.join(user_home, '.credentials')
    if not os.path.exists(storage_dir):
        os.makedirs(storage_dir)
    credential_path = os.path.join(
        storage_dir, 'admin-reports_v1-python-quickstart.json')

    store = Storage(credential_path)
    cached = store.get()
    if cached and not cached.invalid:
        return cached

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    # tools.run is needed only for compatibility with Python 2.6
    fresh = (tools.run_flow(flow, store, flags) if flags
             else tools.run(flow, store))
    print('Storing credentials to ' + credential_path)
    return fresh
def get_credentials():
    """Load valid user credentials, completing the OAuth2 flow if required.

    Uses ``~/.credentials/drive-python-quickstart.json`` as the backing
    file for ``oauth2client.file.Storage``. A missing ``.credentials``
    directory is created first.

    Returns:
        Credentials, the obtained credential.
    """
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir,
                                   'drive-python-quickstart.json')

    store = oauth2client.file.Storage(credential_path)
    creds = store.get()
    must_authorize = not creds or creds.invalid
    if must_authorize:
        flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
        flow.user_agent = APPLICATION_NAME
        if flags:
            creds = tools.run_flow(flow, store, flags)
        else:  # Needed only for compatibility with Python 2.6
            creds = tools.run(flow, store)
        print('Storing credentials to ' + credential_path)
    return creds
MyYoutubeLikedMusicVideosDownloader.py 文件源码
项目:FunUtils
作者: HoussemCharf
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def get_authenticated_service(args):
    """Build an API service object authorized with user credentials.

    Credentials are read from ``youtube-api-snippets-oauth2.json``; when
    none are stored or they are marked invalid, ``run_flow`` is invoked to
    obtain new ones.

    Args:
        args: flags object forwarded to ``run_flow``.

    Returns:
        Whatever ``build`` returns for ``API_SERVICE_NAME`` and
        ``API_VERSION`` using an HTTP object authorized by the credentials.
    """
    oauth_flow = flow_from_clientsecrets(
        CLIENT_SECRETS_FILE,
        scope=YOUTUBE_READ_WRITE_SSL_SCOPE,
        message=MISSING_CLIENT_SECRETS_MESSAGE)

    token_store = Storage("youtube-api-snippets-oauth2.json")
    creds = token_store.get()
    if creds is None or creds.invalid:
        creds = run_flow(oauth_flow, token_store, args)

    # Trusted testers can download this discovery document from the
    # developers page and it should be in the same directory with the code.
    authorized_http = creds.authorize(httplib2.Http())
    return build(API_SERVICE_NAME, API_VERSION, http=authorized_http)
def create_credentials(client_secret_path):
    """Run the OAuth2 flow with the local web server disabled.

    The flow requests offline access and forces the approval prompt, and is
    run against a file store at ``CREDENTIALS_PATH``. The value returned by
    ``tools.run_flow`` is not returned to the caller.

    Args:
        client_secret_path (str): path to client_secret.json file
    """
    oauth_flow = client.flow_from_clientsecrets(
        client_secret_path, ' '.join(SCOPES))
    oauth_flow.user_agent = 'Clouseau'
    oauth_flow.params['access_type'] = 'offline'
    oauth_flow.params['approval_prompt'] = 'force'

    credential_store = oauth2client.file.Storage(CREDENTIALS_PATH)
    # Parse a fixed argument list rather than sys.argv so the flow always
    # runs with --noauth_local_webserver.
    flag_parser = argparse.ArgumentParser(parents=[tools.argparser])
    run_flags = flag_parser.parse_args(['--noauth_local_webserver'])
    tools.run_flow(oauth_flow, credential_store, run_flags)
def get_credentials():
    """Return valid user credentials from storage, authorizing if needed.

    The store is backed by ``~/.credentials/quick-add.json``. When it holds
    no credentials, or holds credentials marked invalid, the OAuth2 flow is
    run and its result returned instead.

    Returns:
        Credentials, the obtained credential.
    """
    credentials_root = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credentials_root):
        os.makedirs(credentials_root)
    credential_path = os.path.join(credentials_root, 'quick-add.json')

    store = Storage(credential_path)
    stored = store.get()
    if stored and not stored.invalid:
        # Nothing to refresh: hand back what was stored.
        return stored

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    if flags:
        obtained = tools.run_flow(flow, store, flags)
    else:  # Needed only for compatibility with Python 2.6
        obtained = tools.run(flow, store)
    print('Storing credentials to ' + credential_path)
    return obtained
def get_credentials():
    """Obtain user credentials from the local store or a new OAuth2 flow.

    Reads ``~/.credentials/drive-python-quickstart.json`` through
    ``Storage``; the ``.credentials`` directory is created when it does not
    exist. A new flow is run only when the stored credentials are absent or
    marked invalid.

    Returns:
        Credentials, the obtained credential.
    """
    home = os.path.expanduser('~')
    credential_dir = os.path.join(home, '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir,
                                   'drive-python-quickstart.json')

    store = Storage(credential_path)
    credentials = store.get()
    if credentials and not credentials.invalid:
        return credentials

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    # tools.run is needed only for compatibility with Python 2.6
    credentials = (tools.run_flow(flow, store, flags) if flags
                   else tools.run(flow, store))
    print('Storing credentials to ' + credential_path)
    return credentials
def get_credentials():
    """Get user credentials, falling back to the OAuth2 flow.

    The credential file lives at ``~/.credentials/academiadasapostas.json``.
    If the store returns nothing, or returns credentials marked invalid,
    the flow is executed with the same store.

    Returns:
        Credentials, the obtained credential.
    """
    dot_credentials = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(dot_credentials):
        os.makedirs(dot_credentials)
    credential_path = os.path.join(dot_credentials,
                                   'academiadasapostas.json')

    store = Storage(credential_path)
    creds = store.get()
    needs_flow = not creds or creds.invalid
    if needs_flow:
        flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
        flow.user_agent = APPLICATION_NAME
        if flags:
            creds = tools.run_flow(flow, store, flags)
        else:  # Needed only for compatibility with Python 2.6
            creds = tools.run(flow, store)
        print('Storing credentials to ' + credential_path)
    return creds
def get_credentials():
    """Retrieve user credentials, running the OAuth2 flow when necessary.

    Credentials are kept in
    ``~/.credentials/sheets.googleapis.com-python-quickstart.json``. The
    containing directory is created on demand. Stored credentials that
    exist and are not marked invalid are returned directly.

    Returns:
        Credentials, the obtained credential.
    """
    base_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(base_dir):
        os.makedirs(base_dir)
    credential_path = os.path.join(
        base_dir, 'sheets.googleapis.com-python-quickstart.json')

    store = Storage(credential_path)
    existing = store.get()
    if existing and not existing.invalid:
        return existing

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    if flags:
        renewed = tools.run_flow(flow, store, flags)
    else:  # Needed only for compatibility with Python 2.6
        renewed = tools.run(flow, store)
    print('Storing credentials to ' + credential_path)
    return renewed
def test_flow_get_put(self):
    """A flow saved on a TestNDBModel is readable after get_by_id."""
    stored_flow = client.flow_from_clientsecrets(
        datafile('client_secrets.json'), 'foo', redirect_uri='oob')
    TestNDBModel(flow=stored_flow, id='foo').put()

    fetched = TestNDBModel.get_by_id('foo')
    self.assertEqual('foo_client_id', fetched.flow.client_id)
def test_validate_success(self, mock_logger):
    """Validating a real flow logs its type exactly once via info()."""
    prop = TestNDBModel.flow
    secrets_path = datafile('client_secrets.json')
    flow = client.flow_from_clientsecrets(
        secrets_path, 'foo', redirect_uri='oob')

    prop._validate(flow)

    mock_logger.info.assert_called_once_with(
        'validate: Got type %s', type(flow))
def setUp(self):
    """Activate a testbed with a datastore stub and build a sample flow."""
    bed = testbed.Testbed()
    self.testbed = bed
    bed.activate()
    bed.init_datastore_v3_stub()

    secrets_path = datafile('client_secrets.json')
    self.flow = client.flow_from_clientsecrets(
        secrets_path, 'foo', redirect_uri='oob')
def test_flow_from_clientsecrets_cached(self):
    """A flow can be built from secrets pre-loaded into a cache mock."""
    cache = http_mock.CacheMock()
    load_and_cache('client_secrets.json', 'some_secrets', cache)

    result = client.flow_from_clientsecrets(
        'some_secrets', '', redirect_uri='oob', cache=cache)

    self.assertEqual('foo_client_secret', result.client_secret)
def _flow_from_clientsecrets_success_helper(self, loadfile_mock,
                                            device_uri=None,
                                            revoke_uri=None):
    """Check flow_from_clientsecrets on a successful web-type load.

    Args:
        loadfile_mock: mock whose return value is set to the
            ``(client_type, client_info)`` pair used for this run.
        device_uri: when given, passed through to the call and compared
            against the resulting flow's ``device_uri``.
        revoke_uri: when given, added to the client info under
            ``'revoke_uri'``.
    """
    client_info = {
        'auth_uri': 'auth_uri',
        'token_uri': 'token_uri',
        'client_id': 'client_id',
        'client_secret': 'client_secret',
    }
    if revoke_uri is not None:
        client_info['revoke_uri'] = revoke_uri
    loadfile_mock.return_value = (clientsecrets.TYPE_WEB, client_info)

    filename, cache = object(), object()
    scope = ['baz']

    # Only forward device_uri when the caller supplied one.
    optional = {} if device_uri is None else {'device_uri': device_uri}
    result = client.flow_from_clientsecrets(
        filename, scope, cache=cache, **optional)
    if device_uri is not None:
        self.assertEqual(result.device_uri, device_uri)

    self.assertIsInstance(result, client.OAuth2WebServerFlow)
    loadfile_mock.assert_called_once_with(filename, cache=cache)
def test_flow_from_clientsecrets_invalid_w_msg(self, sys_exit,
                                               loadfile_mock):
    """The supplied message is passed to sys.exit exactly once.

    NOTE(review): the patch decorators supplying ``sys_exit`` and
    ``loadfile_mock`` are not visible here; presumably loadfile is patched
    to fail -- confirm against the full test module.
    """
    secrets_file, cache_obj = object(), object()
    exit_message = 'hi mom'

    client.flow_from_clientsecrets(
        secrets_file, None, cache=cache_obj, message=exit_message)

    sys_exit.assert_called_once_with(exit_message)
    loadfile_mock.assert_called_once_with(secrets_file, cache=cache_obj)
def test_flow_from_clientsecrets_invalid_w_msg_and_text(self, sys_exit,
                                                        loadfile_mock):
    """sys.exit receives the invalid-secrets text followed by the message.

    NOTE(review): the ``'foobar'`` text presumably comes from the error the
    patched loadfile raises; the decorators are not visible here -- confirm.
    """
    secrets_file, cache_obj = object(), object()
    message = 'hi mom'
    expected = ('The client secrets were invalid: '
                '\n{0}\n{1}'.format('foobar', 'hi mom'))

    client.flow_from_clientsecrets(
        secrets_file, None, cache=cache_obj, message=message)

    sys_exit.assert_called_once_with(expected)
    loadfile_mock.assert_called_once_with(secrets_file, cache=cache_obj)
def test_flow_from_clientsecrets_unknown_flow(self, loadfile_mock):
    """An unrecognised client type raises UnknownClientSecretsFlowError."""
    unknown_type = 'UNKNOWN'
    loadfile_mock.return_value = (unknown_type, None)
    secrets_file, cache_obj = object(), object()
    expected_regex = ('This OAuth 2.0 flow is unsupported: '
                      '{0!r}'.format(unknown_type))

    raises_unknown = self.assertRaisesRegexp(
        client.UnknownClientSecretsFlowError, expected_regex)
    with raises_unknown:
        client.flow_from_clientsecrets(secrets_file, None, cache=cache_obj)

    loadfile_mock.assert_called_once_with(secrets_file, cache=cache_obj)
def create_and_store_flow(request):
    """Create a Drive read-only OAuth2 flow and save it in the session.

    Args:
        request: object providing ``build_absolute_uri`` and a ``session``
            mapping; the flow is stored under ``session['flow']`` as a
            jsonpickle-encoded value.

    Returns:
        The newly created flow, with ``approval_prompt`` set to ``'force'``.
    """
    secrets_json = settings.GOOGLE_OAUTH2_CLIENT_SECRETS_JSON
    callback_url = request.build_absolute_uri('/oauth')
    oauth_flow = flow_from_clientsecrets(
        secrets_json,
        scope='https://www.googleapis.com/auth/drive.readonly',
        redirect_uri=callback_url)
    oauth_flow.params['approval_prompt'] = 'force'

    request.session['flow'] = jsonpickle.encode(oauth_flow)
    return oauth_flow