def test_validate_URL_existence_url_not_found(self, mock_cs, mock_url):
url_not_found = Foo()
setattr(url_not_found, "code", httplib.NOT_FOUND)
mock_url.side_effect = [url_not_found]
mock_cs.side_effect = [None]
api = API.apiCalls.ApiCalls("", "", "", "", "")
validate_URL = api.validate_URL_existence
url = "notAWebSite"
valid = False
is_valid = validate_URL(url)
self.assertEqual(is_valid, valid)
API.apiCalls.urlopen.assert_called_with(url, timeout=api.max_wait_time)
python类NOT_FOUND的实例源码
def does_bucket_exist(self, bucket_name, config=None):
"""
Check whether there is a bucket with specific name
:param bucket_name: None
:type bucket_name: str
:return:True or False
:rtype: bool
"""
try:
self._send_request(http_methods.HEAD, bucket_name, config=config)
return True
except BceHttpClientError as e:
if isinstance(e.last_error, BceServerError):
if e.last_error.status_code == httplib.FORBIDDEN:
return True
if e.last_error.status_code == httplib.NOT_FOUND:
return False
raise e
def does_bucket_exist(self, bucket_name, config=None):
"""
Check whether there is a bucket with specific name
:param bucket_name: None
:type bucket_name: str
:return:True or False
:rtype: bool
"""
try:
self._send_request(http_methods.HEAD, bucket_name, config=config)
return True
except BceHttpClientError as e:
if isinstance(e.last_error, BceServerError):
if e.last_error.status_code == httplib.FORBIDDEN:
return True
if e.last_error.status_code == httplib.NOT_FOUND:
return False
raise e
def check_endpoint_for_error(resource, operation=None):
def _decorator(func):
def _execute(name=None):
try:
return func(name), httplib.OK
except error.NotFoundError:
return connexion.problem(
httplib.NOT_FOUND,
'{} not found'.format(resource),
'Requested {} `{}` not found.'
.format(resource.lower(), name))
except error.ToBeDoneError:
return connexion.problem(
httplib.NOT_IMPLEMENTED,
'{} handler not implemented'.format(operation),
'Requested operation `{}` on {} not implemented.'
.format(operation.lower(), resource.lower()))
return _execute
return _decorator
def _copy(gcs_stub, filename, headers):
"""Copy file.
Args:
gcs_stub: an instance of gcs stub.
filename: dst filename of format /bucket/filename
headers: a dict of request headers. Must contain _XGoogCopySource header.
Returns:
An _FakeUrlFetchResult instance.
"""
source = _XGoogCopySource(headers).value
result = _handle_head(gcs_stub, source)
if result.status_code == httplib.NOT_FOUND:
return result
directive = headers.pop('x-goog-metadata-directive', 'COPY')
if directive == 'REPLACE':
gcs_stub.put_copy(source, filename, headers)
else:
gcs_stub.put_copy(source, filename, None)
return _FakeUrlFetchResult(httplib.OK, {}, '')
def _handle_get(gcs_stub, filename, param_dict, headers):
"""Handle GET object and GET bucket."""
mo = re.match(BUCKET_ONLY_PATH, filename)
if mo is not None:
return _handle_get_bucket(gcs_stub, mo.group(1), param_dict)
else:
result = _handle_head(gcs_stub, filename)
if result.status_code == httplib.NOT_FOUND:
return result
start, end = _Range(headers).value
st_size = result.headers['x-goog-stored-content-length']
if end is not None:
result.status_code = httplib.PARTIAL_CONTENT
end = min(end, st_size - 1)
result.headers['content-range'] = 'bytes %d-%d/%d' % (start, end, st_size)
result.content = gcs_stub.get_object(filename, start, end)
result.headers['content-length'] = len(result.content)
return result
def _handle_head(gcs_stub, filename):
"""Handle HEAD request."""
filestat = gcs_stub.head_object(filename)
if not filestat:
return _FakeUrlFetchResult(httplib.NOT_FOUND, {}, '')
http_time = common.posix_time_to_http(filestat.st_ctime)
response_headers = {
'x-goog-stored-content-length': filestat.st_size,
'content-length': 0,
'content-type': filestat.content_type,
'etag': filestat.etag,
'last-modified': http_time
}
if filestat.metadata:
response_headers.update(filestat.metadata)
return _FakeUrlFetchResult(httplib.OK, response_headers, '')
def test_check_resp_status_and_retry_image_not_found(self):
mock_resp_badgateway = mock.Mock()
mock_resp_badgateway.status = httplib.NOT_FOUND
self.glance.XenAPI.Failure = FakeXenAPIException
self.assertRaises(
self.glance.XenAPI.Failure,
self.glance.check_resp_status_and_retry,
mock_resp_badgateway,
'fake_image_id',
'fake_url')
def tenant_not_found(message):
status_code = httplib.NOT_FOUND
response = jsonify({"error": message, "status_code": status_code})
response.status_code = status_code
return response
def _blob_exists(self, digest):
"""Check the remote for the given layer."""
# HEAD the blob, and check for a 200
resp, unused_content = self._transport.Request(
'{base_url}/blobs/{digest}'.format(
base_url=self._base_url(),
digest=digest),
method='HEAD', accepted_codes=[httplib.OK, httplib.NOT_FOUND])
return resp.status == httplib.OK # pytype: disable=attribute-error
def _manifest_exists(self, image):
"""Check the remote for the given manifest by digest."""
# GET the manifest by digest, and check for 200
resp, unused_content = self._transport.Request(
'{base_url}/manifests/{digest}'.format(
base_url=self._base_url(), digest=image.digest()),
method='GET',
accepted_codes=[httplib.OK, httplib.NOT_FOUND],
accepted_mimes=[image.media_type()])
return resp.status == httplib.OK # pytype: disable=attribute-error
def _remote_tag_digest(self):
"""Check the remote for the given manifest by digest."""
# GET the tag we're pushing
resp, unused_content = self._transport.Request(
'{base_url}/manifests/{tag}'.format(
base_url=self._base_url(),
tag=self._name.tag), # pytype: disable=attribute-error
method='GET', accepted_codes=[httplib.OK, httplib.NOT_FOUND])
if resp.status == httplib.NOT_FOUND: # pytype: disable=attribute-error
return None
return resp.get('docker-content-digest')
def exists(self):
try:
manifest = json.loads(self.manifest(validate=False))
return (manifest['schemaVersion'] == 2 and
'layers' in manifest and
self.media_type() in self._accepted_mimes)
except docker_http.V2DiagnosticException as err:
if err.status == httplib.NOT_FOUND:
return False
raise
def exists(self):
try:
manifest = json.loads(self.manifest(validate=False))
return manifest['schemaVersion'] == 2 and 'manifests' in manifest
except docker_http.V2DiagnosticException as err:
if err.status == httplib.NOT_FOUND:
return False
raise
def _blob_exists(self, digest):
"""Check the remote for the given layer."""
# HEAD the blob, and check for a 200
resp, unused_content = self._transport.Request(
'{base_url}/blobs/{digest}'.format(
base_url=self._base_url(),
digest=digest),
method='HEAD', accepted_codes=[httplib.OK, httplib.NOT_FOUND])
return resp.status == httplib.OK # pytype: disable=attribute-error
def _manifest_exists(self, image):
"""Check the remote for the given manifest by digest."""
# GET the manifest by digest, and check for 200
resp, unused_content = self._transport.Request(
'{base_url}/manifests/{digest}'.format(
base_url=self._base_url(),
digest=image.digest()),
method='GET', accepted_codes=[httplib.OK, httplib.NOT_FOUND])
return resp.status == httplib.OK # pytype: disable=attribute-error
def _remote_tag_digest(self):
"""Check the remote for the given manifest by digest."""
# GET the tag we're pushing
resp, unused_content = self._transport.Request(
'{base_url}/manifests/{tag}'.format(
base_url=self._base_url(),
tag=self._name.tag), # pytype: disable=attribute-error
method='GET', accepted_codes=[httplib.OK, httplib.NOT_FOUND])
if resp.status == httplib.NOT_FOUND: # pytype: disable=attribute-error
return None
return resp.get('docker-content-digest')
def exists(self):
try:
self.manifest(validate=False)
return True
except docker_http.V2DiagnosticException as err:
if err.status == httplib.NOT_FOUND:
return False
raise
def validate_URL_existence(self, url, use_session=False):
"""
tries to validate existence of given url by trying to open it.
true if HTTP OK, false if HTTP NOT FOUND otherwise
raises error containing error code and message
arguments:
url -- the url link to open and validate
use_session -- if True then this uses self.session.get(url) instead
of urlopen(url) to get response
returns
true if http response OK 200
false if http response NOT FOUND 404
"""
if use_session:
response = self.session.get(url)
if response.status_code == httplib.OK:
return True
elif response.status_code == httplib.NOT_FOUND:
return False
else:
raise Exception(
str(response.status_code) + " " + response.reason)
else:
response = urlopen(url, timeout=self.max_wait_time)
if response.code == httplib.OK:
return True
elif response.code == httplib.NOT_FOUND:
return False
else:
raise Exception(str(response.code) + " " + response.msg)
zendesk_localization.py 文件源码
项目:zendesk-help-center-localization
作者: mykola-mokhnach
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def list_article_attachments(self, article_id):
try:
return self._get('articles/{}/attachments.json'.format(article_id)).get('article_attachments', [])
except APIError as e:
if e.error_code == httplib.NOT_FOUND:
return []
raise e
def test_get_qpi_not_found(app_client):
response_not_found = app_client.get("/v1.0/qpis/fake.yaml")
response_data = json.loads(response_not_found.data)
assert response_not_found.status_code == httplib.NOT_FOUND
assert response_data['title'] == "QPI not found"
def cboxDownload():
'''Returns the file's content for a given valid access token. Used as a download URL,
so that the file's path is never explicitly visible.'''
try:
acctok = jwt.decode(flask.request.args['access_token'], Wopi.wopisecret, algorithms=['HS256'])
if acctok['exp'] < time.time():
raise jwt.exceptions.ExpiredSignatureError
resp = flask.Response(xrdcl.readfile(acctok['filename'], acctok['ruid'], acctok['rgid']), mimetype='application/octet-stream')
resp.headers['Content-Disposition'] = 'attachment; filename="%s"' % os.path.basename(acctok['filename'])
resp.status_code = httplib.OK
Wopi.log.info('msg="cboxDownload: direct download succeeded" filename="%s" user="%s:%s" token="%s"' % \
(acctok['filename'], acctok['ruid'], acctok['rgid'], flask.request.args['access_token'][-20:]))
return resp
except (jwt.exceptions.DecodeError, jwt.exceptions.ExpiredSignatureError) as e:
Wopi.log.warning('msg="Signature verification failed" client="%s" requestedUrl="%s" token="%s"' % \
(flask.request.remote_addr, flask.request.base_url, flask.request.args['access_token']))
return 'Invalid access token', httplib.NOT_FOUND
except IOError, e:
Wopi.log.info('msg="Requested file not found" filename="%s" token="%s" error="%s"' % \
(acctok['filename'], flask.request.args['access_token'][-20:], e))
return 'File not found', httplib.NOT_FOUND
except KeyError, e:
Wopi.log.error('msg="Invalid access token or request argument" error="%s"' % e)
return 'Invalid access token', httplib.UNAUTHORIZED
except Exception, e:
return _logGeneralExceptionAndReturn(e, flask.request)
def wopiFilesPost(fileid):
'''A dispatcher metod for all POST operations on files'''
Wopi.refreshconfig()
try:
acctok = jwt.decode(flask.request.args['access_token'], Wopi.wopisecret, algorithms=['HS256'])
if acctok['exp'] < time.time():
raise jwt.exceptions.ExpiredSignatureError
headers = flask.request.headers
op = headers['X-WOPI-Override'] # must be one of the following strings, throws KeyError if missing
if op in ('LOCK', 'REFRESH_LOCK'):
return wopiLock(fileid, headers, acctok)
elif op == 'UNLOCK':
return wopiUnlock(fileid, headers, acctok)
elif op == 'GET_LOCK':
return wopiGetLock(fileid, headers, acctok)
elif op == 'PUT_RELATIVE':
return wopiPutRelative(fileid, headers, acctok)
elif op == 'DELETE':
return wopiDeleteFile(fileid, headers, acctok)
elif op == 'RENAME_FILE':
return wopiRenameFile(fileid, headers, acctok)
#elif op == 'PUT_USER_INFO': https://wopirest.readthedocs.io/en/latest/files/PutUserInfo.html
else:
Wopi.log.warning('msg="Unknown/unsupported operation" operation="%s"' % op)
return 'Not supported operation found in header', httplib.NOT_IMPLEMENTED
except (jwt.exceptions.DecodeError, jwt.exceptions.ExpiredSignatureError) as e:
Wopi.log.warning('msg="Signature verification failed" client="%s" requestedUrl="%s" token="%s"' % \
(flask.request.remote_addr, flask.request.base_url, flask.request.args['access_token']))
return 'Invalid access token', httplib.NOT_FOUND
except Exception, e:
return _logGeneralExceptionAndReturn(e, flask.request)
def _handle_delete(gcs_stub, filename):
"""Handle DELETE object."""
if gcs_stub.delete_object(filename):
return _FakeUrlFetchResult(httplib.NO_CONTENT, {}, '')
else:
return _FakeUrlFetchResult(httplib.NOT_FOUND, {}, '')
def _not_found_404(environ, start_response):
status = httplib.NOT_FOUND
start_response('%d %s' % (status, httplib.responses[status]),
[('Content-Type', 'text/plain')])
return ['%s not found' % environ['PATH_INFO']]
es_drop.py 文件源码
项目:kafka-pyspark-elasticsearch-integration-test
作者: WuyangLI
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def drop_index(cnx, config):
"""Delete index
Parameters:
cnx: HTTP connection to Elastic Search
config: Some settings (including the name of the index)
"""
sys.stderr.write( "Deleting Elastic Search index %s\n" % config.index)
####################################
cnx.request("DELETE",config.index) #
####################################
resp=cnx.getresponse()
sys.stderr.write( resp.read()+"\n")
if resp.status == httplib.NOT_FOUND:
sys.stderr.write( " WARNING: Index %s does not exist - cannot delete\n" % config.index)
elif resp.status != httplib.OK:
raise Exception(" ERROR when deleting " + config.index + ": %d %s" % (resp.status, resp.reason))
else:
sys.stderr.write( " Index deleted\n")
########
# MAIN #
########
def test_metrics(self):
request = {
'start_date': '2017-01-21',
'end_date': '2017-01-22'
}
try:
response = self.client.request_json('Metrics', 'POST', request)
pprint.pprint(response)
except HttpException as ex:
if ex.code == httplib.NOT_FOUND:
print "No metrics loaded"
else:
raise
participant_summary_api_test.py 文件源码
项目:raw-data-repository
作者: all-of-us
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def testQuery_noParticipants(self):
self.send_get('Participant/P1/Summary', expected_status=httplib.NOT_FOUND)
response = self.send_get('ParticipantSummary')
self.assertBundle([], response)
participant_summary_api_test.py 文件源码
项目:raw-data-repository
作者: all-of-us
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def testQuery_noSummaries(self):
participant = self.send_post('Participant', {"providerLink": [self.provider_link]})
participant_id = participant['participantId']
self.send_get('Participant/%s/Summary' % participant_id, expected_status=httplib.NOT_FOUND)
response = self.send_get('ParticipantSummary')
self.assertBundle([], response)
def main(args):
client = Client(parse_cli=False, creds_file=args.creds_file, default_instance=args.instance)
config_path = 'Config/%s' % args.key if args.key else 'Config'
try:
config_server = client.request_json(config_path, 'GET')
formatted_server_config = _json_to_sorted_string(config_server)
except HttpException as e:
if e.code == httplib.NOT_FOUND:
formatted_server_config = ''
else:
raise
if not args.config:
logging.info('----------------- Current Server Config --------------------')
_log_and_write_config_lines(formatted_server_config.split('\n'), args.config_output)
else:
with open(args.config) as config_file:
config_file = json.load(config_file)
if not args.key or args.key == 'current_config':
with open(BASE_CONFIG_FILE) as base_config_file:
combined_config = json.load(base_config_file)
combined_config.update(config_file)
else:
combined_config = config_file
comparable_file = _json_to_sorted_string(combined_config)
configs_match = _compare_configs(comparable_file, formatted_server_config, args.config_output)
if not configs_match and args.update:
logging.info('-------------- Updating Server -------------------')
method = 'POST' if args.key else 'PUT'
client.request_json(config_path, method, combined_config)