def test_delete_running_job(self):
pb = 'tests/fixtures/playbooks/hello_world_with_sleep.yml'
response = self.app.post(
'/runs',
data=json.dumps(dict(playbook_path=pb,
inventory_file='localhost,',
options={'connection': 'local',
'subset': None})),
content_type='application/json')
res_dict = json.loads(response.data)
self.assertEqual(http_client.OK, response.status_code)
response = self.app.delete('/runs/{}'.format(res_dict['id']))
self.assertEqual(http_client.OK, response.status_code)
response = self.app.get('/runs/{}'.format(res_dict['id']))
res_dict = json.loads(response.data)
self.assertEqual('ABORTED', res_dict['state'])
self._wait_for_run_complete(res_dict['id'])
python类OK的实例源码
def validate_href(image_href):
"""Validate HTTP image reference.
:param image_href: Image reference.
:raises: exception.ImageRefValidationFailed if HEAD request failed or
returned response code not equal to 200.
:returns: Response to HEAD request.
"""
try:
response = requests.head(image_href)
if response.status_code != http_client.OK:
raise exception.ImageRefValidationFailed(
image_href=image_href,
reason=("Got HTTP code %s instead of 200 in response to "
"HEAD request." % response.status_code))
except requests.RequestException as e:
raise exception.ImageRefValidationFailed(image_href=image_href,
reason=e)
return response
def test_get_state(self):
"""Test getting an instance state."""
self.impl.get.return_value = {
'name': 'foo.bar#0000000001', 'host': 'baz1', 'state': 'running'
}
resp = self.client.get('/state/foo.bar#0000000001')
resp_json = b''.join(resp.response)
self.assertEqual(json.loads(resp_json.decode()), {
'name': 'foo.bar#0000000001', 'oom': None, 'signal': None,
'expires': None, 'when': None, 'host': 'baz1',
'state': 'running', 'exitcode': None
})
self.assertEqual(resp.status_code, http_client.OK)
self.impl.get.assert_called_with('foo.bar#0000000001')
self.impl.get.return_value = None
resp = self.client.get('/state/foo.bar#0000000002')
self.assertEqual(resp.status_code, http_client.NOT_FOUND)
def test_get_server_list(self):
"""Test getting a list of servers."""
server_list = [
{'cell': 'foo', 'traits': [], '_id': 'server1', 'data': []},
{'cell': 'bar', 'traits': [], '_id': 'server2', 'data': []}
]
self.impl.list.return_value = server_list
resp = self.client.get('/server/')
self.assertEqual(''.join(resp.response), json.dumps(server_list))
self.assertEqual(resp.status_code, http_client.OK)
self.impl.list.assert_called_with(None, None)
resp = self.client.get('/server/?cell=foo')
self.assertEqual(resp.status_code, http_client.OK)
self.impl.list.assert_called_with('foo', None)
resp = self.client.get('/server/?partition=baz')
self.assertEqual(resp.status_code, http_client.OK)
self.impl.list.assert_called_with(None, 'baz')
resp = self.client.get('/server/?cell=foo&partition=baz')
self.assertEqual(resp.status_code, http_client.OK)
self.impl.list.assert_called_with('foo', 'baz')
def test_replace_singular(self, mock_utcnow):
description = 'server-new-description'
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
response = self.patch_json('/servers/%s' % self.server.uuid,
[{'path': '/description',
'value': description, 'op': 'replace'}],
headers=self.headers)
self.assertEqual('application/json', response.content_type)
self.assertEqual(http_client.OK, response.status_code)
result = self.get_json('/servers/%s' % self.server.uuid,
headers=self.headers)
self.assertEqual(description, result['description'])
return_updated_at = timeutils.parse_isotime(
result['updated_at']).replace(tzinfo=None)
self.assertEqual(test_time, return_updated_at)
def test_replace_multi(self):
extra = {"foo1": "bar1", "foo2": "bar2", "foo3": "bar3"}
uuid = uuidutils.generate_uuid()
server = utils.create_test_server(name='test1', uuid=uuid,
extra=extra)
new_value = 'new value'
response = self.patch_json('/servers/%s' % server.uuid,
[{'path': '/metadata/foo2',
'value': new_value, 'op': 'replace'}],
headers=self.headers)
self.assertEqual('application/json', response.content_type)
self.assertEqual(http_client.OK, response.status_code)
result = self.get_json('/servers/%s' % server.uuid,
headers=self.headers)
extra["foo2"] = new_value
self.assertEqual(extra, result['metadata'])
def test_remove_singular(self):
uuid = uuidutils.generate_uuid()
server = utils.create_test_server(name='test2', uuid=uuid,
extra={'a': 'b'})
response = self.patch_json('/servers/%s' % server.uuid,
[{'path': '/description', 'op': 'remove'}],
headers=self.headers)
self.assertEqual('application/json', response.content_type)
self.assertEqual(http_client.OK, response.status_code)
result = self.get_json('/servers/%s' % server.uuid,
headers=self.headers)
self.assertIsNone(result['description'])
# Assert nothing else was changed
self.assertEqual(server.uuid, result['uuid'])
self.assertEqual(server.extra, result['metadata'])
def _get_service_account_email(http_request=None):
"""Get the GCE service account email from the current environment.
Args:
http_request: callable, (Optional) a callable that matches the method
signature of httplib2.Http.request, used to make
the request to the metadata service.
Returns:
tuple, A pair where the first entry is an optional response (from a
failed request) and the second is service account email found (as
a string).
"""
if http_request is None:
http_request = httplib2.Http().request
response, content = http_request(
_DEFAULT_EMAIL_METADATA, headers={'Metadata-Flavor': 'Google'})
if response.status == http_client.OK:
content = _from_bytes(content)
return None, content
else:
return response, content
def do_GET(self):
"""Handle a GET request.
Parses the query parameters and prints a message
if the flow has completed. Note that we can't detect
if an error occurred.
"""
self.send_response(http_client.OK)
self.send_header("Content-type", "text/html")
self.end_headers()
query = self.path.split('?', 1)[-1]
query = dict(urllib.parse.parse_qsl(query))
self.server.query_params = query
self.wfile.write(
b"<html><head><title>Authentication Status</title></head>")
self.wfile.write(
b"<body><p>The authentication flow has completed.</p>")
self.wfile.write(b"</body></html>")
def _get_service_account_email(http_request=None):
"""Get the GCE service account email from the current environment.
Args:
http_request: callable, (Optional) a callable that matches the method
signature of httplib2.Http.request, used to make
the request to the metadata service.
Returns:
tuple, A pair where the first entry is an optional response (from a
failed request) and the second is service account email found (as
a string).
"""
if http_request is None:
http_request = httplib2.Http().request
response, content = http_request(
_DEFAULT_EMAIL_METADATA, headers={'Metadata-Flavor': 'Google'})
if response.status == http_client.OK:
content = _from_bytes(content)
return None, content
else:
return response, content
def remote_image_exist(registry, image, tag):
urllib3.disable_warnings()
url = IMAGE_TAGS_URL % dict(registry=registry, image=image)
response = requests.get(url=url, verify=False)
if response.status_code != http_client.OK:
return False
info = response.json()
return tag in info['tags']
def _do_revoke(self, http_request, token):
"""Revokes this credential and deletes the stored copy (if it exists).
Args:
http_request: callable, a callable that matches the method
signature of httplib2.Http.request, used to make the
refresh request.
token: A string used as the token to be revoked. Can be either an
access_token or refresh_token.
Raises:
TokenRevokeError: If the revoke request does not return with a
200 OK.
"""
logger.info('Revoking token')
query_params = {'token': token}
token_revoke_uri = _update_query_params(self.revoke_uri, query_params)
resp, content = http_request(token_revoke_uri)
if resp.status == http_client.OK:
self.invalid = True
else:
error_msg = 'Invalid response %s.' % resp.status
try:
d = json.loads(_from_bytes(content))
if 'error' in d:
error_msg = d['error']
except (TypeError, ValueError):
pass
raise TokenRevokeError(error_msg)
if self.store:
self.store.delete()
def _do_retrieve_scopes(self, http_request, token):
"""Retrieves the list of authorized scopes from the OAuth2 provider.
Args:
http_request: callable, a callable that matches the method
signature of httplib2.Http.request, used to make the
refresh request.
token: A string used as the token to identify the credentials to
the provider.
Raises:
Error: When refresh fails, indicating the the access token is
invalid.
"""
logger.info('Refreshing scopes')
query_params = {'access_token': token, 'fields': 'scope'}
token_info_uri = _update_query_params(self.token_info_uri,
query_params)
resp, content = http_request(token_info_uri)
content = _from_bytes(content)
if resp.status == http_client.OK:
d = json.loads(content)
self.scopes = set(util.string_to_scopes(d.get('scope', '')))
else:
error_msg = 'Invalid response %s.' % (resp.status,)
try:
d = json.loads(content)
if 'error_description' in d:
error_msg = d['error_description']
except (TypeError, ValueError):
pass
raise Error(error_msg)
def _detect_gce_environment():
"""Determine if the current environment is Compute Engine.
Returns:
Boolean indicating whether or not the current environment is Google
Compute Engine.
"""
# NOTE: The explicit ``timeout`` is a workaround. The underlying
# issue is that resolving an unknown host on some networks will take
# 20-30 seconds; making this timeout short fixes the issue, but
# could lead to false negatives in the event that we are on GCE, but
# the metadata resolution was particularly slow. The latter case is
# "unlikely".
connection = six.moves.http_client.HTTPConnection(
_GCE_METADATA_HOST, timeout=1)
try:
headers = {_METADATA_FLAVOR_HEADER: _DESIRED_METADATA_FLAVOR}
connection.request('GET', '/', headers=headers)
response = connection.getresponse()
if response.status == http_client.OK:
return (response.getheader(_METADATA_FLAVOR_HEADER) ==
_DESIRED_METADATA_FLAVOR)
except socket.error: # socket.timeout or socket.error(64, 'Host is down')
logger.info('Timeout attempting to reach GCE metadata service.')
return False
finally:
connection.close()
def verify_id_token(id_token, audience, http=None,
cert_uri=ID_TOKEN_VERIFICATION_CERTS):
"""Verifies a signed JWT id_token.
This function requires PyOpenSSL and because of that it does not work on
App Engine.
Args:
id_token: string, A Signed JWT.
audience: string, The audience 'aud' that the token should be for.
http: httplib2.Http, instance to use to make the HTTP request. Callers
should supply an instance that has caching enabled.
cert_uri: string, URI of the certificates in JSON format to
verify the JWT against.
Returns:
The deserialized JSON in the JWT.
Raises:
oauth2client.crypt.AppIdentityError: if the JWT fails to verify.
CryptoUnavailableError: if no crypto library is available.
"""
_RequireCryptoOrDie()
if http is None:
http = _cached_http
resp, content = http.request(cert_uri)
if resp.status == http_client.OK:
certs = json.loads(_from_bytes(content))
return crypt.verify_signed_jwt_with_certs(id_token, certs, audience)
else:
raise VerifyJwtTokenError('Status code: %d' % resp.status)
def _refresh(self, http_request):
"""Refreshes the access_token.
Skip all the storage hoops and just refresh using the API.
Args:
http_request: callable, a callable that matches the method
signature of httplib2.Http.request, used to make
the refresh request.
Raises:
HttpAccessTokenRefreshError: When the refresh fails.
"""
query = '?scope=%s' % urllib.parse.quote(self.scope, '')
uri = META.replace('{?scope}', query)
response, content = http_request(
uri, headers={'Metadata-Flavor': 'Google'})
content = _from_bytes(content)
if response.status == http_client.OK:
try:
token_content = json.loads(content)
except Exception as e:
raise HttpAccessTokenRefreshError(str(e),
status=response.status)
self.access_token = token_content['access_token']
else:
if response.status == http_client.NOT_FOUND:
content += (' This can occur if a VM was created'
' with no service account or scopes.')
raise HttpAccessTokenRefreshError(content, status=response.status)
def _update_image_meta_v2(conn, extra_headers, properties, patch_path):
# NOTE(sirp): There is some confusion around OVF. Here's a summary
# of where we currently stand:
# 1. OVF as a container format is misnamed. We really should be
# using OVA since that is the name for the container format;
# OVF is the standard applied to the manifest file contained
# within.
# 2. We're currently uploading a vanilla tarball. In order to be
# OVF/OVA compliant, we'll need to embed a minimal OVF
# manifest as the first file.
body = [
{"path": "/container_format", "value": "ovf", "op": "add"},
{"path": "/disk_format", "value": "vhd", "op": "add"},
{"path": "/visibility", "value": "private", "op": "add"}]
headers = {'Content-Type': 'application/openstack-images-v2.1-json-patch'}
headers.update(**extra_headers)
for key, value in properties.items():
prop = {"path": "/%s" % key.replace('_', '-'),
"value": str(value),
"op": "add"}
body.append(prop)
body = json.dumps(body)
conn.request('PATCH', patch_path, body=body, headers=headers)
resp = conn.getresponse()
resp.read()
if resp.status == httplib.OK:
return
logging.error("Image meta was not updated. Status: %s, Reason: %s" % (
resp.status, resp.reason))
def after(self, state):
# Omit empty body. Some errors may not have body at this level yet.
if not state.response.body:
return
# Do nothing if there is no error.
# Status codes in the range 200 (OK) to 399 (400 = BAD_REQUEST) are not
# an error.
if (http_client.OK <= state.response.status_int <
http_client.BAD_REQUEST):
return
json_body = state.response.json
# Do not remove traceback when traceback config is set
if cfg.CONF.debug_tracebacks_in_api:
return
faultstring = json_body.get('faultstring')
traceback_marker = 'Traceback (most recent call last):'
if faultstring and traceback_marker in faultstring:
# Cut-off traceback.
faultstring = faultstring.split(traceback_marker, 1)[0]
# Remove trailing newlines and spaces if any.
json_body['faultstring'] = faultstring.rstrip()
# Replace the whole json. Cannot change original one because it's
# generated on the fly.
state.response.json = json_body
def test_list_nodes(self, mock_get_token):
mock_get_token.return_value = 'FAKE-TOKEN-123'
self.requests_mock.register_uri('GET', 'https://127.0.0.1:4443/vdc/nodes',
status_code=http_client.OK,
json=self.returned_json)
response = self.client.node.list()
self.assertEqual(self.requests_mock.last_request.method, 'GET')
self.assertEqual(self.requests_mock.last_request.url, 'https://127.0.0.1:4443/vdc/nodes')
self.assertEqual(self.requests_mock.last_request.headers['x-sds-auth-token'], 'FAKE-TOKEN-123')
self.assertEqual(response, self.returned_json)
def _do_revoke(self, http, token):
"""Revokes this credential and deletes the stored copy (if it exists).
Args:
http: an object to be used to make HTTP requests.
token: A string used as the token to be revoked. Can be either an
access_token or refresh_token.
Raises:
TokenRevokeError: If the revoke request does not return with a
200 OK.
"""
logger.info('Revoking token')
query_params = {'token': token}
token_revoke_uri = _helpers.update_query_params(
self.revoke_uri, query_params)
resp, content = transport.request(http, token_revoke_uri)
if resp.status == http_client.METHOD_NOT_ALLOWED:
body = urllib.parse.urlencode(query_params)
resp, content = transport.request(http, token_revoke_uri,
method='POST', body=body)
if resp.status == http_client.OK:
self.invalid = True
else:
error_msg = 'Invalid response {0}.'.format(resp.status)
try:
d = json.loads(_helpers._from_bytes(content))
if 'error' in d:
error_msg = d['error']
except (TypeError, ValueError):
pass
raise TokenRevokeError(error_msg)
if self.store:
self.store.delete()