python类Storage()的实例源码

cli.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def auth2(code):
    """Exchange an OAuth2 authorization code for credentials and save them.

    The code is handed to ``step2_exchange`` on the flow returned by
    ``get_flow()``. The resulting credentials are printed as JSON and then
    written to the ``'credentials_file'`` storage.

    Args:
        code: Authorization code passed to ``flow.step2_exchange``.
    """
    flow = get_flow()
    credentials = flow.step2_exchange(code)
    # Function-call form of print: same output as the statement form on
    # Python 2 for a single argument, and valid syntax on Python 3.
    # NOTE(review): this echoes the serialized credentials to stdout --
    # presumably including tokens; confirm that is intended.
    print(credentials.to_json())
    storage = Storage('credentials_file')
    storage.put(credentials)
cli.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_credentials():
    """Load credentials from ``'credentials_file'``, refreshing if expired.

    When the loaded credentials report ``access_token_expired``, they are
    refreshed using a new ``httplib2.Http()`` before being returned.

    Returns:
        The credentials object read from storage.
    """
    storage = Storage('credentials_file')
    credentials = storage.get()
    # NOTE(review): if storage.get() returns None (nothing stored yet), the
    # attribute access below raises AttributeError -- confirm callers always
    # run the authorization step first.
    if credentials.access_token_expired:
        # Function-call form of print works on both Python 2 and Python 3.
        print('Refreshing...')
        credentials.refresh(httplib2.Http())
    return credentials
ggbackup.py 文件源码 项目:ggbackup 作者: lindegroup 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def save(self, cred_file):
        """Write ``self.credentials`` to the storage file at ``cred_file``.

        ``self.check_credentials()`` is called first; the save is logged
        once the credentials have been put into storage.
        """
        self.check_credentials()
        Storage(cred_file).put(self.credentials)
        logger.info('Saved credentials to %s', cred_file)
ggbackup.py 文件源码 项目:ggbackup 作者: lindegroup 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def load(self, cred_file):
        """Read credentials from ``cred_file`` into ``self.credentials``."""
        self.credentials = Storage(cred_file).get()
quickstart.py 文件源码 项目:gsuite2mfe 作者: andywalden 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_credentials():
    """Return user credentials, running the OAuth2 flow when required.

    Credentials are looked up in
    ``~/.credentials/admin-reports_v1-python-quickstart.json``; the
    ``.credentials`` directory is created when it does not exist. If the
    lookup yields nothing, or yields credentials flagged invalid, the
    client-secrets flow is run and its result is returned instead.

    Returns:
        Credentials, the obtained credential.
    """
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(
        credential_dir, 'admin-reports_v1-python-quickstart.json')

    store = Storage(credential_path)
    credentials = store.get()
    if credentials and not credentials.invalid:
        # Stored credentials are usable as-is; no flow needed.
        return credentials

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    if flags:
        credentials = tools.run_flow(flow, store, flags)
    else:  # Needed only for compatibility with Python 2.6
        credentials = tools.run(flow, store)
    print('Storing credentials to ' + credential_path)
    return credentials
MyYoutubeLikedMusicVideosDownloader.py 文件源码 项目:FunUtils 作者: HoussemCharf 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_authenticated_service(args):
  """Build an API client authorized with stored or newly obtained credentials.

  Credentials are read from ``youtube-api-snippets-oauth2.json``. When none
  are found, or they are flagged invalid, ``run_flow`` is invoked with the
  client-secrets flow, the storage, and ``args``.

  Returns:
      The object returned by ``build`` for the configured API name/version.
  """
  oauth_flow = flow_from_clientsecrets(
      CLIENT_SECRETS_FILE,
      scope=YOUTUBE_READ_WRITE_SSL_SCOPE,
      message=MISSING_CLIENT_SECRETS_MESSAGE)

  token_store = Storage("youtube-api-snippets-oauth2.json")
  credentials = token_store.get()
  needs_authorization = credentials is None or credentials.invalid
  if needs_authorization:
    credentials = run_flow(oauth_flow, token_store, args)

  # Trusted testers can download this discovery document from the developers page
  # and it should be in the same directory with the code.
  authorized_http = credentials.authorize(httplib2.Http())
  return build(API_SERVICE_NAME, API_VERSION, http=authorized_http)
helpers.py 文件源码 项目:fitbit-googlefit 作者: praveendath92 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def GetGoogleClient(self):
        """Build a ``fitness`` v1 client authorized with the stored credentials.

        Credentials are read from the storage file named by
        ``self.googleCredsFile`` and used to authorize a new
        ``httplib2.Http()`` instance.
        """
        logging.debug("Creating Google client")
        stored_credentials = Storage(self.googleCredsFile).get()
        authorized_http = stored_credentials.authorize(httplib2.Http())
        fitness_client = build('fitness', 'v1', http=authorized_http)
        logging.debug("Google client created")
        return fitness_client
convertors.py 文件源码 项目:fitbit-googlefit 作者: praveendath92 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def GetDataSourceId(self, dataType):
        """Compose the colon-separated data source id for ``dataType``.

        The id joins, in order: the data source's type, its data type name,
        the project number, and the device manufacturer, model and uid. The
        project number is the part of the stored credentials' ``client_id``
        before the first ``'-'``.

        dataType -- type of data. Possible options: steps, weight, heart_rate
        """
        source = self.GetDataSource(dataType)
        stored_credentials = Storage(self.googleCredsFile).get()
        project_number = stored_credentials.client_id.split('-')[0]

        id_fields = [
            source['type'],
            source['dataType']['name'],
            project_number,
        ]
        device = source['device']
        id_fields.extend(
            device[key] for key in ('manufacturer', 'model', 'uid'))
        return ':'.join(id_fields)
quick-add.py 文件源码 项目:handy-tools 作者: anuragbanerjee 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_credentials():
    """
    Fetch user credentials from storage, authorizing anew when needed.

    The storage file is ``~/.credentials/quick-add.json``; the directory is
    created if absent. When no credentials are stored, or the stored ones
    are flagged invalid, the client-secrets OAuth2 flow is run.

    Returns:
        Credentials, the obtained credential.
    """
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir, 'quick-add.json')

    store = Storage(credential_path)
    credentials = store.get()
    needs_flow = not credentials or credentials.invalid
    if needs_flow:
        flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
        flow.user_agent = APPLICATION_NAME
        # tools.run is needed only for compatibility with Python 2.6
        credentials = (tools.run_flow(flow, store, flags) if flags
                       else tools.run(flow, store))
        print('Storing credentials to ' + credential_path)
    return credentials
uploadToGDrive.py 文件源码 项目:meterOCR 作者: DBMSRmutl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_credentials():
    """Load stored user credentials or obtain new ones via the OAuth2 flow.

    Looks in ``~/.credentials/drive-python-quickstart.json`` (creating the
    ``.credentials`` directory when missing). If nothing usable is stored --
    no credentials, or credentials flagged invalid -- the client-secrets
    flow is run to produce the returned credentials.

    Returns:
        Credentials, the obtained credential.
    """
    user_home = os.path.expanduser('~')
    credential_dir = os.path.join(user_home, '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_file = 'drive-python-quickstart.json'
    credential_path = os.path.join(credential_dir, credential_file)

    store = Storage(credential_path)
    credentials = store.get()
    if credentials and not credentials.invalid:
        # Nothing to do: the stored credentials can be used directly.
        return credentials

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    if flags:
        credentials = tools.run_flow(flow, store, flags)
    else:  # Needed only for compatibility with Python 2.6
        credentials = tools.run(flow, store)
    print('Storing credentials to ' + credential_path)
    return credentials
sheets.py 文件源码 项目:spiderBet 作者: amsimoes 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_credentials():
    """Return stored user credentials, completing the OAuth2 flow if needed.

    The credentials file is ``~/.credentials/academiadasapostas.json``; its
    directory is created when it does not exist. The flow is run only when
    storage returns nothing or returns credentials flagged invalid.

    Returns:
        Credentials, the obtained credential.
    """
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir, 'academiadasapostas.json')

    store = Storage(credential_path)
    credentials = store.get()
    have_valid_credentials = credentials and not credentials.invalid
    if not have_valid_credentials:
        flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
        flow.user_agent = APPLICATION_NAME
        if flags:
            credentials = tools.run_flow(flow, store, flags)
        else:  # Needed only for compatibility with Python 2.6
            credentials = tools.run(flow, store)
        print('Storing credentials to ' + credential_path)
    return credentials
sheets_quickstart.py 文件源码 项目:spiderBet 作者: amsimoes 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_credentials():
    """Obtain user credentials from storage or, failing that, the OAuth2 flow.

    Storage is backed by
    ``~/.credentials/sheets.googleapis.com-python-quickstart.json``; the
    directory is created on demand. Missing credentials, or credentials
    flagged invalid, trigger the client-secrets flow.

    Returns:
        Credentials, the obtained credential.
    """
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(
        credential_dir, 'sheets.googleapis.com-python-quickstart.json')

    store = Storage(credential_path)
    credentials = store.get()
    if credentials and not credentials.invalid:
        # Usable credentials already on disk.
        return credentials

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    # tools.run is needed only for compatibility with Python 2.6
    credentials = (tools.run_flow(flow, store, flags) if flags
                   else tools.run(flow, store))
    print('Storing credentials to ' + credential_path)
    return credentials
test_file.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_non_existent_file_storage(self, warn_mock):
        """``get`` on ``FILENAME`` returns ``None`` and reports the missing file.

        ``warn_mock`` must have been called with the missing-file message
        formatted for ``FILENAME``.
        """
        expected_message = _helpers._MISSING_FILE_MESSAGE.format(FILENAME)
        loaded = file_module.Storage(FILENAME).get()
        warn_mock.assert_called_with(expected_message)
        self.assertIsNone(loaded)
test_file.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_directory_file_storage(self):
        """``get`` raises ``IOError`` when ``FILENAME`` is a directory."""
        directory_store = file_module.Storage(FILENAME)
        os.mkdir(FILENAME)
        try:
            self.assertRaises(IOError, directory_store.get)
        finally:
            # Always remove the directory so later tests can reuse FILENAME.
            os.rmdir(FILENAME)
test_file.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_no_sym_link_credentials(self):
        """``get`` raises ``IOError`` when the storage path is a symlink."""
        link_path = FILENAME + '.sym'
        os.symlink(FILENAME, link_path)
        link_store = file_module.Storage(link_path)
        try:
            self.assertRaises(IOError, link_store.get)
        finally:
            # Remove the link regardless of the assertion outcome.
            os.unlink(link_path)
test_file.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_token_refresh_store_expired(self):
        """Refreshing expired credentials issues one token request over HTTP.

        The credentials expire 15 minutes in the past. After a round trip
        through storage, a copy with a different access token is stored, and
        ``_refresh`` is called with a mocked HTTP object. The test checks
        that the access token comes from the mocked response, and that
        exactly one POST with the expected body and headers was sent to the
        token URI.
        """
        expiration = (datetime.datetime.utcnow() -
                      datetime.timedelta(minutes=15))
        credentials = self._create_test_credentials(expiration=expiration)

        storage = file_module.Storage(FILENAME)
        storage.put(credentials)
        credentials = storage.get()
        # Overwrite the stored copy with a different access token.
        new_cred = copy.copy(credentials)
        new_cred.access_token = 'bar'
        storage.put(new_cred)

        access_token = '1/3w'
        token_response = {'access_token': access_token, 'expires_in': 3600}
        response_content = json.dumps(token_response).encode('utf-8')
        http = http_mock.HttpMock(data=response_content)

        credentials._refresh(http)
        # assertEqual, not the deprecated assertEquals alias (removed in
        # Python 3.12).
        self.assertEqual(credentials.access_token, access_token)

        # Verify mocks.
        self.assertEqual(http.requests, 1)
        self.assertEqual(http.uri, credentials.token_uri)
        self.assertEqual(http.method, 'POST')
        expected_body = {
            'grant_type': ['refresh_token'],
            'client_id': [credentials.client_id],
            'client_secret': [credentials.client_secret],
            'refresh_token': [credentials.refresh_token],
        }
        self.assertEqual(urllib_parse.parse_qs(http.body), expected_body)
        expected_headers = {
            'content-type': 'application/x-www-form-urlencoded',
            'user-agent': credentials.user_agent,
        }
        self.assertEqual(http.headers, expected_headers)
test_file.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_token_refresh_good_store(self):
        """Refreshing picks up the stored access token without using HTTP.

        The credentials expire 15 minutes in the future. A copy with access
        token ``'bar'`` is written to storage, then ``_refresh`` is called
        with ``None`` as the HTTP object. The refreshed credentials must
        carry ``'bar'``.
        """
        expiration = (datetime.datetime.utcnow() +
                      datetime.timedelta(minutes=15))
        credentials = self._create_test_credentials(expiration=expiration)

        storage = file_module.Storage(FILENAME)
        storage.put(credentials)
        credentials = storage.get()
        # Overwrite the stored copy with a different access token.
        new_cred = copy.copy(credentials)
        new_cred.access_token = 'bar'
        storage.put(new_cred)

        credentials._refresh(None)
        # assertEqual, not the deprecated assertEquals alias (removed in
        # Python 3.12).
        self.assertEqual(credentials.access_token, 'bar')
test_file.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_token_refresh_stream_body(self):
        """A stream request body is still sent after a token refresh.

        The mocked HTTP sequence answers two requests with UNAUTHORIZED,
        then returns a token response, then echoes the request body. The
        test checks that the echoed content equals the stream's text and
        that the credentials end up with the token from the mocked response.
        """
        fifteen_minutes = datetime.timedelta(minutes=15)
        expiration = datetime.datetime.utcnow() + fifteen_minutes
        credentials = self._create_test_credentials(expiration=expiration)

        storage = file_module.Storage(FILENAME)
        storage.put(credentials)
        credentials = storage.get()
        # Overwrite the stored copy with a different access token.
        stored_copy = copy.copy(credentials)
        stored_copy.access_token = 'bar'
        storage.put(stored_copy)

        valid_access_token = '1/3w'
        token_payload = json.dumps({
            'access_token': valid_access_token,
            'expires_in': 3600,
        }).encode('utf-8')
        unauthorized = {'status': http_client.UNAUTHORIZED}
        ok = {'status': http_client.OK}
        http = http_mock.HttpMockSequence([
            (unauthorized, b'Initial token expired'),
            (unauthorized, b'Store token expired'),
            (ok, token_payload),
            (ok, 'echo_request_body'),
        ])

        stream = six.StringIO('streaming body')

        credentials.authorize(http)
        response = transport.request(http, 'https://example.com', body=stream)
        _, content = response
        self.assertEqual(content, 'streaming body')
        self.assertEqual(credentials.access_token, valid_access_token)
test_file.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_credentials_delete(self):
        """After ``delete``, ``get`` on the same storage returns ``None``."""
        original = self._create_test_credentials()

        store = file_module.Storage(FILENAME)
        store.put(original)
        # Before deletion the stored credentials can be read back.
        self.assertIsNotNone(store.get())
        store.delete()
        self.assertIsNone(store.get())


问题


面经


文章

微信
公众号

扫码关注公众号