def auth2(code):
    """Exchange an authorization code for credentials and store them.

    The code is handed to ``flow.step2_exchange``; the resulting credentials
    are printed as JSON and then written via ``Storage('credentials_file')``.

    Args:
        code: The authorization code passed to ``flow.step2_exchange``.
    """
    flow = get_flow()
    credentials = flow.step2_exchange(code)
    # Call-style print with a single argument prints the same text on
    # Python 2 and is valid syntax on Python 3, unlike the print statement.
    # NOTE(review): this echoes the serialized credentials to stdout --
    # confirm that exposing them there is intended.
    print(credentials.to_json())
    storage = Storage('credentials_file')
    storage.put(credentials)
python类Storage()的实例源码
def get_credentials():
    """Load the stored credentials, refreshing the access token if expired.

    Returns:
        The credentials obtained from ``Storage('credentials_file').get()``,
        or None when that call returns None (nothing usable is stored).
    """
    storage = Storage('credentials_file')
    credentials = storage.get()
    if credentials is None:
        # Without this guard the attribute access below would raise
        # AttributeError on None; let the caller decide how to authorize.
        return None
    if credentials.access_token_expired:
        # Call-style print works on both Python 2 and Python 3.
        print('Refreshing...')
        credentials.refresh(httplib2.Http())
    return credentials
def save(self, cred_file):
    """Write the instance's credentials to ``cred_file``.

    ``self.check_credentials()`` is invoked first; the credentials are then
    stored through ``Storage(cred_file)`` and the destination is logged.
    """
    self.check_credentials()
    Storage(cred_file).put(self.credentials)
    logger.info('Saved credentials to %s', cred_file)
def load(self, cred_file):
    """Read previously saved credentials from ``cred_file`` into ``self.credentials``."""
    self.credentials = Storage(cred_file).get()
def get_credentials():
    """Return usable user credentials, running the OAuth2 flow when needed.

    The credential file lives in ``~/.credentials`` (the directory is created
    when missing).  If the store yields nothing, or yields credentials marked
    invalid, a fresh set is obtained through the OAuth2 flow.

    Returns:
        Credentials, the obtained credential.
    """
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(
        credential_dir, 'admin-reports_v1-python-quickstart.json')

    store = Storage(credential_path)
    credentials = store.get()
    # Guard clause: stored credentials that are present and valid are
    # returned as-is, without starting a flow.
    if credentials and not credentials.invalid:
        return credentials

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    # tools.run is needed only for compatibility with Python 2.6.
    credentials = (tools.run_flow(flow, store, flags) if flags
                   else tools.run(flow, store))
    print('Storing credentials to ' + credential_path)
    return credentials
MyYoutubeLikedMusicVideosDownloader.py 文件源码
项目:FunUtils
作者: HoussemCharf
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def get_authenticated_service(args):
    """Build an authorized API service object, running OAuth2 if required.

    Args:
        args: Passed through to ``run_flow`` when a new authorization is
            needed.

    Returns:
        Whatever ``build(API_SERVICE_NAME, API_VERSION, http=...)`` returns.
    """
    flow = flow_from_clientsecrets(
        CLIENT_SECRETS_FILE,
        scope=YOUTUBE_READ_WRITE_SSL_SCOPE,
        message=MISSING_CLIENT_SECRETS_MESSAGE)

    token_store = Storage("youtube-api-snippets-oauth2.json")
    credentials = token_store.get()
    needs_authorization = credentials is None or credentials.invalid
    if needs_authorization:
        credentials = run_flow(flow, token_store, args)

    # Trusted testers can download this discovery document from the
    # developers page and it should be in the same directory with the code.
    authorized_http = credentials.authorize(httplib2.Http())
    return build(API_SERVICE_NAME, API_VERSION, http=authorized_http)
def GetGoogleClient(self):
    """Return an authenticated Google Fit client ('fitness' API, 'v1').

    Credentials are read from ``Storage(self.googleCredsFile)`` and used to
    authorize a new ``httplib2.Http`` object for the client.
    """
    logging.debug("Creating Google client")
    stored_credentials = Storage(self.googleCredsFile).get()
    # NOTE(review): there is no guard for get() returning nothing here --
    # confirm the credentials file is always populated before this call.
    fit_client = build(
        'fitness', 'v1',
        http=stored_credentials.authorize(httplib2.Http()))
    logging.debug("Google client created")
    return fit_client
def GetDataSourceId(self, dataType):
    """Compose the colon-separated Google Fit data source id.

    dataType -- type of data. Possible options: steps, weight, heart_rate
    """
    source = self.GetDataSource(dataType)
    client_id = Storage(self.googleCredsFile).get().client_id
    # The text before the first '-' of the client id is used as the
    # project number component of the id.
    project_number = client_id.split('-')[0]
    id_parts = [
        source['type'],
        source['dataType']['name'],
        project_number,
        source['device']['manufacturer'],
        source['device']['model'],
        source['device']['uid'],
    ]
    return ':'.join(id_parts)
def get_credentials():
    """
    Fetch user credentials from storage, authorizing anew when necessary.

    The store is ``~/.credentials/quick-add.json``; the directory is created
    if absent.  When the store returns nothing, or returns credentials marked
    invalid, the OAuth2 flow is run to obtain replacements.

    Returns:
        Credentials, the obtained credential.
    """
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)

    credential_path = os.path.join(credential_dir, 'quick-add.json')
    store = Storage(credential_path)
    credentials = store.get()

    must_authorize = not credentials or credentials.invalid
    if must_authorize:
        flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
        flow.user_agent = APPLICATION_NAME
        if not flags:
            # Needed only for compatibility with Python 2.6
            credentials = tools.run(flow, store)
        else:
            credentials = tools.run_flow(flow, store, flags)
        print('Storing credentials to ' + credential_path)
    return credentials
def get_credentials():
    """Obtain user credentials, preferring the ones already in storage.

    Looks in ``~/.credentials/drive-python-quickstart.json`` (creating the
    directory when it is missing).  Stored credentials that are present and
    not marked invalid are returned directly; otherwise the OAuth2 flow is
    completed to obtain new ones.

    Returns:
        Credentials, the obtained credential.
    """
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(
        credential_dir, 'drive-python-quickstart.json')

    store = Storage(credential_path)
    stored = store.get()
    if stored and not stored.invalid:
        return stored

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    if flags:
        fresh = tools.run_flow(flow, store, flags)
    else:
        # Needed only for compatibility with Python 2.6
        fresh = tools.run(flow, store)
    print('Storing credentials to ' + credential_path)
    return fresh
def get_credentials():
    """Return user credentials from storage or from a new OAuth2 flow.

    The credentials file is ``~/.credentials/academiadasapostas.json``; its
    directory is created on demand.  If the store has nothing, or what it has
    is marked invalid, the OAuth2 flow is completed to obtain new credentials.

    Returns:
        Credentials, the obtained credential.
    """
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)

    credential_path = os.path.join(credential_dir, 'academiadasapostas.json')
    store = Storage(credential_path)
    credentials = store.get()
    if credentials and not credentials.invalid:
        # Stored credentials are usable; no flow required.
        return credentials

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    # tools.run is needed only for compatibility with Python 2.6.
    credentials = (tools.run_flow(flow, store, flags) if flags
                   else tools.run(flow, store))
    print('Storing credentials to ' + credential_path)
    return credentials
def get_credentials():
    """Load user credentials, completing the OAuth2 flow if they are unusable.

    Credentials are kept in
    ``~/.credentials/sheets.googleapis.com-python-quickstart.json``; the
    directory is created when it does not exist.  Missing or invalid stored
    credentials trigger the OAuth2 flow.

    Returns:
        Credentials, the obtained credential.
    """
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(
        credential_dir, 'sheets.googleapis.com-python-quickstart.json')

    store = Storage(credential_path)
    credentials = store.get()

    needs_flow = not credentials or credentials.invalid
    if needs_flow:
        flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
        flow.user_agent = APPLICATION_NAME
        if not flags:
            # Needed only for compatibility with Python 2.6
            credentials = tools.run(flow, store)
        else:
            credentials = tools.run_flow(flow, store, flags)
        print('Storing credentials to ' + credential_path)
    return credentials
def test_non_existent_file_storage(self, warn_mock):
    """Getting from storage at FILENAME yields None and triggers the warning.

    ``warn_mock`` must have been called with ``_MISSING_FILE_MESSAGE``
    formatted with FILENAME.
    """
    loaded = file_module.Storage(FILENAME).get()
    expected_warning = _helpers._MISSING_FILE_MESSAGE.format(FILENAME)
    warn_mock.assert_called_with(expected_warning)
    self.assertIsNone(loaded)
def test_directory_file_storage(self):
    """``storage.get()`` raises IOError when FILENAME is a directory."""
    storage = file_module.Storage(FILENAME)
    os.mkdir(FILENAME)
    try:
        self.assertRaises(IOError, storage.get)
    finally:
        # Always remove the directory created for this test.
        os.rmdir(FILENAME)
def test_no_sym_link_credentials(self):
    """``storage.get()`` raises IOError when the storage path is a symlink."""
    link_path = FILENAME + '.sym'
    os.symlink(FILENAME, link_path)
    storage = file_module.Storage(link_path)
    try:
        self.assertRaises(IOError, storage.get)
    finally:
        # Always remove the symlink created for this test.
        os.unlink(link_path)
def test_token_refresh_store_expired(self):
    """Refreshing credentials whose expiry is in the past issues one POST.

    Credentials with an expiration 15 minutes ago are stored and read back,
    the store is overwritten with a copy carrying a different access token,
    and ``_refresh`` is called with a mock HTTP object.  The test then checks
    the new access token and the single request the mock recorded.
    """
    expiration = (datetime.datetime.utcnow() -
                  datetime.timedelta(minutes=15))
    credentials = self._create_test_credentials(expiration=expiration)

    storage = file_module.Storage(FILENAME)
    storage.put(credentials)
    credentials = storage.get()
    # Overwrite the stored credentials with a copy holding another token.
    new_cred = copy.copy(credentials)
    new_cred.access_token = 'bar'
    storage.put(new_cred)

    access_token = '1/3w'
    token_response = {'access_token': access_token, 'expires_in': 3600}
    response_content = json.dumps(token_response).encode('utf-8')
    http = http_mock.HttpMock(data=response_content)

    credentials._refresh(http)
    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(credentials.access_token, access_token)

    # Verify mocks.
    self.assertEqual(http.requests, 1)
    self.assertEqual(http.uri, credentials.token_uri)
    self.assertEqual(http.method, 'POST')
    expected_body = {
        'grant_type': ['refresh_token'],
        'client_id': [credentials.client_id],
        'client_secret': [credentials.client_secret],
        'refresh_token': [credentials.refresh_token],
    }
    self.assertEqual(urllib_parse.parse_qs(http.body), expected_body)
    expected_headers = {
        'content-type': 'application/x-www-form-urlencoded',
        'user-agent': credentials.user_agent,
    }
    self.assertEqual(http.headers, expected_headers)
def test_token_refresh_good_store(self):
    """``_refresh`` picks up the access token that was written to the store.

    Credentials expiring 15 minutes from now are stored and read back, the
    store is overwritten with a copy whose access token is 'bar', and
    ``_refresh(None)`` is expected to leave the credentials with that token.
    """
    expiration = (datetime.datetime.utcnow() +
                  datetime.timedelta(minutes=15))
    credentials = self._create_test_credentials(expiration=expiration)

    storage = file_module.Storage(FILENAME)
    storage.put(credentials)
    credentials = storage.get()
    # Overwrite the stored credentials with a copy holding another token.
    new_cred = copy.copy(credentials)
    new_cred.access_token = 'bar'
    storage.put(new_cred)

    # No HTTP object is supplied for this refresh.
    credentials._refresh(None)
    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(credentials.access_token, 'bar')
def test_token_refresh_stream_body(self):
    """Send a stream body through a mock sequence that forces a refresh.

    The mock returns two UNAUTHORIZED responses, then a token response, then
    an 'echo_request_body' response.  The test asserts that the returned
    content equals the stream's text and that the credentials end up with
    the access token from the token response.
    """
    expiration = (datetime.datetime.utcnow() +
                  datetime.timedelta(minutes=15))
    credentials = self._create_test_credentials(expiration=expiration)

    store = file_module.Storage(FILENAME)
    store.put(credentials)
    credentials = store.get()
    # Overwrite the stored credentials with a copy holding another token.
    replacement = copy.copy(credentials)
    replacement.access_token = 'bar'
    store.put(replacement)

    valid_access_token = '1/3w'
    token_payload = json.dumps({
        'access_token': valid_access_token,
        'expires_in': 3600,
    }).encode('utf-8')
    canned_responses = [
        ({'status': http_client.UNAUTHORIZED}, b'Initial token expired'),
        ({'status': http_client.UNAUTHORIZED}, b'Store token expired'),
        ({'status': http_client.OK}, token_payload),
        ({'status': http_client.OK}, 'echo_request_body'),
    ]
    http = http_mock.HttpMockSequence(canned_responses)

    stream = six.StringIO('streaming body')
    credentials.authorize(http)
    _response, content = transport.request(
        http, 'https://example.com', body=stream)
    self.assertEqual(content, 'streaming body')
    self.assertEqual(credentials.access_token, valid_access_token)
def test_credentials_delete(self):
    """After ``storage.delete()``, ``storage.get()`` returns None."""
    original = self._create_test_credentials()
    store = file_module.Storage(FILENAME)
    store.put(original)
    # The credentials can be read back before deletion...
    self.assertIsNotNone(store.get())
    store.delete()
    # ...and not after it.
    self.assertIsNone(store.get())