python类Http()的实例源码

monitor.py 文件源码 项目:docklet 作者: unias 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def net_billings(self, username, now_bytes_total):
        global monitor_vnodes
        if not username in self.net_lastbillings.keys():
            self.net_lastbillings[username] = 0
        elif int(now_bytes_total/self.bytes_per_beans) < self.net_lastbillings[username]:
            self.net_lastbillings[username] = 0
        diff = int(now_bytes_total/self.bytes_per_beans) - self.net_lastbillings[username]
        if diff > 0:
            auth_key = env.getenv('AUTH_KEY')
            data = {"owner_name":username,"billing":diff, "auth_key":auth_key}
            header = {'Content-Type':'application/x-www-form-urlencoded'}
            http = Http()
            [resp,content] = http.request("http://"+self.master_ip+"/billing/beans/","POST",urlencode(data),headers = header)
            logger.info("response from master:"+content.decode('utf-8'))
        self.net_lastbillings[username] += diff
        monitor_vnodes[username]['net_stats']['net_billings'] = self.net_lastbillings[username]
kintaro.py 文件源码 项目:grow-ext-kintaro 作者: grow 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def create_service(self, host):
        credentials = oauth.get_or_create_credentials(
            scope=OAUTH_SCOPES, storage_key=STORAGE_KEY)
        http = httplib2.Http(ca_certs=utils.get_cacerts_path())
        http = credentials.authorize(http)
        # Kintaro's server doesn't seem to be able to refresh expired tokens
        # properly (responds with a "Stateless token expired" error). So we
        # manage state ourselves and refresh slightly more often than once
        # per hour.
        now = datetime.datetime.now()
        if self._last_run is None \
                or now - self._last_run >= datetime.timedelta(minutes=50):
            credentials.refresh(http)
            self._last_run = now
        url = DISCOVERY_URL.replace('{host}', host)
        return discovery.build('content', 'v1', http=http,
                               discoveryServiceUrl=url)
appengine.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _refresh(self, http_request):
        """Refreshes the access_token.

        Since the underlying App Engine app_identity implementation does its
        own caching we can skip all the storage hoops and just to a refresh
        using the API.

        Args:
            http_request: callable, a callable that matches the method
                          signature of httplib2.Http.request, used to make the
                          refresh request.

        Raises:
            AccessTokenRefreshError: When the refresh fails.
        """
        try:
            scopes = self.scope.split()
            (token, _) = app_identity.get_access_token(
                scopes, service_account_id=self.service_account_id)
        except app_identity.Error as e:
            raise AccessTokenRefreshError(str(e))
        self.access_token = token
IxChassisUtils.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def postWithPayload(loginUrl, payload=None):
        urlHeadersJson = {'content-type': 'application/json'}
        try:
            h = httplib2.Http('.cache',
                              disable_ssl_certificate_validation=True)
            if payload is None:
                logger.debug('POST: ' + loginUrl)
                (response, content) = h.request(loginUrl, 'POST', '',
                                                urlHeadersJson)
                logger.debug(content)
            else:
                logger.debug('POST: ' + loginUrl + ' <- Data: ' + str(payload))
                (response, content) = h.request(loginUrl, 'POST',
                                                body=payload,
                                                headers=urlHeadersJson)
                logger.debug(response)
                logger.debug(content)
        except Exception, e:
            raise Exception('Got an error code: ', e)
        return content
IxChassisUtils.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def postWithPayloadAndHeaders(loginUrl, urlHeadersJson,
                                  payload=None):
        try:
            h = httplib2.Http('.cache',
                              disable_ssl_certificate_validation=True)
            if payload is None:
                logger.debug('POST: ' + loginUrl)
                (response, content) = h.request(loginUrl, 'POST', '',
                                                urlHeadersJson)
            else:
                logger.debug('POST: ' + loginUrl + ' <- Data: ' + str(payload))
                (response, content) = h.request(loginUrl, 'POST',
                                                body=payload,
                                                headers=urlHeadersJson)
        except Exception, e:
            raise Exception('Got an error code: ', e)
        return content
IxChassisUtils.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def postOperation(url, apiKey, payload=''):
        urlHeadersJson = {'content-type': 'application/json',
                          'X-Api-Key': '%s' % str(apiKey)}
        try:
            h = httplib2.Http('.cache',
                              disable_ssl_certificate_validation=True)
            if payload is None:
                logger.debug('POST: ' + url)
                (response, content) = h.request(url, 'POST',
                                                json.dumps(payload),
                                                urlHeadersJson)
            else:
                logger.debug('POST: ' + url + ' <- Data: ' + str(payload))
                (response, content) = h.request(url, 'POST',
                                                json.dumps(payload),
                                                headers=urlHeadersJson)
        except Exception, e:
            raise Exception('Got an error code: ', e)
        return content
IxChassisUtils.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def patch(url, payload, apiKey):
        urlHeadersJson = {'content-type': 'application/json',
                          'X-Api-Key': '%s' % str(apiKey)}
        try:
            h = httplib2.Http('.cache',
                              disable_ssl_certificate_validation=True)
            logger.debug('PATCH: ' + url + ' <-- Attribute: ' +
                         str(payload))
            (response, content) = h.request(url, 'PATCH',
                                            json.dumps(payload),
                                            urlHeadersJson)
        except Exception, e:

            # print (response, content)

            raise Exception('Got an error code: ', e)
        return content
test_auth.py 文件源码 项目:cloudaux 作者: Netflix-Skunkworks 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test__googleauth(self):
        """
        TODO(supertom): add mocking, make more robust, etc.
        This test make a lot of assumptions:
        1. Running on GCE
        3. Doesn't truly verify the Http object is authorized.
        However, this function is critical for valid GCP operation
        so it is good to have a sanity check that we have an Http object.
        """
        from httplib2 import Http
        # default creds
        http_auth = auth._googleauth()
        self.assertTrue(isinstance(http_auth, Http))

        # service account key
        test_key_file = self._get_fixture('testkey.json')
        http_auth = auth._googleauth(key_file=test_key_file)
        self.assertTrue(isinstance(http_auth, Http))
appengine.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _refresh(self, http_request):
    """Refreshes the access_token.

    Since the underlying App Engine app_identity implementation does its own
    caching we can skip all the storage hoops and just to a refresh using the
    API.

    Args:
      http_request: callable, a callable that matches the method signature of
        httplib2.Http.request, used to make the refresh request.

    Raises:
      AccessTokenRefreshError: When the refresh fails.
    """
    try:
      scopes = self.scope.split()
      (token, _) = app_identity.get_access_token(scopes)
    except app_identity.Error, e:
      raise AccessTokenRefreshError(str(e))
    self.access_token = token
github_services.py 文件源码 项目:support-tools 作者: google 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, github_owner_username, github_repo_name,
               github_oauth_token, rate_limit, http_instance=None):
    """Initialize the GitHubService.

    Args:
      github_owner_username: The username of the owner of the repository.
      github_repo_name: The GitHub repository name.
      github_oauth_token: The oauth token to use for the requests.
      rate_limit: Whether or not to rate limit GitHub API requests.
      http_instance: The HTTP instance to use, if not set a default will be
          used.
    """
    self.github_owner_username = github_owner_username
    self.github_repo_name = github_repo_name
    self._github_oauth_token = github_oauth_token
    self._rate_limit = rate_limit
    self._http = http_instance if http_instance else httplib2.Http()
quickstart.py 文件源码 项目:gsuite2mfe 作者: andywalden 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def main():
    """Shows basic usage of the Google Admin SDK Reports API.

    Creates a Google Admin SDK Reports API service object and outputs a list of
    last 10 login events.
    """
    credentials = get_credentials()
    http = credentials.authorize(httplib2.Http())
    service = discovery.build('admin', 'reports_v1', http=http)

    print('Getting the last 10 login events')
    results = service.activities().list(userKey='all', applicationName='login',
        maxResults=10).execute()
    activities = results.get('items', [])

    if not activities:
        print('No logins found.')
    else:
        print('Logins:')
        for activity in activities:
            print('{0}: {1} ({2})'.format(activity['id']['time'],
                activity['actor']['email'], activity['events'][0]['name']))
gsuite2mfe.py 文件源码 项目:gsuite2mfe 作者: andywalden 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def execute(self):
        """ 
        Returns GSuite events based on given app/activity.
        Other parameters are optional.
        """
        logging.debug("Authenticating to GSuite")
        self.get_credentials()
        self.http = self.credentials.authorize(httplib2.Http())
        self.service = discovery.build('admin', 'reports_v1', http=self.http)
        logging.debug("Retrieving %s events from: %s to %s", self.app, convert_time(self.s_time), convert_time(self.e_time))
        self.results = self.service.activities().list(userKey=self.user, 
                                             applicationName=self.app, 
                                             startTime=self.s_time,
                                             endTime=self.e_time,
                                             maxResults=self.max).execute()
        return self.results.get('items', [])
gam.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def getGDataOAuthToken(gdataObj, credentials=None):
  if not credentials:
    credentials = getClientCredentials(API.FAM2_SCOPES)
  try:
    credentials.refresh(httplib2.Http(disable_ssl_certificate_validation=GC.Values[GC.NO_VERIFY_SSL]))
  except httplib2.ServerNotFoundError as e:
    systemErrorExit(NETWORK_ERROR_RC, str(e))
  except oauth2client.client.AccessTokenRefreshError as e:
    return handleOAuthTokenError(str(e), False)
  gdataObj.additional_headers[u'Authorization'] = u'Bearer {0}'.format(credentials.access_token)
  if not GC.Values[GC.DOMAIN]:
    GC.Values[GC.DOMAIN] = credentials.id_token.get(u'hd', u'UNKNOWN').lower()
  if not GC.Values[GC.CUSTOMER_ID]:
    GC.Values[GC.CUSTOMER_ID] = GC.MY_CUSTOMER
  GM.Globals[GM.ADMIN] = credentials.id_token.get(u'email', u'UNKNOWN').lower()
  GM.Globals[GM.OAUTH2_CLIENT_ID] = credentials.client_id
  gdataObj.domain = GC.Values[GC.DOMAIN]
  gdataObj.source = GAM_INFO
  return True
gam.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def getGDataUserCredentials(api, user, i, count):
  userEmail = convertUIDtoEmailAddress(user)
  _, _, api_version, cred_family = API.getVersion(api)
  disc_file, discovery = readDiscoveryFile(api_version)
  GM.Globals[GM.CURRENT_API_USER] = userEmail
  credentials = getClientCredentials(cred_family)
  try:
    GM.Globals[GM.CURRENT_API_SCOPES] = list(set(list(discovery[u'auth'][u'oauth2'][u'scopes'])).intersection(credentials.scopes))
  except KeyError:
    invalidDiscoveryJsonExit(disc_file)
  if not GM.Globals[GM.CURRENT_API_SCOPES]:
    systemErrorExit(NO_SCOPES_FOR_API_RC, Msg.NO_SCOPES_FOR_API.format(discovery.get(u'title', api_version)))
  credentials = getSvcAcctCredentials(GM.Globals[GM.CURRENT_API_SCOPES], userEmail)
  try:
    credentials.refresh(httplib2.Http(disable_ssl_certificate_validation=GC.Values[GC.NO_VERIFY_SSL]))
    return (userEmail, credentials)
  except httplib2.ServerNotFoundError as e:
    systemErrorExit(NETWORK_ERROR_RC, str(e))
  except oauth2client.client.AccessTokenRefreshError as e:
    handleOAuthTokenError(str(e), True)
    entityUnknownWarning(Ent.USER, userEmail, i, count)
    return (userEmail, None)
gam.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def getCRMService(login_hint):
  from oauth2client.contrib.dictionary_storage import DictionaryStorage
  scope = u'https://www.googleapis.com/auth/cloud-platform'
  client_id = u'297408095146-fug707qsjv4ikron0hugpevbrjhkmsk7.apps.googleusercontent.com'
  client_secret = u'qM3dP8f_4qedwzWQE1VR4zzU'
  flow = oauth2client.client.OAuth2WebServerFlow(client_id=client_id,
                                                 client_secret=client_secret, scope=scope, redirect_uri=oauth2client.client.OOB_CALLBACK_URN,
                                                 user_agent=GAM_INFO, access_type=u'online', response_type=u'code', login_hint=login_hint)
  storage_dict = {}
  storage = DictionaryStorage(storage_dict, u'credentials')
  flags = cmd_flags(noLocalWebserver=GC.Values[GC.NO_BROWSER])
  httpObj = httplib2.Http(disable_ssl_certificate_validation=GC.Values[GC.NO_VERIFY_SSL])
  try:
    credentials = oauth2client.tools.run_flow(flow=flow, storage=storage, flags=flags, http=httpObj)
  except httplib2.CertificateValidationUnsupported:
    noPythonSSLExit()
  credentials.user_agent = GAM_INFO
  httpObj = credentials.authorize(httplib2.Http(disable_ssl_certificate_validation=GC.Values[GC.NO_VERIFY_SSL],
                                                cache=None))
  return (googleapiclient.discovery.build(u'cloudresourcemanager', u'v1', http=httpObj, cache_discovery=False), httpObj)
gam.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def doUpdateProject():
  login_hint = getEmailAddress(noUid=True, optional=True)
  checkForExtraneousArguments()
  login_hint = getValidateLoginHint(login_hint)
  _, httpObj = getCRMService(login_hint)
  cs_data = readFile(GC.Values[GC.CLIENT_SECRETS_JSON], mode=u'rb', continueOnError=True, displayError=True, encoding=None)
  if not cs_data:
    systemErrorExit(14, u'Your client secrets file:\n\n%s\n\nis missing. Please recreate the file.' % GC.Values[GC.CLIENT_SECRETS_JSON])
  try:
    cs_json = json.loads(cs_data)
    projectName = 'project:%s' % cs_json[u'installed'][u'project_id']
  except (ValueError, IndexError, KeyError):
    systemErrorExit(3, u'The format of your client secrets file:\n\n%s\n\nis incorrect. Please recreate the file.' % GC.Values[GC.CLIENT_SECRETS_JSON])
  simplehttp = httplib2.Http(disable_ssl_certificate_validation=GC.Values[GC.NO_VERIFY_SSL])
  enableProjectAPIs(simplehttp, httpObj, projectName, True)

# gam whatis <EmailItem> [noinfo]
http.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def build_http():
  """Builds httplib2.Http object

  Returns:
  A httplib2.Http object, which is used to make http requests, and which has timeout set by default.
  To override default timeout call

    socket.setdefaulttimeout(timeout_in_sec)

  before interacting with this method.
  """
  if socket.getdefaulttimeout() is not None:
    http_timeout = socket.getdefaulttimeout()
  else:
    http_timeout = DEFAULT_HTTP_TIMEOUT_SEC
  return httplib2.Http(timeout=http_timeout)
test_ssl_context.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testHttpsContext(self):
        client = httplib2.Http(ca_certs=self.ca_certs_path)

        # Establish connection to local server
        client.request('https://localhost:%d/' % (self.port))

        # Verify that connection uses a TLS context with the correct hostname
        conn = client.connections['https:localhost:%d' % self.port]

        self.assertIsInstance(conn.sock, ssl.SSLSocket)
        self.assertTrue(hasattr(conn.sock, 'context'))
        self.assertIsInstance(conn.sock.context, ssl.SSLContext)
        self.assertTrue(conn.sock.context.check_hostname)
        self.assertEqual(conn.sock.server_hostname, 'localhost')
        self.assertEqual(conn.sock.context.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(conn.sock.context.protocol, ssl.PROTOCOL_SSLv23)
test_ssl_context.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_ssl_hostname_mismatch_repeat(self):
        # https://github.com/httplib2/httplib2/issues/5

        # FIXME(temoto): as of 2017-01-05 this is only a reference code, not useful test.
        # Because it doesn't provoke described error on my machine.
        # Instead `SSLContext.wrap_socket` raises `ssl.CertificateError`
        # which was also added to original patch.

        # url host is intentionally different, we provoke ssl hostname mismatch error
        url = 'https://127.0.0.1:%d/' % (self.port,)
        http = httplib2.Http(ca_certs=self.ca_certs_path, proxy_info=None)

        def once():
            try:
                http.request(url)
                assert False, 'expected certificate hostname mismatch error'
            except Exception as e:
                print('%s errno=%s' % (repr(e), getattr(e, 'errno', None)))

        once()
        once()
httplib2test.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testGetViaHttpsKeyCert(self):
        #  At this point I can only test
        #  that the key and cert files are passed in
        #  correctly to httplib. It would be nice to have
        #  a real https endpoint to test against.

        # bitworking.org presents an certificate for a non-matching host
        # (*.webfaction.com), so we need to disable cert checking for this test.
        http = httplib2.Http(timeout=2, disable_ssl_certificate_validation=True)

        http.add_certificate("akeyfile", "acertfile", "bitworking.org")
        try:
            (response, content) = http.request("https://bitworking.org", "GET")
        except:
            pass
        self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
        self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")

        try:
            (response, content) = http.request("https://notthere.bitworking.org", "GET")
        except:
            pass
        self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
        self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
httplib2test.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testGetViaHttpsKeyCert(self):
        #  At this point I can only test
        #  that the key and cert files are passed in
        #  correctly to httplib. It would be nice to have
        #  a real https endpoint to test against.
        http = httplib2.Http(timeout=2)

        http.add_certificate("akeyfile", "acertfile", "bitworking.org")
        try:
          (response, content) = http.request("https://bitworking.org", "GET")
        except AttributeError:
          self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
          self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")
        except IOError:
          # Skip on 3.2
          pass

        try:
            (response, content) = http.request("https://notthere.bitworking.org", "GET")
        except httplib2.ServerNotFoundError:
          self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
          self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
        except IOError:
          # Skip on 3.2
          pass
httplib2test.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testSslCertValidation(self):
          # Test that we get an ssl.SSLError when specifying a non-existent CA
          # certs file.
          http = httplib2.Http(ca_certs='/nosuchfile')
          self.assertRaises(IOError,
                  http.request, "https://www.google.com/", "GET")

          # Test that we get a SSLHandshakeError if we try to access
          # https://www.google.com, using a CA cert file that doesn't contain
          # the CA Google uses (i.e., simulating a cert that's not signed by a
          # trusted CA).
          other_ca_certs = os.path.join(
                  os.path.dirname(os.path.abspath(httplib2.__file__ )),
                  "test", "other_cacerts.txt")
          http = httplib2.Http(ca_certs=other_ca_certs)
          self.assertRaises(ssl.SSLError,
            http.request,"https://www.google.com/", "GET")
quickstart.py 文件源码 项目:yahoo-fantasy-football-metrics 作者: uberfastman 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main():
    """Shows basic usage of the Google Drive API.

    Creates a Google Drive API service object and outputs the names and IDs
    for up to 10 files.
    """
    credentials = get_credentials()
    http = credentials.authorize(httplib2.Http())
    service = discovery.build('drive', 'v3', http=http)

    results = service.files().list(
        pageSize=10, fields="nextPageToken, files(id, name)").execute()
    items = results.get('files', [])
    if not items:
        print('No files found.')
    else:
        print('Files:')
        for item in items:
            print('{0} ({1})'.format(item['name'], item['id']))
gmail.py 文件源码 项目:libmozdata 作者: mozilla 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def send(To, Subject, Body, Cc=[], Bcc=[], html=False, files=[]):
    """Send an email
    """
    subtype = 'html' if html else 'plain'
    message = MIMEMultipart()
    message['To'] = ', '.join(To)
    message['Subject'] = Subject
    message['Cc'] = ', '.join(Cc)
    message['Bcc'] = ', '.join(Bcc)

    message.attach(MIMEText(Body, subtype))

    for f in files:
        with open(f, "rb") as In:
            part = MIMEApplication(In.read(), Name=basename(f))
            part['Content-Disposition'] = 'attachment; filename="%s"' % basename(f)
            message.attach(part)

    message = {'raw': base64.urlsafe_b64encode(message.as_string())}

    credentials = oauth2client.file.Storage(CREDENTIALS_PATH).get()
    Http = credentials.authorize(httplib2.Http())
    service = discovery.build('gmail', 'v1', http=Http)

    message = service.users().messages().send(userId='me', body=message).execute()
appengine.py 文件源码 项目:office-interoperability-tools 作者: milossramek 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _refresh(self, http_request):
    """Refreshes the access_token.

    Since the underlying App Engine app_identity implementation does its own
    caching we can skip all the storage hoops and just to a refresh using the
    API.

    Args:
      http_request: callable, a callable that matches the method signature of
        httplib2.Http.request, used to make the refresh request.

    Raises:
      AccessTokenRefreshError: When the refresh fails.
    """
    try:
      (token, _) = app_identity.get_access_token(self.scope)
    except app_identity.Error, e:
      raise AccessTokenRefreshError(str(e))
    self.access_token = token
auth.py 文件源码 项目:professional-services 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def is_admin(self, credentials):
        """Check if user has appengine.admin role.

        Calls iam.projects.testIamPermissions with
        appengine.applications.update to determine if the current logged in
        user is an application admin.

        Args:
            credentials: the user's access token.

        Returns:
            True if user is an admin, False otherwise.
        """
        admin_permission = 'appengine.applications.update'
        body = {'permissions': admin_permission}
        http = credentials.authorize(httplib2.Http())
        response = api.CLIENTS.iam.projects().testIamPermissions(
            resource=config.get_project_id(), body=body).execute(http=http)
        return admin_permission in response.get('permissions', [])
quick-add.py 文件源码 项目:handy-tools 作者: anuragbanerjee 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def main():
    """
    Shows basic usage of the Google Calendar API.

    Creates a Google Calendar API service object, logs the query from the arguments, and creates an event with Quick Add.
    """
    credentials = get_credentials()
    http = credentials.authorize(httplib2.Http())
    service = discovery.build('calendar', 'v3', http=http)

    if len(sys.argv) > 1:
        with open("log.txt", "a") as logfile:
            logfile.write(flags.query + "\n")
        created_event = service.events().quickAdd(
            calendarId='primary',
            text=flags.query
        ).execute()
project.py 文件源码 项目:Intelligent-Public-Grievance-System 作者: devyash 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def gdisconnect():
    # Only disconnect a connected user.
    credentials = login_session.get('credentials')
    if credentials is None:
        response = make_response(
            json.dumps('Current user not connected.'), 401)
        response.headers['Content-Type'] = 'application/json'
        return response
    access_token = credentials.access_token
    url = 'https://accounts.google.com/o/oauth2/revoke?token=%s' % access_token
    h = httplib2.Http()
    result = h.request(url, 'GET')[0]
    if result['status'] != '200':
        # For whatever reason, the given token was invalid.
        response = make_response(
            json.dumps('Failed to revoke token for given user.', 400))
        response.headers['Content-Type'] = 'application/json'
        return response


# Disconnect based on provider
boilercalendar.py 文件源码 项目:BoilerPlate 作者: wyaron 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def authenticate():
    global service
    global http_auth
    global calendar

    try:
        # read credentials
        credentials = ServiceAccountCredentials.from_json_keyfile_name(CREDENTIAL_FILE_PATH, scopes)

    # authorize and get the calendar service
    http_auth = credentials.authorize(Http())
    service = discovery.build('calendar', 'v3', http=http_auth)
    calendar = service.calendars().get(calendarId=CALENDAR_ID).execute()
    except:
    logging.getLogger('BoilerLogger').error('failed to authenticate to google calendar service, will retry...')
    init()


# get calendar events in a window sorted by start time
service.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def delete(config, disk_name=None, disk_zone=None):  # TODO: implement
        # submit a request to the gce api for a new disk with the given parameters
        # if inputs is not None, run a pipeline job to populate the disk
        projectId = config.project_id
        zones = [disk_zone if disk_zone is not None else x for x in config.zones.split(',')]

        credentials = GoogleCredentials.get_application_default()
        http = credentials.authorize(httplib2.Http())

        if credentials.access_token_expired:
            credentials.refresh(http)

        gce = discovery.build('compute', 'v1', http=http)

        for z in zones:
            try:
                resp = gce.disks().delete(project=projectId, zone=z, disk=disk_name).execute()
            except HttpError as e:
                raise DataDiskError("Couldn't delete data disk {n}: {reason}".format(n=disk_name, reason=e))

            while True:
                try:
                    result = gce.zoneOperations().get(project=projectId, zone=z, operation=resp['name']).execute()
                except HttpError:
                    break
                else:
                    if result['status'] == 'DONE':
                        break


问题


面经


文章

微信
公众号

扫码关注公众号