python类UNAUTHORIZED的实例源码

twitter.py 文件源码 项目:TwitterCompetitionBot 作者: BenAllenUK 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def VerifyCredentials(self):
        '''Returns a twitter.User instance if the authenticating user is valid.

        Requests ``<base_url>/account/verify_credentials.json`` with
        ``no_cache=True`` and builds the user from the parsed reply.

        Returns:
          A twitter.User instance representing that user if the
          credentials are valid, None if the request is answered with
          401 Unauthorized.

        Raises:
          TwitterError: if no OAuth consumer has been set on this instance.
          urllib2.HTTPError: for any HTTP error other than 401.
        '''
        if not self._oauth_consumer:
            raise TwitterError("Api instance must first be given user credentials.")
        url = '%s/account/verify_credentials.json' % self.base_url
        try:
            raw_json = self._FetchUrl(url, no_cache=True)
        except urllib2.HTTPError as http_error:
            if http_error.code == httplib.UNAUTHORIZED:
                return None
            # Bare raise re-raises the active exception with its original
            # traceback instead of starting a new one at this line.
            raise
        data = self._ParseAndCheckTwitter(raw_json)
        return User.NewFromJsonDict(data)
twitter.py 文件源码 项目:gif-disco 作者: futurice 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def VerifyCredentials(self):
    '''Returns a twitter.User instance if the authenticating user is valid.

    Requests ``<base_url>/account/verify_credentials.json`` with
    ``no_cache=True`` and builds the user from the parsed reply.

    Returns:
      A twitter.User instance representing that user if the
      credentials are valid, None if the request is answered with
      401 Unauthorized.

    Raises:
      TwitterError: if no OAuth consumer has been set on this instance.
      urllib2.HTTPError: for any HTTP error other than 401.
    '''
    if not self._oauth_consumer:
      raise TwitterError("Api instance must first be given user credentials.")
    url = '%s/account/verify_credentials.json' % self.base_url
    try:
      raw_json = self._FetchUrl(url, no_cache=True)
    except urllib2.HTTPError as http_error:
      if http_error.code == httplib.UNAUTHORIZED:
        return None
      # Bare raise re-raises the active exception with its original
      # traceback instead of starting a new one at this line.
      raise
    data = self._ParseAndCheckTwitter(raw_json)
    return User.NewFromJsonDict(data)
__init__.py 文件源码 项目:autosub-bootstrapbill 作者: BenjV 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def VerifyCredentials(self):
    '''Returns a twitter.User instance if the authenticating user is valid.

    Requests ``<base_url>/account/verify_credentials.json`` with
    ``no_cache=True`` and builds the user from the parsed reply.

    Returns:
      A twitter.User instance representing that user if the
      credentials are valid, None if the request is answered with
      401 Unauthorized.

    Raises:
      TwitterError: if no OAuth consumer has been set on this instance.
      urllib2.HTTPError: for any HTTP error other than 401.
    '''
    if not self._oauth_consumer:
      raise TwitterError("Api instance must first be given user credentials.")
    url = '%s/account/verify_credentials.json' % self.base_url
    try:
      raw_json = self._FetchUrl(url, no_cache=True)
    except urllib2.HTTPError as http_error:
      if http_error.code == httplib.UNAUTHORIZED:
        return None
      # Bare raise re-raises the active exception with its original
      # traceback instead of starting a new one at this line.
      raise
    data = self._ParseAndCheckTwitter(raw_json)
    return User.NewFromJsonDict(data)
brightcove.py 文件源码 项目:xblock-video 作者: appsembler 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get(self, url, headers=None, can_retry=True):
        """
        Send a GET request to the given URL and return the decoded JSON reply.

        Arguments:
            url (str): API url to fetch a resource from.
            headers (dict): Extra headers; they are merged over the default bearer authorization header.
            can_retry (bool): When True, a 401 reply makes the client refresh its access token and
                repeat the call exactly once.
        Returns:
            The JSON body of a 200 response, as decoded by `resp.json()`.
        Raises:
            BrightcoveApiClientError: for any reply that is neither 200 nor a retryable 401.
        """
        request_headers = {'Authorization': 'Bearer ' + str(self.access_token)}
        if headers is not None:
            request_headers.update(headers)

        response = requests.get(url, headers=request_headers)
        status = response.status_code

        if status == httplib.OK:
            return response.json()

        if can_retry and status == httplib.UNAUTHORIZED:
            # Retry with a fresh token; can_retry=False stops a second refresh.
            self.access_token = self._refresh_access_token()
            return self.get(url, headers, can_retry=False)

        raise BrightcoveApiClientError
wistia.py 文件源码 项目:xblock-video 作者: appsembler 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def authenticate_api(self, **kwargs):
        """
        Call a sample Wistia API url to check on authentication success.

        Reference:
            https://wistia.com/doc/data-api#authentication

        Arguments:
            kwargs (dict): Wistia master token key-value pair; only the `token` key is read.

        Returns:
            auth_data (dict): Master token, provided by a user, stored under the `token` key.
            error_message (str): Empty unless the sample request is answered with 401 Unauthorized,
                in which case it explains the authentication failure.
        """
        token = kwargs.get('token')
        auth_data = {'token': token}
        error_message = ''
        url = self.captions_api.get('auth_sample_url').format(token=str(token))
        response = requests.get('https://' + url)
        # Only a 401 is reported; every other status leaves the message empty.
        if response.status_code == httplib.UNAUTHORIZED:
            error_message = "Authentication failed. " \
                            "Please ensure you have provided a valid master token, using Video API Token field."
        return auth_data, error_message
wopiserver.py 文件源码 项目:wopiserver 作者: cernbox 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def cboxGetOpenFiles():
  '''Returns, as a JSON document, the entries of Wopi.openfiles; for operations purposes only.
  The caller must send the shared secret as "Bearer <secret>" in the Authorization header,
  i.e. the same shared secret that protects the /wopi/cbox/open call.'''
  request = flask.request
  supplied = request.headers.get('Authorization')
  # reject the call unless the header is present and carries our shared secret
  if supplied is None or supplied != 'Bearer ' + Wopi.ocsecret:
    Wopi.log.warning('msg="cboxGetOpenFiles: unauthorized access attempt, missing authorization token" client="%s"' % request.remote_addr)
    return 'Client not authorized', httplib.UNAUTHORIZED
  # the second element of each entry is turned into a tuple so that it can be serialized in JSON format
  snapshot = dict((name, (entry[0], tuple(entry[1]))) for name, entry in Wopi.openfiles.items())
  # dump the current list of opened files in JSON format
  Wopi.log.info('msg="cboxGetOpenFiles: returning list of open files" client="%s"' % request.remote_addr)
  return flask.Response(json.dumps(snapshot), mimetype='application/json')


#
# The WOPI protocol implementation starts here
#
wopiserver.py 文件源码 项目:wopiserver 作者: cernbox 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def wopiGetFile(fileid):
  '''Implements the GetFile WOPI call.
  The access_token query argument is decoded as an HS256 JWT; on success the file named in the
  token is read from storage and returned as an octet-stream response with status 200.
  A token that fails to decode or has expired yields a 401 reply; any other exception is
  delegated to _logGeneralExceptionAndReturn.'''
  Wopi.refreshconfig()
  try:
    acctok = jwt.decode(flask.request.args['access_token'], Wopi.wopisecret, algorithms=['HS256'])
    if acctok['exp'] < time.time():
      raise jwt.exceptions.ExpiredSignatureError
    # only the last 20 characters of the token are logged on success
    Wopi.log.info('msg="GetFile" user="%s:%s" filename="%s" fileid="%s" token="%s"' % \
                  (acctok['ruid'], acctok['rgid'], acctok['filename'], fileid, flask.request.args['access_token'][-20:]))
    # stream file from storage to client
    resp = flask.Response(xrdcl.readfile(acctok['filename'], acctok['ruid'], acctok['rgid']), mimetype='application/octet-stream')
    resp.status_code = httplib.OK
    return resp
  except (jwt.exceptions.DecodeError, jwt.exceptions.ExpiredSignatureError):
    Wopi.log.warning('msg="Signature verification failed" client="%s" requestedUrl="%s" token="%s"' % \
                     (flask.request.remote_addr, flask.request.base_url, flask.request.args['access_token']))
    return 'Invalid access token', httplib.UNAUTHORIZED
  except Exception as e:
    return _logGeneralExceptionAndReturn(e, flask.request)


#
# The following operations are all called on POST /wopi/files/<fileid>
#
twitter.py 文件源码 项目:Chirps 作者: vered1986 项目源码 文件源码 阅读 63 收藏 0 点赞 0 评论 0
def VerifyCredentials(self):
    '''Returns a twitter.User instance if the authenticating user is valid.

    Requests ``<base_url>/account/verify_credentials.json`` with
    ``no_cache=True`` and builds the user from the parsed reply.

    Returns:
      A twitter.User instance representing that user if the
      credentials are valid, None if the request is answered with
      401 Unauthorized.

    Raises:
      TwitterError: if no OAuth consumer has been set on this instance.
      urllib2.HTTPError: for any HTTP error other than 401.
    '''
    if not self._oauth_consumer:
      raise TwitterError("Api instance must first be given user credentials.")
    url = '%s/account/verify_credentials.json' % self.base_url
    try:
      raw_json = self._FetchUrl(url, no_cache=True)
    except urllib2.HTTPError as http_error:
      if http_error.code == httplib.UNAUTHORIZED:
        return None
      # Bare raise re-raises the active exception with its original
      # traceback instead of starting a new one at this line.
      raise
    data = self._ParseAndCheckTwitter(raw_json)
    return User.NewFromJsonDict(data)
jwt.py 文件源码 项目:drift 作者: dgnorth 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def abort_unauthorized(description):
    """Abort the current request with HTTP status 401.

    The given description is forwarded unchanged to ``abort``.
    """
    status = httplib.UNAUTHORIZED
    abort(status, description=description)
wsgi.py 文件源码 项目:endpoints-management-python 作者: cloudendpoints 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def _handle_missing_api_key(self, app_info, start_response):
        """Answer a request that carries no API key with 401 Unauthorized.

        Logs a warning, starts the response with a status line built from
        the 401 code and ``self._NO_API_KEY_MSG``, and records the outcome
        on ``app_info`` (``response_code`` and ``api_key_valid``).

        Returns:
            The encoded status line that was passed to ``start_response``.
        """
        code = httplib.UNAUTHORIZED
        detail = self._NO_API_KEY_MSG
        # Logger.warn is a deprecated alias; warning() is the supported name.
        logger.warning(u'Check not performed %d, %s', code, detail)
        error_msg = b'%d %s' % (code, detail.encode('utf-8'))
        start_response(error_msg, [])
        app_info.response_code = code
        app_info.api_key_valid = False
        return error_msg  # the request cannot continue
test_ApiCalls.py 文件源码 项目:irida-miseq-uploader 作者: phac-nml 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_validate_URL_existence_url_raise_err(self, mock_cs, mock_url):
        # validate_URL_existence must raise an exception whose text contains
        # the "Unauthorized" message carried by the object configured as the
        # side effect of mock_url (presumably an exception type - see Foo).
        err_msg = "Unauthorized"
        url = "http://localhost:8080/api/"

        url_raise_err = Foo()
        url_raise_err.code = httplib.UNAUTHORIZED
        url_raise_err.msg = err_msg

        mock_url.side_effect = [url_raise_err]
        mock_cs.side_effect = [None]

        api = API.apiCalls.ApiCalls(
            client_id="",
            client_secret="",
            base_URL="",
            username="",
            password=""
        )
        validate_URL = api.validate_URL_existence

        with self.assertRaises(Exception) as err:
            validate_URL(url)

        raised_text = str(err.exception)
        self.assertTrue(err_msg in raised_text)
        API.apiCalls.urlopen.assert_called_with(url, timeout=api.max_wait_time)
test_integration.py 文件源码 项目:capstone 作者: rackerlabs 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_with_incorrect_username_and_domain_id(self):
        # Random user name + real password + real domain id -> expect 401.
        user = {
            'name': uuid.uuid4().hex,
            'password': self.password,
            'domain': {'id': self.domain_id},
        }
        auth_request = generate_password_auth_data(user)
        self.authenticate(auth_request, expected_status=httplib.UNAUTHORIZED)
test_integration.py 文件源码 项目:capstone 作者: rackerlabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_with_username_and_incorrect_domain_id(self):
        # Real user name and password, but a random domain id -> expect 401.
        user = {
            'name': self.username,
            'password': self.password,
            'domain': {'id': uuid.uuid4().hex},
        }
        auth_request = generate_password_auth_data(user)
        self.authenticate(auth_request, expected_status=httplib.UNAUTHORIZED)
test_integration.py 文件源码 项目:capstone 作者: rackerlabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_with_username_and_incorrect_domain_name(self):
        # Real user name and password, but a random domain name -> expect 401.
        user = {
            'name': self.username,
            'password': self.password,
            'domain': {'name': uuid.uuid4().hex},
        }
        auth_request = generate_password_auth_data(user)
        self.authenticate(auth_request, expected_status=httplib.UNAUTHORIZED)
test_integration.py 文件源码 项目:capstone 作者: rackerlabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_with_incorrect_user_id(self):
        # Random user id with the real password -> expect 401.
        user = {
            'id': uuid.uuid4().hex,
            'password': self.password,
        }
        auth_request = generate_password_auth_data(user)
        self.authenticate(auth_request, expected_status=httplib.UNAUTHORIZED)
test_integration.py 文件源码 项目:capstone 作者: rackerlabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_with_user_id_and_incorrect_domain_id(self):
        # Real user id and password, but a random domain id -> expect 401.
        user = {
            'id': self.user_id,
            'password': self.password,
            'domain': {'id': uuid.uuid4().hex},
        }
        auth_request = generate_password_auth_data(user)
        self.authenticate(auth_request, expected_status=httplib.UNAUTHORIZED)
test_integration.py 文件源码 项目:capstone 作者: rackerlabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_invalid(self):
        # Valid user credentials scoped to a random project id -> expect 401.
        user = {
            'name': self.username,
            'password': self.password,
            'domain': {'id': self.domain_id},
        }
        scope = {'project': {'id': uuid.uuid4().hex}}
        auth_request = generate_password_auth_data_with_scope(user=user, scope=scope)
        self.authenticate(auth_request, expected_status=httplib.UNAUTHORIZED)
test_integration.py 文件源码 项目:capstone 作者: rackerlabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_invalid(self):
        # Valid user credentials scoped to a random domain id -> expect 401.
        user = {
            'name': self.username,
            'password': self.password,
            'domain': {'id': self.domain_id},
        }
        scope = {'domain': {'id': uuid.uuid4().hex}}
        auth_request = generate_password_auth_data_with_scope(user=user, scope=scope)
        self.authenticate(auth_request, httplib.UNAUTHORIZED)
test_integration.py 文件源码 项目:capstone 作者: rackerlabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_with_invalid_project_id(self):
        # Step 1: authenticate by password and pick up the issued token
        # from the X-Subject-Token response header.
        credentials = {
            'name': self.username,
            'password': self.password,
        }
        login = self.authenticate(generate_password_auth_data(credentials))
        subject_token = login.headers['X-Subject-Token']

        # Step 2: authenticate with that token against the project id
        # 'invalid' -> expect 401.
        rescoped = generate_token_auth_data_with_scope(
            token_id=subject_token,
            scope={'project': {'id': 'invalid'}})
        self.authenticate(rescoped, httplib.UNAUTHORIZED)
test_integration.py 文件源码 项目:capstone 作者: rackerlabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_with_invalid_token_id(self):
        # Token id 'invalid' scoped to the real project id -> expect 401.
        scope = {'project': {'id': self.project_id}}
        auth_request = generate_token_auth_data_with_scope(
            token_id='invalid',
            scope=scope)
        self.authenticate(auth_request, httplib.UNAUTHORIZED)
files.py 文件源码 项目:orangecloud-client 作者: antechrestos 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _is_token_expired_on_upload(response):
        """Tell whether an upload reply reports an invalid token.

        Returns True only when the status is 401 Unauthorized and the JSON
        body has an ``error`` object whose ``label`` is ``INVALID_TOKEN``.
        A body that cannot be decoded or has an unexpected shape yields
        False.
        """
        if response.status_code != httplib.UNAUTHORIZED:
            return False
        try:
            error = response.json().get('error')
            # {"error":{"code":"PDK_RP_0004","label":"INVALID_TOKEN","details":"OIDC rejected the token"}}
            return error is not None and error.get('label') == 'INVALID_TOKEN'
        except Exception:
            # Best effort on malformed bodies, but unlike a bare ``except:``
            # this lets SystemExit and KeyboardInterrupt propagate.
            return False
api.py 文件源码 项目:orangecloud-client 作者: antechrestos 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _is_token_expired(response):
        """Tell whether a reply reports invalid credentials.

        Returns True only when the status is 401 Unauthorized and the JSON
        body's ``message`` equals ``Invalid credentials``. A body that
        cannot be decoded or has an unexpected shape yields False.
        """
        if response.status_code != httplib.UNAUTHORIZED:
            return False
        try:
            return response.json().get('message', '') == 'Invalid credentials'
        except Exception:
            # Best effort on malformed bodies, but unlike a bare ``except:``
            # this lets SystemExit and KeyboardInterrupt propagate.
            return False
brightcove.py 文件源码 项目:xblock-video 作者: appsembler 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def post(self, url, payload, headers=None, can_retry=True):
        """
        Issue REST POST request to a given URL. Can throw ApiClientError or its subclass.

        Arguments:
            url (str): API url to fetch a resource from.
            payload (dict): POST data; it is handed to `requests.post` as `data=` unchanged.
                NOTE(review): the Content-type sent is application/json - confirm callers pass
                an already serialised body.
            headers (dict): Headers necessary as per API, e.g. authorization bearer to perform authorised requests.
                They are merged over the default authorization and content-type headers.
            can_retry (bool): True if in a case of authentication error it can refresh access token and retry a call.
        Returns:
            Response in Python native data format (the JSON body of a 200 or 201 reply).
        Raises:
            BrightcoveApiClientError: for any reply that is neither 200/201 nor a retryable 401.
        """
        headers_ = {
            # str() mirrors get(): a non-string token (e.g. None) then produces a
            # 401 and goes through the refresh path instead of raising TypeError here.
            'Authorization': 'Bearer ' + str(self.access_token),
            'Content-type': 'application/json'
        }
        if headers is not None:
            headers_.update(headers)

        resp = requests.post(url, data=payload, headers=headers_)
        if resp.status_code in (httplib.OK, httplib.CREATED):
            return resp.json()
        elif resp.status_code == httplib.UNAUTHORIZED and can_retry:
            # Retry once with a fresh token; can_retry=False stops a second refresh.
            self.access_token = self._refresh_access_token()
            return self.post(url, payload, headers, can_retry=False)
        else:
            raise BrightcoveApiClientError
wopiserver.py 文件源码 项目:wopiserver 作者: cernbox 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def cboxDownload():
  '''Returns the file's content for a given valid access token. Used as a download URL,
     so that the file's path is never explicitly visible.
     The access_token query argument is decoded as an HS256 JWT and the file it names is
     returned as an attachment. A token that fails to decode or has expired, and a file
     that cannot be read, both yield a 404 reply; a missing key yields 401; any other
     exception is delegated to _logGeneralExceptionAndReturn.'''
  try:
    acctok = jwt.decode(flask.request.args['access_token'], Wopi.wopisecret, algorithms=['HS256'])
    if acctok['exp'] < time.time():
      raise jwt.exceptions.ExpiredSignatureError
    resp = flask.Response(xrdcl.readfile(acctok['filename'], acctok['ruid'], acctok['rgid']), mimetype='application/octet-stream')
    resp.headers['Content-Disposition'] = 'attachment; filename="%s"' % os.path.basename(acctok['filename'])
    resp.status_code = httplib.OK
    # only the last 20 characters of the token are logged on success
    Wopi.log.info('msg="cboxDownload: direct download succeeded" filename="%s" user="%s:%s" token="%s"' % \
                  (acctok['filename'], acctok['ruid'], acctok['rgid'], flask.request.args['access_token'][-20:]))
    return resp
  except (jwt.exceptions.DecodeError, jwt.exceptions.ExpiredSignatureError):
    Wopi.log.warning('msg="Signature verification failed" client="%s" requestedUrl="%s" token="%s"' % \
                     (flask.request.remote_addr, flask.request.base_url, flask.request.args['access_token']))
    return 'Invalid access token', httplib.NOT_FOUND
  except IOError as e:
    Wopi.log.info('msg="Requested file not found" filename="%s" token="%s" error="%s"' % \
                  (acctok['filename'], flask.request.args['access_token'][-20:], e))
    return 'File not found', httplib.NOT_FOUND
  except KeyError as e:
    # raised when the access_token argument or an expected token field is missing
    Wopi.log.error('msg="Invalid access token or request argument" error="%s"' % e)
    return 'Invalid access token', httplib.UNAUTHORIZED
  except Exception as e:
    return _logGeneralExceptionAndReturn(e, flask.request)
requests_test.py 文件源码 项目:raw-data-repository 作者: all-of-us 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_unauthenticated(self):
    # POST an empty Participant with authenticated=False; check_status=False
    # is passed so the status can be asserted here, and it must be 401.
    response, _ = self.client.request(
        'Participant',
        method='POST',
        body='{}',
        authenticated=False,
        check_status=False)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(response.status, httplib.UNAUTHORIZED)
glance.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def check_resp_status_and_retry(resp, image_id, url):
    """Turn the status of a glance upload response into an exception.

    Every path raises; nothing is returned. The status is sorted into:

    * permanent errors -> PluginError
    * 404              -> XenAPI.Failure('ImageNotFound')
    * ephemeral errors -> RetryableError
    * anything else    -> RetryableError (treated as unexpected)
    """
    status = resp.status
    context = (status, image_id, url)

    permanent_statuses = (
        httplib.BAD_REQUEST,                      # 400
        httplib.UNAUTHORIZED,                     # 401
        httplib.PAYMENT_REQUIRED,                 # 402
        httplib.FORBIDDEN,                        # 403
        httplib.METHOD_NOT_ALLOWED,               # 405
        httplib.NOT_ACCEPTABLE,                   # 406
        httplib.PROXY_AUTHENTICATION_REQUIRED,    # 407
        httplib.CONFLICT,                         # 409
        httplib.GONE,                             # 410
        httplib.LENGTH_REQUIRED,                  # 411
        httplib.PRECONDITION_FAILED,              # 412
        httplib.REQUEST_ENTITY_TOO_LARGE,         # 413
        httplib.REQUEST_URI_TOO_LONG,             # 414
        httplib.UNSUPPORTED_MEDIA_TYPE,           # 415
        httplib.REQUESTED_RANGE_NOT_SATISFIABLE,  # 416
        httplib.EXPECTATION_FAILED,               # 417
        httplib.UNPROCESSABLE_ENTITY,             # 422
        httplib.LOCKED,                           # 423
        httplib.FAILED_DEPENDENCY,                # 424
        httplib.UPGRADE_REQUIRED,                 # 426
        httplib.NOT_IMPLEMENTED,                  # 501
        httplib.HTTP_VERSION_NOT_SUPPORTED,       # 505
        httplib.NOT_EXTENDED,                     # 510
    )
    if status in permanent_statuses:
        raise PluginError("Got Permanent Error response [%i] while "
                          "uploading image [%s] to glance [%s]"
                          % context)

    # Nova service would process the exception
    if status == httplib.NOT_FOUND:               # 404
        raise XenAPI.Failure('ImageNotFound')

    # Only a sub-set of the 500 errors are retryable; 500 itself is
    # optimistically retried.
    ephemeral_statuses = (
        httplib.REQUEST_TIMEOUT,                  # 408
        httplib.INTERNAL_SERVER_ERROR,            # 500
        httplib.BAD_GATEWAY,                      # 502
        httplib.SERVICE_UNAVAILABLE,              # 503
        httplib.GATEWAY_TIMEOUT,                  # 504
        httplib.INSUFFICIENT_STORAGE,             # 507
    )
    if status in ephemeral_statuses:
        raise RetryableError("Got Ephemeral Error response [%i] while "
                             "uploading image [%s] to glance [%s]"
                             % context)

    # Unexpected statuses are assumed to be retryable. If this message
    # shows up, the status should probably be added to one of the two
    # lists above.
    raise RetryableError("Got Unexpected Error response [%i] while "
                         "uploading image [%s] to glance [%s]"
                         % context)
docker_http_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _Ping(self):
    """Ping the v2 Registry.

    Only called during transport construction, this pings the listed
    v2 registry.  The point of this ping is to establish the "realm"
    and "service" to use for Basic or Bearer-Token exchanges.

    On a 200 reply the registry is treated as anonymous and both realm and
    service are set to 'none'.  On a 401 reply the "www-authenticate"
    header is parsed into self._authentication, self._realm and
    self._service.  Any other status, or a malformed challenge, fails the
    _CheckState assertions.
    """
    # This initiates the pull by issuing a v2 ping:
    #   GET H:P/v2/
    headers = {
        'content-type': 'application/json',
        'user-agent': docker_name.USER_AGENT,
    }
    resp, unused_content = self._transport.request(
        '{scheme}://{registry}/v2/'.format(scheme=Scheme(self._name.registry),
                                           registry=self._name.registry),
        'GET',
        body=None,
        headers=headers)

    # We expect a www-authenticate challenge.
    _CheckState(resp.status in [httplib.OK, httplib.UNAUTHORIZED],
                'Unexpected status: %d' % resp.status)

    # The registry is authenticated iff we have an authentication challenge.
    if resp.status == httplib.OK:
      self._authentication = _ANONYMOUS
      self._service = 'none'
      self._realm = 'none'
      return

    challenge = resp['www-authenticate']
    _CheckState(' ' in challenge,
                'Unexpected "www-authenticate" header form: %s' % challenge)

    (self._authentication, remainder) = challenge.split(' ', 1)

    # Normalize the authentication scheme to have exactly the first letter
    # capitalized. Scheme matching is required to be case insensitive:
    # https://tools.ietf.org/html/rfc7235#section-2.1
    self._authentication = self._authentication.capitalize()

    _CheckState(self._authentication in [_BASIC, _BEARER],
                'Unexpected "www-authenticate" challenge type: %s'
                % self._authentication)

    # Default "_service" to the registry
    self._service = self._name.registry

    for t in remainder.split(','):
      # Parameters may be separated by a comma plus whitespace
      # (e.g. 'realm="..", service=".."'), so trim each one before
      # matching its prefix; otherwise the later parameters are missed.
      t = t.strip()
      if t.startswith(_REALM_PFX):
        self._realm = t[len(_REALM_PFX):].strip('"')
      elif t.startswith(_SERVICE_PFX):
        self._service = t[len(_SERVICE_PFX):].strip('"')

    # Make sure these got set.
    _CheckState(self._realm, 'Expected a "%s" in "www-authenticate" '
                'header: %s' % (_REALM_PFX, challenge))
docker_http_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _Ping(self):
    """Ping the v2 Registry.

    Only called during transport construction, this pings the listed
    v2 registry.  The point of this ping is to establish the "realm"
    and "service" to use for Basic or Bearer-Token exchanges.

    On a 200 reply the registry is treated as anonymous and both realm and
    service are set to 'none'.  On a 401 reply the "www-authenticate"
    header is parsed into self._authentication, self._realm and
    self._service.  Any other status, or a malformed challenge, fails the
    _CheckState assertions.
    """
    # This initiates the pull by issuing a v2 ping:
    #   GET H:P/v2/
    headers = {
        'content-type': 'application/json',
        'user-agent': docker_name.USER_AGENT,
    }
    resp, unused_content = self._transport.request(
        '{scheme}://{registry}/v2/'.format(scheme=Scheme(self._name.registry),
                                           registry=self._name.registry),
        'GET',
        body=None,
        headers=headers)

    # We expect a www-authenticate challenge.
    _CheckState(resp.status in [httplib.OK, httplib.UNAUTHORIZED],
                'Unexpected status: %d' % resp.status)

    # The registry is authenticated iff we have an authentication challenge.
    if resp.status == httplib.OK:
      self._authentication = _ANONYMOUS
      self._service = 'none'
      self._realm = 'none'
      return

    challenge = resp['www-authenticate']
    _CheckState(' ' in challenge,
                'Unexpected "www-authenticate" header form: %s' % challenge)

    (self._authentication, remainder) = challenge.split(' ', 1)

    # Normalize the authentication scheme to have exactly the first letter
    # capitalized. Scheme matching is required to be case insensitive:
    # https://tools.ietf.org/html/rfc7235#section-2.1
    self._authentication = self._authentication.capitalize()

    _CheckState(self._authentication in [_BASIC, _BEARER],
                'Unexpected "www-authenticate" challenge type: %s'
                % self._authentication)

    # Default "_service" to the registry
    self._service = self._name.registry

    for t in remainder.split(','):
      # Parameters may be separated by a comma plus whitespace
      # (e.g. 'realm="..", service=".."'), so trim each one before
      # matching its prefix; otherwise the later parameters are missed.
      t = t.strip()
      if t.startswith(_REALM_PFX):
        self._realm = t[len(_REALM_PFX):].strip('"')
      elif t.startswith(_SERVICE_PFX):
        self._service = t[len(_SERVICE_PFX):].strip('"')

    # Make sure these got set.
    _CheckState(self._realm, 'Expected a "%s" in "www-authenticate" '
                'header: %s' % (_REALM_PFX, challenge))
errors.py 文件源码 项目:Orator-Google-App-Engine 作者: MakarenaLabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def check_status(status, expected, path, headers=None,
                 resp_headers=None, body=None, extras=None):
  """Raise the matching error unless the HTTP status is an expected one.

  Args:
    status: HTTP response status. int.
    expected: a list of expected statuses. A list of ints.
    path: filename or a path prefix.
    headers: HTTP request headers.
    resp_headers: HTTP response headers.
    body: HTTP response body.
    extras: extra info to be logged verbatim if error occurs.

  Returns:
    None when status is one of the expected statuses.

  Raises:
    AuthorizationError: on 401.
    ForbiddenError: on 403.
    NotFoundError: on 404.
    TimeoutError: on 408.
    InvalidRange: on 416.
    FileClosedError: on 200 when 308 was expected and 200 was not.
    ServerError: on any status of 500 or above.
    FatalError: on every other unexpected status.
  """
  if status in expected:
    return

  msg = ('Expect status %r from Google Storage. But got status %d.\n'
         'Path: %r.\n'
         'Request headers: %r.\n'
         'Response headers: %r.\n'
         'Body: %r.\n'
         'Extra info: %r.\n' %
         (expected, status, path, headers, resp_headers, body, extras))

  # Statuses that map one-to-one onto a dedicated error type.
  errors_by_status = {
      httplib.UNAUTHORIZED: AuthorizationError,
      httplib.FORBIDDEN: ForbiddenError,
      httplib.NOT_FOUND: NotFoundError,
      httplib.REQUEST_TIMEOUT: TimeoutError,
      httplib.REQUESTED_RANGE_NOT_SATISFIABLE: InvalidRange,
  }
  error_cls = errors_by_status.get(status)

  if error_cls is None:
    got_ok_instead_of_308 = (status == httplib.OK and 308 in expected and
                             httplib.OK not in expected)
    if got_ok_instead_of_308:
      error_cls = FileClosedError
    elif status >= 500:
      error_cls = ServerError
    else:
      error_cls = FatalError

  raise error_cls(msg)
gerrit_util.py 文件源码 项目:Chromium_DepotTools 作者: p07r0457 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def ReadHttpResponse(conn, expect_status=200, ignore_404=True):
  """Reads an http response from a connection into a string buffer.

  A status >= 500 is treated as possibly transient: the request is re-issued
  on a fresh connection after a sleep that starts at 0.5s and doubles each
  time, for at most TRY_LIMIT attempts in total.

  Args:
    conn: An HTTPSConnection or HTTPConnection created by CreateHttpConn, above.
    expect_status: Success is indicated by this status in the response.
    ignore_404: For many requests, gerrit-on-borg will return 404 if the request
                doesn't match the database contents.  In most such cases, we
                want the API to return None rather than raise an Exception.
  Returns: A string buffer containing the connection's reply; an empty buffer
           when a 404 is ignored.

  Raises:
    GerritAuthenticationError: on a 401 or 302 response that carries a
        www-authenticate header.
    GerritError: if the final status differs from expect_status.
  """

  sleep_time = 0.5
  for idx in range(TRY_LIMIT):
    response = conn.getresponse()

    # Check if this is an authentication issue.
    www_authenticate = response.getheader('www-authenticate')
    if (response.status in (httplib.UNAUTHORIZED, httplib.FOUND) and
        www_authenticate):
      auth_match = re.search('realm="([^"]+)"', www_authenticate, re.I)
      # Fall back to the requested host when the header names no realm.
      host = auth_match.group(1) if auth_match else conn.req_host
      reason = ('Authentication failed. Please make sure your .netrc file '
                'has credentials for %s' % host)
      raise GerritAuthenticationError(response.status, reason)

    # If response.status < 500 then the result is final; break retry loop.
    if response.status < 500:
      break
    # A status >=500 is assumed to be a possible transient error; retry.
    http_version = 'HTTP/%s' % ('1.1' if response.version == 11 else '1.0')
    msg = (
        'A transient error occurred while querying %s:\n'
        '%s %s %s\n'
        '%s %d %s' % (
            conn.host, conn.req_params['method'], conn.req_params['url'],
            http_version, http_version, response.status, response.reason))
    if TRY_LIMIT - idx > 1:
      msg += '\n... will retry %d more times.' % (TRY_LIMIT - idx - 1)
      time.sleep(sleep_time)
      sleep_time = sleep_time * 2
      # Re-issue the same request on a brand new connection, carrying the
      # request bookkeeping attributes over to it.
      req_host = conn.req_host
      req_params = conn.req_params
      conn = GetConnectionClass()(req_host)
      conn.req_host = req_host
      conn.req_params = req_params
      conn.request(**req_params)
    # Logger.warn is a deprecated alias; warning() is the supported name.
    LOGGER.warning(msg)
  if ignore_404 and response.status == httplib.NOT_FOUND:
    return StringIO()
  if response.status != expect_status:
    reason = '%s: %s' % (response.reason, response.read())
    raise GerritError(response.status, reason)
  return StringIO(response.read())


问题


面经


文章

微信
公众号

扫码关注公众号