Python 函数 run_flow() 的实例源码

authorize.py 文件源码 项目:GoogleCalendarSkill 作者: jcasoft 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_credentials():
    """Return OAuth2 credentials for the Google Calendar skill.

    Cached credentials are looked up in
    ``~/.credentials/mycroft-googlecalendar-skill.json``; the directory is
    created when it does not exist yet. When nothing is cached, or the cached
    credentials are invalid, ``tools.run_flow`` is called with a web-server
    flow and the credential store.

    Returns:
        The cached or newly obtained credentials object.
    """
    home_dir = os.path.expanduser('~')
    credential_dir = os.path.join(home_dir, '.credentials')
    print("checking for cached credentials")
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir,
                                   'mycroft-googlecalendar-skill.json')

    store = oauth2client.file.Storage(credential_path)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = OAuth2WebServerFlow(client_id=CID,
                                   client_secret=CIS,
                                   scope=SCOPES,
                                   user_agent=APPLICATION_NAME)
        credentials = tools.run_flow(flow, store)
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('Storing credentials to ' + credential_dir)
        print('Your Google Calendar Skill is now authenticated ')
    else:
        print('Loaded credentials for Google Calendar Skill from ' + credential_dir)
    return credentials
reddittube-del.py 文件源码 项目:reddittube 作者: armandg 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_authenticated_service():
    """Build an API client authorized with stored or new OAuth2 credentials.

    Credentials are read from a storage file named after the running script
    (``<sys.argv[0]>-oauth2.json``). When none are stored, or the stored ones
    are invalid, ``run_flow`` is called with the flow and that storage.
    """
    oauth_flow = flow_from_clientsecrets(
        CLIENT_SECRETS_FILE,
        scope=YOUTUBE_READ_WRITE_SSL_SCOPE,
        message=MISSING_CLIENT_SECRETS_MESSAGE,
    )

    token_store = Storage("%s-oauth2.json" % sys.argv[0])
    creds = token_store.get()

    needs_authorization = creds is None or creds.invalid
    if needs_authorization:
        creds = run_flow(oauth_flow, token_store)

    # Trusted testers can download this discovery document from the
    # developers page and it should be in the same directory with the code.
    authorized_http = creds.authorize(httplib2.Http())
    return build(API_SERVICE_NAME, API_VERSION, http=authorized_http)
reddittube-add.py 文件源码 项目:reddittube 作者: armandg 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_authenticated_service():
    """Return an API client whose HTTP transport carries OAuth2 credentials.

    The credentials come from ``<sys.argv[0]>-oauth2.json``; if that storage
    yields nothing, or yields invalid credentials, ``run_flow`` is called to
    obtain new ones.
    """
    flow = flow_from_clientsecrets(CLIENT_SECRETS_FILE,
                                   message=MISSING_CLIENT_SECRETS_MESSAGE,
                                   scope=YOUTUBE_READ_WRITE_SSL_SCOPE)

    storage_path = "%s-oauth2.json" % sys.argv[0]
    storage = Storage(storage_path)

    stored = storage.get()
    if stored is None or stored.invalid:
        stored = run_flow(flow, storage)

    # Trusted testers can download this discovery document from the
    # developers page and it should be in the same directory with the code.
    http = stored.authorize(httplib2.Http())
    return build(API_SERVICE_NAME, API_VERSION, http=http)
GDrive.py 文件源码 项目:lib9 作者: Jumpscale 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def initClientSecret(self, path='client_secrets.json'):
        """Load stored user credentials, running the OAuth2 flow when needed.

        Credentials are read from the store at ``self.secretsFilePath`` and
        kept on ``self._credentials``. If that file does not exist, or nothing
        valid was stored, the OAuth2 flow built from the client secrets file
        at ``path`` is run with that store.

        Args:
            path: location of the Google client secrets JSON file.

        Raises:
            j.exceptions.Input: when ``path`` does not exist.
        """
        if not j.sal.fs.exists(path, followlinks=True):
            raise j.exceptions.Input(message="Could not find google secrets file in %s, please download" %
                                     path, level=1, source="", tags="", msgpub="")
        store = Storage(self.secretsFilePath)
        self._credentials = store.get()
        if not j.sal.fs.exists(self.secretsFilePath) or not self._credentials or self._credentials.invalid:
            flow = client.flow_from_clientsecrets(path, SCOPES)
            flow.user_agent = APPLICATION_NAME
            self._credentials = tools.run_flow(flow, store)
            self.logger.info('Storing credentials to ' + self.secretsFilePath)
gemail.py 文件源码 项目:cloud-site 作者: Mengjianhua-c 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_credentials():
    """Return user credentials, running the OAuth2 flow when none are usable.

    Credentials are looked up in ``~/.credentials/sendEmail.json``. If the
    store is empty or holds invalid credentials, a flow is built from the
    client secrets file and run to obtain new ones.

    Returns:
        Credentials, either the stored ones or the newly obtained ones.
    """
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir, 'sendEmail.json')

    store = oauth2client.file.Storage(credential_path)
    credentials = store.get()
    if credentials and not credentials.invalid:
        # Stored credentials are usable as-is.
        return credentials

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    if flags:
        credentials = tools.run_flow(flow, store, flags)
    else:
        # Needed only for compatibility with Python 2.6
        credentials = tools.run(flow, store)
    print('Storing credentials to ' + credential_path)
    return credentials
quickstart.py 文件源码 项目:gsuite2mfe 作者: andywalden 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_credentials():
    """Fetch stored user credentials or obtain new ones through OAuth2.

    The store lives at
    ``~/.credentials/admin-reports_v1-python-quickstart.json``. When it holds
    nothing, or holds invalid credentials, the OAuth2 flow is run.

    Returns:
        Credentials, the stored or newly obtained credential.
    """
    user_home = os.path.expanduser('~')
    credential_dir = os.path.join(user_home, '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    file_name = 'admin-reports_v1-python-quickstart.json'
    credential_path = os.path.join(credential_dir, file_name)

    store = Storage(credential_path)
    credentials = store.get()
    must_authorize = not credentials or credentials.invalid
    if must_authorize:
        flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
        flow.user_agent = APPLICATION_NAME
        # tools.run is needed only for compatibility with Python 2.6
        credentials = (tools.run_flow(flow, store, flags) if flags
                       else tools.run(flow, store))
        print('Storing credentials to ' + credential_path)
    return credentials
quickstart.py 文件源码 项目:yahoo-fantasy-football-metrics 作者: uberfastman 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_credentials():
    """Load user credentials from storage, authorizing again if required.

    Uses ``~/.credentials/drive-python-quickstart.json`` as the store. If the
    store is empty or its credentials are invalid, the OAuth2 flow is run to
    get new credentials.

    Returns:
        Credentials, the obtained credential.
    """
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir, 'drive-python-quickstart.json')

    store = oauth2client.file.Storage(credential_path)
    cached = store.get()
    if cached and not cached.invalid:
        return cached

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    if flags:
        fresh = tools.run_flow(flow, store, flags)
    else:
        # Needed only for compatibility with Python 2.6
        fresh = tools.run(flow, store)
    print('Storing credentials to ' + credential_path)
    return fresh
MyYoutubeLikedMusicVideosDownloader.py 文件源码 项目:FunUtils 作者: HoussemCharf 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_authenticated_service(args):
  """Build an API client authorized with OAuth2 credentials.

  Credentials are read from ``youtube-api-snippets-oauth2.json``. When that
  storage is empty or holds invalid credentials, ``run_flow`` is called with
  the flow, the storage and ``args``.
  """
  oauth_flow = flow_from_clientsecrets(
    CLIENT_SECRETS_FILE,
    scope=YOUTUBE_READ_WRITE_SSL_SCOPE,
    message=MISSING_CLIENT_SECRETS_MESSAGE)

  token_store = Storage("youtube-api-snippets-oauth2.json")
  creds = token_store.get()

  if creds is None or creds.invalid:
    creds = run_flow(oauth_flow, token_store, args)

  # Trusted testers can download this discovery document from the developers page
  # and it should be in the same directory with the code.
  authorized_http = creds.authorize(httplib2.Http())
  return build(API_SERVICE_NAME, API_VERSION, http=authorized_http)
gmail.py 文件源码 项目:libmozdata 作者: mozilla 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def create_credentials(client_secret_path):
    """Run the OAuth2 flow for the configured scopes.

    The flow is built from the client secrets file, requests offline access
    with a forced approval prompt, and is run with the
    ``--noauth_local_webserver`` flag and the store at ``CREDENTIALS_PATH``.

    Args:
        client_secret_path (str): path to the client_secret.json file.
    """
    scope = ' '.join(SCOPES)
    flow = client.flow_from_clientsecrets(client_secret_path, scope)
    flow.user_agent = 'Clouseau'
    flow.params['access_type'] = 'offline'
    flow.params['approval_prompt'] = 'force'

    store = oauth2client.file.Storage(CREDENTIALS_PATH)

    parser = argparse.ArgumentParser(parents=[tools.argparser])
    flags = parser.parse_args(['--noauth_local_webserver'])

    tools.run_flow(flow, store, flags)
quick-add.py 文件源码 项目:handy-tools 作者: anuragbanerjee 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_credentials():
    """
    Return user credentials, completing the OAuth2 flow when necessary.

    The store is ``~/.credentials/quick-add.json``. If it holds nothing, or
    holds invalid credentials, the OAuth2 flow is run to obtain new ones.

    Returns:
        Credentials, the obtained credential.
    """
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir, 'quick-add.json')

    store = Storage(credential_path)
    credentials = store.get()
    if credentials and not credentials.invalid:
        return credentials

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    # tools.run is needed only for compatibility with Python 2.6
    credentials = (tools.run_flow(flow, store, flags) if flags
                   else tools.run(flow, store))
    print('Storing credentials to ' + credential_path)
    return credentials
uploadToGDrive.py 文件源码 项目:meterOCR 作者: DBMSRmutl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_credentials():
    """Read user credentials from storage, re-authorizing if they are unusable.

    Credentials are kept in ``~/.credentials/drive-python-quickstart.json``.
    An empty store or invalid credentials trigger the OAuth2 flow.

    Returns:
        Credentials, the obtained credential.
    """
    user_home = os.path.expanduser('~')
    credential_dir = os.path.join(user_home, '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir, 'drive-python-quickstart.json')

    store = Storage(credential_path)
    credentials = store.get()
    needs_flow = not credentials or credentials.invalid
    if needs_flow:
        flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
        flow.user_agent = APPLICATION_NAME
        if flags:
            credentials = tools.run_flow(flow, store, flags)
        else:
            # Needed only for compatibility with Python 2.6
            credentials = tools.run(flow, store)
        print('Storing credentials to ' + credential_path)
    return credentials
sheets.py 文件源码 项目:spiderBet 作者: amsimoes 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_credentials():
    """Return stored user credentials or obtain new ones via the OAuth2 flow.

    The store is ``~/.credentials/academiadasapostas.json``. When nothing has
    been stored, or the stored credentials are invalid, the flow is run.

    Returns:
        Credentials, the obtained credential.
    """
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir, 'academiadasapostas.json')

    store = Storage(credential_path)
    cached = store.get()
    if cached and not cached.invalid:
        return cached

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    if flags:
        obtained = tools.run_flow(flow, store, flags)
    else:
        # Needed only for compatibility with Python 2.6
        obtained = tools.run(flow, store)
    print('Storing credentials to ' + credential_path)
    return obtained
sheets_quickstart.py 文件源码 项目:spiderBet 作者: amsimoes 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_credentials():
    """Get user credentials from storage, running the OAuth2 flow if needed.

    The store is
    ``~/.credentials/sheets.googleapis.com-python-quickstart.json``. An empty
    store or invalid stored credentials cause the flow to be run.

    Returns:
        Credentials, the obtained credential.
    """
    user_home = os.path.expanduser('~')
    credential_dir = os.path.join(user_home, '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    file_name = 'sheets.googleapis.com-python-quickstart.json'
    credential_path = os.path.join(credential_dir, file_name)

    store = Storage(credential_path)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
        flow.user_agent = APPLICATION_NAME
        # tools.run is needed only for compatibility with Python 2.6
        credentials = (tools.run_flow(flow, store, flags) if flags
                       else tools.run(flow, store))
        print('Storing credentials to ' + credential_path)
    return credentials
test_tools.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_run_flow_no_webserver(self, input_mock, logging_mock):
        """run_flow without flags exchanges the entered code and stores the result."""
        input_mock.return_value = 'auth_code'

        # Successful exchange.
        result = tools.run_flow(self.flow, self.storage)

        self.assertEqual(self.credentials, result)
        self.assertEqual(self.flow.redirect_uri, client.OOB_CALLBACK_URN)

        exchange = self.flow.step2_exchange
        exchange.assert_called_once_with('auth_code', http=None)

        # The credentials are both written to the storage and bound to it.
        self.storage.put.assert_called_once_with(self.credentials)
        self.credentials.set_store.assert_called_once_with(self.storage)
test_tools.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_run_flow_no_webserver_explicit_flags(
            self, input_mock, logging_mock):
        """run_flow with explicit flags exchanges the entered code."""
        input_mock.return_value = 'auth_code'

        # Successful exchange.
        result = tools.run_flow(self.flow, self.storage, flags=self.flags)

        self.assertEqual(self.credentials, result)
        self.assertEqual(self.flow.redirect_uri, client.OOB_CALLBACK_URN)

        exchange = self.flow.step2_exchange
        exchange.assert_called_once_with('auth_code', http=None)
test_tools.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_run_flow_no_webserver_exchange_error(
            self, input_mock, logging_mock):
        """A FlowExchangeError during the code exchange makes run_flow exit."""
        input_mock.return_value = 'auth_code'
        exchange = self.flow.step2_exchange
        exchange.side_effect = client.FlowExchangeError()

        # Error while exchanging.
        self.assertRaises(
            SystemExit,
            tools.run_flow, self.flow, self.storage, flags=self.flags)

        exchange.assert_called_once_with('auth_code', http=None)
test_tools.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_run_flow_webserver_exchange_error(
            self, webbrowser_open_mock, server_ctor_mock, logging_mock):
        """An 'error' query parameter from the local server makes run_flow exit."""
        self.server.query_params = {'error': 'any error'}
        server_ctor_mock.return_value = self.server

        # Exchange returned an error code.
        self.assertRaises(
            SystemExit,
            tools.run_flow, self.flow, self.storage, flags=self.server_flags)

        self.assertTrue(self.server.handle_request.called)
test_tools.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_run_flow_no_webserver(self, input_mock, logging_mock):
        """Without flags, run_flow exchanges the entered code and stores the credentials."""
        input_mock.return_value = 'auth_code'

        # Successful exchange.
        obtained = tools.run_flow(self.flow, self.storage)

        self.assertEqual(self.credentials, obtained)
        self.assertEqual(self.flow.redirect_uri, client.OOB_CALLBACK_URN)

        step2 = self.flow.step2_exchange
        step2.assert_called_once_with('auth_code', http=None)

        # The credentials are both written to the storage and bound to it.
        self.storage.put.assert_called_once_with(self.credentials)
        self.credentials.set_store.assert_called_once_with(self.storage)
test_tools.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_run_flow_no_webserver_explicit_flags(
            self, input_mock, logging_mock):
        """With explicit flags, run_flow exchanges the entered code."""
        input_mock.return_value = 'auth_code'

        # Successful exchange.
        obtained = tools.run_flow(self.flow, self.storage, flags=self.flags)

        self.assertEqual(self.credentials, obtained)
        self.assertEqual(self.flow.redirect_uri, client.OOB_CALLBACK_URN)

        step2 = self.flow.step2_exchange
        step2.assert_called_once_with('auth_code', http=None)


问题


面经


文章

微信
公众号

扫码关注公众号