python类NOT_FOUND的实例源码

test_ApiCalls.py 文件源码 项目:irida-miseq-uploader 作者: phac-nml 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_validate_URL_existence_url_not_found(self, mock_cs, mock_url):

        url_not_found = Foo()
        setattr(url_not_found, "code", httplib.NOT_FOUND)

        mock_url.side_effect = [url_not_found]
        mock_cs.side_effect = [None]

        api = API.apiCalls.ApiCalls("", "", "", "", "")
        validate_URL = api.validate_URL_existence

        url = "notAWebSite"
        valid = False

        is_valid = validate_URL(url)
        self.assertEqual(is_valid, valid)
        API.apiCalls.urlopen.assert_called_with(url, timeout=api.max_wait_time)
bos_client.py 文件源码 项目:mWorkerService 作者: smices 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def does_bucket_exist(self, bucket_name, config=None):
        """
        Check whether there is a bucket with specific name

        :param bucket_name: None
        :type bucket_name: str
        :return:True or False
        :rtype: bool
        """
        try:
            self._send_request(http_methods.HEAD, bucket_name, config=config)
            return True
        except BceHttpClientError as e:
            if isinstance(e.last_error, BceServerError):
                if e.last_error.status_code == httplib.FORBIDDEN:
                    return True
                if e.last_error.status_code == httplib.NOT_FOUND:
                    return False
            raise e
bos_client.py 文件源码 项目:mWorkerService 作者: smices 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def does_bucket_exist(self, bucket_name, config=None):
        """
        Check whether there is a bucket with specific name

        :param bucket_name: None
        :type bucket_name: str
        :return:True or False
        :rtype: bool
        """
        try:
            self._send_request(http_methods.HEAD, bucket_name, config=config)
            return True
        except BceHttpClientError as e:
            if isinstance(e.last_error, BceServerError):
                if e.last_error.status_code == httplib.FORBIDDEN:
                    return True
                if e.last_error.status_code == httplib.NOT_FOUND:
                    return False
            raise e
common.py 文件源码 项目:qtip 作者: opnfv 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def check_endpoint_for_error(resource, operation=None):
    def _decorator(func):
        def _execute(name=None):
            try:
                return func(name), httplib.OK
            except error.NotFoundError:
                return connexion.problem(
                    httplib.NOT_FOUND,
                    '{} not found'.format(resource),
                    'Requested {} `{}` not found.'
                    .format(resource.lower(), name))
            except error.ToBeDoneError:
                return connexion.problem(
                    httplib.NOT_IMPLEMENTED,
                    '{} handler not implemented'.format(operation),
                    'Requested operation `{}` on {} not implemented.'
                    .format(operation.lower(), resource.lower()))
        return _execute
    return _decorator
stub_dispatcher.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _copy(gcs_stub, filename, headers):
  """Copy file.

  Args:
    gcs_stub: an instance of gcs stub.
    filename: dst filename of format /bucket/filename
    headers: a dict of request headers. Must contain _XGoogCopySource header.

  Returns:
    An _FakeUrlFetchResult instance.
  """
  source = _XGoogCopySource(headers).value
  result = _handle_head(gcs_stub, source)
  if result.status_code == httplib.NOT_FOUND:
    return result
  directive = headers.pop('x-goog-metadata-directive', 'COPY')
  if directive == 'REPLACE':
    gcs_stub.put_copy(source, filename, headers)
  else:
    gcs_stub.put_copy(source, filename, None)
  return _FakeUrlFetchResult(httplib.OK, {}, '')
stub_dispatcher.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _handle_get(gcs_stub, filename, param_dict, headers):
  """Handle GET object and GET bucket."""
  mo = re.match(BUCKET_ONLY_PATH, filename)
  if mo is not None:

    return _handle_get_bucket(gcs_stub, mo.group(1), param_dict)
  else:

    result = _handle_head(gcs_stub, filename)
    if result.status_code == httplib.NOT_FOUND:
      return result



    start, end = _Range(headers).value
    st_size = result.headers['x-goog-stored-content-length']
    if end is not None:
      result.status_code = httplib.PARTIAL_CONTENT
      end = min(end, st_size - 1)
      result.headers['content-range'] = 'bytes %d-%d/%d' % (start, end, st_size)

    result.content = gcs_stub.get_object(filename, start, end)
    result.headers['content-length'] = len(result.content)
    return result
stub_dispatcher.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _handle_head(gcs_stub, filename):
  """Handle HEAD request."""
  filestat = gcs_stub.head_object(filename)
  if not filestat:
    return _FakeUrlFetchResult(httplib.NOT_FOUND, {}, '')

  http_time = common.posix_time_to_http(filestat.st_ctime)

  response_headers = {
      'x-goog-stored-content-length': filestat.st_size,
      'content-length': 0,
      'content-type': filestat.content_type,
      'etag': filestat.etag,
      'last-modified': http_time
  }

  if filestat.metadata:
    response_headers.update(filestat.metadata)

  return _FakeUrlFetchResult(httplib.OK, response_headers, '')
test_glance.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_check_resp_status_and_retry_image_not_found(self):
        mock_resp_badgateway = mock.Mock()
        mock_resp_badgateway.status = httplib.NOT_FOUND
        self.glance.XenAPI.Failure = FakeXenAPIException
        self.assertRaises(
            self.glance.XenAPI.Failure,
            self.glance.check_resp_status_and_retry,
            mock_resp_badgateway,
            'fake_image_id',
            'fake_url')
utils.py 文件源码 项目:drift 作者: dgnorth 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def tenant_not_found(message):
    status_code = httplib.NOT_FOUND
    response = jsonify({"error": message, "status_code": status_code})
    response.status_code = status_code
    return response
docker_session_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _blob_exists(self, digest):
    """Check the remote for the given layer."""
    # HEAD the blob, and check for a 200
    resp, unused_content = self._transport.Request(
        '{base_url}/blobs/{digest}'.format(
            base_url=self._base_url(),
            digest=digest),
        method='HEAD', accepted_codes=[httplib.OK, httplib.NOT_FOUND])

    return resp.status == httplib.OK  # pytype: disable=attribute-error
docker_session_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _manifest_exists(self, image):
    """Check the remote for the given manifest by digest."""
    # GET the manifest by digest, and check for 200
    resp, unused_content = self._transport.Request(
        '{base_url}/manifests/{digest}'.format(
            base_url=self._base_url(), digest=image.digest()),
        method='GET',
        accepted_codes=[httplib.OK, httplib.NOT_FOUND],
        accepted_mimes=[image.media_type()])

    return resp.status == httplib.OK  # pytype: disable=attribute-error
docker_session_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _remote_tag_digest(self):
    """Check the remote for the given manifest by digest."""

    # GET the tag we're pushing
    resp, unused_content = self._transport.Request(
        '{base_url}/manifests/{tag}'.format(
            base_url=self._base_url(),
            tag=self._name.tag),  # pytype: disable=attribute-error
        method='GET', accepted_codes=[httplib.OK, httplib.NOT_FOUND])

    if resp.status == httplib.NOT_FOUND:  # pytype: disable=attribute-error
      return None

    return resp.get('docker-content-digest')
docker_image_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def exists(self):
    try:
      manifest = json.loads(self.manifest(validate=False))
      return (manifest['schemaVersion'] == 2 and
              'layers' in manifest and
              self.media_type() in self._accepted_mimes)
    except docker_http.V2DiagnosticException as err:
      if err.status == httplib.NOT_FOUND:
        return False
      raise
docker_image_list_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def exists(self):
    try:
      manifest = json.loads(self.manifest(validate=False))
      return manifest['schemaVersion'] == 2 and 'manifests' in manifest
    except docker_http.V2DiagnosticException as err:
      if err.status == httplib.NOT_FOUND:
        return False
      raise
docker_session_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _blob_exists(self, digest):
    """Check the remote for the given layer."""
    # HEAD the blob, and check for a 200
    resp, unused_content = self._transport.Request(
        '{base_url}/blobs/{digest}'.format(
            base_url=self._base_url(),
            digest=digest),
        method='HEAD', accepted_codes=[httplib.OK, httplib.NOT_FOUND])

    return resp.status == httplib.OK  # pytype: disable=attribute-error
docker_session_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _manifest_exists(self, image):
    """Check the remote for the given manifest by digest."""
    # GET the manifest by digest, and check for 200
    resp, unused_content = self._transport.Request(
        '{base_url}/manifests/{digest}'.format(
            base_url=self._base_url(),
            digest=image.digest()),
        method='GET', accepted_codes=[httplib.OK, httplib.NOT_FOUND])

    return resp.status == httplib.OK  # pytype: disable=attribute-error
docker_session_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _remote_tag_digest(self):
    """Check the remote for the given manifest by digest."""

    # GET the tag we're pushing
    resp, unused_content = self._transport.Request(
        '{base_url}/manifests/{tag}'.format(
            base_url=self._base_url(),
            tag=self._name.tag),  # pytype: disable=attribute-error
        method='GET', accepted_codes=[httplib.OK, httplib.NOT_FOUND])

    if resp.status == httplib.NOT_FOUND:  # pytype: disable=attribute-error
      return None

    return resp.get('docker-content-digest')
docker_image_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def exists(self):
    try:
      self.manifest(validate=False)
      return True
    except docker_http.V2DiagnosticException as err:
      if err.status == httplib.NOT_FOUND:
        return False
      raise
apiCalls.py 文件源码 项目:irida-miseq-uploader 作者: phac-nml 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def validate_URL_existence(self, url, use_session=False):
        """
        tries to validate existence of given url by trying to open it.
        true if HTTP OK, false if HTTP NOT FOUND otherwise
            raises error containing error code and message

        arguments:
            url -- the url link to open and validate
            use_session -- if True then this uses self.session.get(url) instead
            of urlopen(url) to get response

        returns
            true if http response OK 200
            false if http response NOT FOUND 404
        """

        if use_session:
            response = self.session.get(url)

            if response.status_code == httplib.OK:
                return True
            elif response.status_code == httplib.NOT_FOUND:
                return False
            else:
                raise Exception(
                    str(response.status_code) + " " + response.reason)

        else:
            response = urlopen(url, timeout=self.max_wait_time)

            if response.code == httplib.OK:
                return True
            elif response.code == httplib.NOT_FOUND:
                return False
            else:
                raise Exception(str(response.code) + " " + response.msg)
zendesk_localization.py 文件源码 项目:zendesk-help-center-localization 作者: mykola-mokhnach 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def list_article_attachments(self, article_id):
        try:
            return self._get('articles/{}/attachments.json'.format(article_id)).get('article_attachments', [])
        except APIError as e:
            if e.error_code == httplib.NOT_FOUND:
                return []
            raise e
qpi_controller_test.py 文件源码 项目:qtip 作者: opnfv 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_get_qpi_not_found(app_client):
    response_not_found = app_client.get("/v1.0/qpis/fake.yaml")
    response_data = json.loads(response_not_found.data)
    assert response_not_found.status_code == httplib.NOT_FOUND
    assert response_data['title'] == "QPI not found"
wopiserver.py 文件源码 项目:wopiserver 作者: cernbox 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def cboxDownload():
  '''Returns the file's content for a given valid access token. Used as a download URL,
     so that the file's path is never explicitly visible.'''
  try:
    acctok = jwt.decode(flask.request.args['access_token'], Wopi.wopisecret, algorithms=['HS256'])
    if acctok['exp'] < time.time():
      raise jwt.exceptions.ExpiredSignatureError
    resp = flask.Response(xrdcl.readfile(acctok['filename'], acctok['ruid'], acctok['rgid']), mimetype='application/octet-stream')
    resp.headers['Content-Disposition'] = 'attachment; filename="%s"' % os.path.basename(acctok['filename'])
    resp.status_code = httplib.OK
    Wopi.log.info('msg="cboxDownload: direct download succeeded" filename="%s" user="%s:%s" token="%s"' % \
                  (acctok['filename'], acctok['ruid'], acctok['rgid'], flask.request.args['access_token'][-20:]))
    return resp
  except (jwt.exceptions.DecodeError, jwt.exceptions.ExpiredSignatureError) as e:
    Wopi.log.warning('msg="Signature verification failed" client="%s" requestedUrl="%s" token="%s"' % \
                     (flask.request.remote_addr, flask.request.base_url, flask.request.args['access_token']))
    return 'Invalid access token', httplib.NOT_FOUND
  except IOError, e:
    Wopi.log.info('msg="Requested file not found" filename="%s" token="%s" error="%s"' % \
                  (acctok['filename'], flask.request.args['access_token'][-20:], e))
    return 'File not found', httplib.NOT_FOUND
  except KeyError, e:
    Wopi.log.error('msg="Invalid access token or request argument" error="%s"' % e)
    return 'Invalid access token', httplib.UNAUTHORIZED
  except Exception, e:
    return _logGeneralExceptionAndReturn(e, flask.request)
wopiserver.py 文件源码 项目:wopiserver 作者: cernbox 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def wopiFilesPost(fileid):
  '''A dispatcher metod for all POST operations on files'''
  Wopi.refreshconfig()
  try:
    acctok = jwt.decode(flask.request.args['access_token'], Wopi.wopisecret, algorithms=['HS256'])
    if acctok['exp'] < time.time():
      raise jwt.exceptions.ExpiredSignatureError
    headers = flask.request.headers
    op = headers['X-WOPI-Override']       # must be one of the following strings, throws KeyError if missing
    if op in ('LOCK', 'REFRESH_LOCK'):
      return wopiLock(fileid, headers, acctok)
    elif op == 'UNLOCK':
      return wopiUnlock(fileid, headers, acctok)
    elif op == 'GET_LOCK':
      return wopiGetLock(fileid, headers, acctok)
    elif op == 'PUT_RELATIVE':
      return wopiPutRelative(fileid, headers, acctok)
    elif op == 'DELETE':
      return wopiDeleteFile(fileid, headers, acctok)
    elif op == 'RENAME_FILE':
      return wopiRenameFile(fileid, headers, acctok)
    #elif op == 'PUT_USER_INFO':   https://wopirest.readthedocs.io/en/latest/files/PutUserInfo.html
    else:
      Wopi.log.warning('msg="Unknown/unsupported operation" operation="%s"' % op)
      return 'Not supported operation found in header', httplib.NOT_IMPLEMENTED
  except (jwt.exceptions.DecodeError, jwt.exceptions.ExpiredSignatureError) as e:
    Wopi.log.warning('msg="Signature verification failed" client="%s" requestedUrl="%s" token="%s"' % \
                     (flask.request.remote_addr, flask.request.base_url, flask.request.args['access_token']))
    return 'Invalid access token', httplib.NOT_FOUND
  except Exception, e:
    return _logGeneralExceptionAndReturn(e, flask.request)
stub_dispatcher.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _handle_delete(gcs_stub, filename):
  """Handle DELETE object."""
  if gcs_stub.delete_object(filename):
    return _FakeUrlFetchResult(httplib.NO_CONTENT, {}, '')
  else:
    return _FakeUrlFetchResult(httplib.NOT_FOUND, {}, '')
static_files_handler.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _not_found_404(environ, start_response):
    status = httplib.NOT_FOUND
    start_response('%d %s' % (status, httplib.responses[status]),
                   [('Content-Type', 'text/plain')])
    return ['%s not found' % environ['PATH_INFO']]
es_drop.py 文件源码 项目:kafka-pyspark-elasticsearch-integration-test 作者: WuyangLI 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def drop_index(cnx, config):
    """Delete index

    Parameters:
      cnx:    HTTP connection to Elastic Search
      config: Some settings (including the name of the index)

    """

    sys.stderr.write( "Deleting Elastic Search index %s\n" % config.index)

    ####################################
    cnx.request("DELETE",config.index) #
    ####################################

    resp=cnx.getresponse()
    sys.stderr.write( resp.read()+"\n")
    if resp.status == httplib.NOT_FOUND:
        sys.stderr.write( "  WARNING: Index %s does not exist - cannot delete\n" % config.index)
    elif resp.status != httplib.OK:
        raise Exception("  ERROR when deleting " + config.index + ": %d %s" % (resp.status, resp.reason))
    else:
        sys.stderr.write( "  Index deleted\n")


########
# MAIN #    
########
metrics_test.py 文件源码 项目:raw-data-repository 作者: all-of-us 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_metrics(self):
    request = {
        'start_date': '2017-01-21',
        'end_date': '2017-01-22'
    }
    try:
      response = self.client.request_json('Metrics', 'POST', request)
      pprint.pprint(response)
    except HttpException as ex:
      if ex.code == httplib.NOT_FOUND:
        print "No metrics loaded"
      else:
        raise
participant_summary_api_test.py 文件源码 项目:raw-data-repository 作者: all-of-us 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testQuery_noParticipants(self):
    self.send_get('Participant/P1/Summary', expected_status=httplib.NOT_FOUND)
    response = self.send_get('ParticipantSummary')
    self.assertBundle([], response)
participant_summary_api_test.py 文件源码 项目:raw-data-repository 作者: all-of-us 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testQuery_noSummaries(self):
    participant = self.send_post('Participant', {"providerLink": [self.provider_link]})
    participant_id = participant['participantId']
    self.send_get('Participant/%s/Summary' % participant_id, expected_status=httplib.NOT_FOUND)
    response = self.send_get('ParticipantSummary')
    self.assertBundle([], response)
install_config.py 文件源码 项目:raw-data-repository 作者: all-of-us 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def main(args):
  client = Client(parse_cli=False, creds_file=args.creds_file, default_instance=args.instance)
  config_path = 'Config/%s' % args.key if args.key else 'Config'
  try:
    config_server = client.request_json(config_path, 'GET')
    formatted_server_config = _json_to_sorted_string(config_server)
  except HttpException as e:
    if e.code == httplib.NOT_FOUND:
      formatted_server_config = ''
    else:
      raise

  if not args.config:
    logging.info('----------------- Current Server Config --------------------')
    _log_and_write_config_lines(formatted_server_config.split('\n'), args.config_output)
  else:
    with open(args.config) as config_file:
      config_file = json.load(config_file)
    if not args.key or args.key == 'current_config':
      with open(BASE_CONFIG_FILE) as base_config_file:
        combined_config = json.load(base_config_file)
      combined_config.update(config_file)
    else:
      combined_config = config_file
    comparable_file = _json_to_sorted_string(combined_config)
    configs_match = _compare_configs(comparable_file, formatted_server_config, args.config_output)

    if not configs_match and args.update:
      logging.info('-------------- Updating Server -------------------')
      method = 'POST' if args.key else 'PUT'
      client.request_json(config_path, method, combined_config)


问题


面经


文章

微信
公众号

扫码关注公众号