def test_run_with_existing_remote_build_container(self, skipper_runner_run_mock, requests_get_mock):
requests_response_class_mock = mock.MagicMock(spec='requests.Response')
requests_response_mock = requests_response_class_mock.return_value
requests_response_mock.json.return_value = {
'name': 'my_image',
'tags': ['latest', 'aaaaaaa', 'bbbbbbb', 'build-container-tag']
}
requests_response_mock.status_code = http_client.OK
requests_get_mock.return_value = requests_response_mock
command = ['ls', '-l']
run_params = command
self._invoke_cli(
global_params=self.global_params,
subcmd='run',
subcmd_params=run_params
)
expected_image_name = 'registry.io:5000/build-container-image:build-container-tag'
skipper_runner_run_mock.assert_called_once_with(command, fqdn_image=expected_image_name, environment=[],
interactive=False, name=None, net='host', volumes=None,
workdir=None, use_cache=False)
python类OK的实例源码
def do_GET(self):
"""Handle a GET request.
Parses the query parameters and prints a message
if the flow has completed. Note that we can't detect
if an error occurred.
"""
self.send_response(http_client.OK)
self.send_header("Content-type", "text/html")
self.end_headers()
query = self.path.split('?', 1)[-1]
query = dict(urllib.parse.parse_qsl(query))
self.server.query_params = query
self.wfile.write(
b"<html><head><title>Authentication Status</title></head>")
self.wfile.write(
b"<body><p>The authentication flow has completed.</p>")
self.wfile.write(b"</body></html>")
def _detect_gce_environment():
"""Determine if the current environment is Compute Engine.
Returns:
Boolean indicating whether or not the current environment is Google
Compute Engine.
"""
# NOTE: The explicit ``timeout`` is a workaround. The underlying
# issue is that resolving an unknown host on some networks will take
# 20-30 seconds; making this timeout short fixes the issue, but
# could lead to false negatives in the event that we are on GCE, but
# the metadata resolution was particularly slow. The latter case is
# "unlikely".
http = transport.get_http_object(timeout=GCE_METADATA_TIMEOUT)
try:
response, _ = transport.request(
http, _GCE_METADATA_URI, headers=_GCE_HEADERS)
return (
response.status == http_client.OK and
response.get(_METADATA_FLAVOR_HEADER) == _DESIRED_METADATA_FLAVOR)
except socket.error: # socket.timeout or socket.error(64, 'Host is down')
logger.info('Timeout attempting to reach GCE metadata service.')
return False
def do_GET(self):
"""Handle a GET request.
Parses the query parameters and prints a message
if the flow has completed. Note that we can't detect
if an error occurred.
"""
self.send_response(http_client.OK)
self.send_header('Content-type', 'text/html')
self.end_headers()
parts = urllib.parse.urlparse(self.path)
query = _helpers.parse_unique_urlencoded(parts.query)
self.server.query_params = query
self.wfile.write(
b'<html><head><title>Authentication Status</title></head>')
self.wfile.write(
b'<body><p>The authentication flow has completed.</p>')
self.wfile.write(b'</body></html>')
def _list_request(self, resource, permanent=False, headers=None,
extra_headers=False, **kwargs):
"""Get the list of objects of the specified type.
:param resource: The name of the REST resource, e.g., 'nodes'.
:param headers: List of headers to use in request.
:param extra_headers: Specify whether to use headers.
:param **kwargs: Parameters for the request.
:returns: A tuple with the server response and deserialized JSON list
of objects
"""
uri = self._get_uri(resource, permanent=permanent)
if kwargs:
uri += "?%s" % urllib.urlencode(kwargs)
resp, body = self.get(uri, headers=headers,
extra_headers=extra_headers)
self.expected_success(http_client.OK, resp.status)
return resp, self.deserialize(body)
def _show_request(self,
resource,
uuid=None,
permanent=False,
**kwargs):
"""Gets a specific object of the specified type.
:param uuid: Unique identifier of the object in UUID format.
:returns: Serialized object as a dictionary.
"""
if 'uri' in kwargs:
uri = kwargs['uri']
else:
uri = self._get_uri(resource, uuid=uuid, permanent=permanent)
resp, body = self.get(uri)
self.expected_success(http_client.OK, resp.status)
return resp, self.deserialize(body)
run_gce_system_tests.py 文件源码
项目:deb-python-oauth2client
作者: openstack
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def test_token_info(self):
credentials = gce.AppAssertionCredentials([])
http = transport.get_http_object()
# First refresh to get the access token.
self.assertIsNone(credentials.access_token)
credentials.refresh(http)
self.assertIsNotNone(credentials.access_token)
# Then check the access token against the token info API.
query_params = {'access_token': credentials.access_token}
token_uri = (oauth2client.GOOGLE_TOKEN_INFO_URI + '?' +
urllib.parse.urlencode(query_params))
response, content = transport.request(http, token_uri)
self.assertEqual(response.status, http_client.OK)
content = content.decode('utf-8')
payload = json.loads(content)
self.assertEqual(payload['access_type'], 'offline')
self.assertLessEqual(int(payload['expires_in']), 3600)
def test_has_credentials_in_storage(self, OAuth2Credentials):
request = self.factory.get('/test')
request.session = mock.Mock()
credentials_mock = mock.Mock(
scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
credentials_mock.has_scopes.return_value = True
credentials_mock.invalid = False
credentials_mock.scopes = set([])
OAuth2Credentials.from_json.return_value = credentials_mock
@decorators.oauth_enabled
def test_view(request):
return http.HttpResponse('test')
response = test_view(request)
self.assertEqual(response.status_code, http_client.OK)
self.assertEqual(response.content, b'test')
self.assertTrue(request.oauth.has_credentials())
self.assertIsNotNone(request.oauth.http)
self.assertSetEqual(
request.oauth.scopes,
set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
def test_specified_scopes(self, dictionary_storage_mock):
request = self.factory.get('/test')
request.session = mock.Mock()
credentials_mock = mock.Mock(
scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
credentials_mock.has_scopes = mock.Mock(return_value=True)
credentials_mock.is_valid = True
dictionary_storage_mock.get.return_value = credentials_mock
@decorators.oauth_enabled(scopes=['additional-scope'])
def test_view(request):
return http.HttpResponse('hello world') # pragma: NO COVER
response = test_view(request)
self.assertEqual(response.status_code, http_client.OK)
self.assertIsNotNone(request.oauth)
self.assertFalse(request.oauth.has_credentials())
def test_get_token_success(self, now):
http = request_mock(
http_client.OK,
'application/json',
json.dumps({'access_token': 'a', 'expires_in': 100})
)
token, expiry = _metadata.get_token(http=http)
self.assertEqual(token, 'a')
self.assertEqual(
expiry, datetime.datetime.min + datetime.timedelta(seconds=100))
# Verify mocks.
now.assert_called_once_with()
self.assertEqual(http.requests, 1)
self.assertEqual(http.uri, EXPECTED_URL + '/token')
self.assertEqual(http.method, 'GET')
self.assertIsNone(http.body)
self.assertEqual(http.headers, _metadata.METADATA_HEADERS)
def test_exchange_using_authorization_header(self):
auth_header = 'Basic Y2xpZW50X2lkKzE6c2Vjexc_managerV0KzE=',
flow = client.OAuth2WebServerFlow(
client_id='client_id+1',
authorization_header=auth_header,
scope='foo',
redirect_uri=client.OOB_CALLBACK_URN,
user_agent='unittest-sample/1.0',
revoke_uri='dummy_revoke_uri',
)
http = http_mock.HttpMockSequence([
({'status': http_client.OK}, b'access_token=SlAV32hkKG'),
])
credentials = flow.step2_exchange(code='some random code', http=http)
self.assertEqual('SlAV32hkKG', credentials.access_token)
self.assertEqual(len(http.requests), 1)
test_request = http.requests[0]
# Did we pass the Authorization header?
self.assertEqual(test_request['headers']['Authorization'], auth_header)
# Did we omit client_secret from POST body?
self.assertTrue('client_secret' not in test_request['body'])
def __init__(self, headers=None, data=None):
"""HttpMock constructor.
Args:
headers: dict, header to return with response
"""
if headers is None:
headers = {'status': http_client.OK}
self.data = data
self.response_headers = headers
self.headers = None
self.uri = None
self.method = None
self.body = None
self.headers = None
self.requests = 0
def _detect_gce_environment():
"""Determine if the current environment is Compute Engine.
Returns:
Boolean indicating whether or not the current environment is Google
Compute Engine.
"""
# NOTE: The explicit ``timeout`` is a workaround. The underlying
# issue is that resolving an unknown host on some networks will take
# 20-30 seconds; making this timeout short fixes the issue, but
# could lead to false negatives in the event that we are on GCE, but
# the metadata resolution was particularly slow. The latter case is
# "unlikely".
http = transport.get_http_object(timeout=GCE_METADATA_TIMEOUT)
try:
response, _ = transport.request(
http, _GCE_METADATA_URI, headers=_GCE_HEADERS)
return (
response.status == http_client.OK and
response.get(_METADATA_FLAVOR_HEADER) == _DESIRED_METADATA_FLAVOR)
except socket.error: # socket.timeout or socket.error(64, 'Host is down')
logger.info('Timeout attempting to reach GCE metadata service.')
return False
def do_GET(self):
"""Handle a GET request.
Parses the query parameters and prints a message
if the flow has completed. Note that we can't detect
if an error occurred.
"""
self.send_response(http_client.OK)
self.send_header('Content-type', 'text/html')
self.end_headers()
parts = urllib.parse.urlparse(self.path)
query = _helpers.parse_unique_urlencoded(parts.query)
self.server.query_params = query
self.wfile.write(
b'<html><head><title>Authentication Status</title></head>')
self.wfile.write(
b'<body><p>The authentication flow has completed.</p>')
self.wfile.write(b'</body></html>')
def test_has_credentials_in_storage(self, OAuth2Credentials):
request = self.factory.get('/test')
request.session = mock.Mock()
credentials_mock = mock.Mock(
scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
credentials_mock.has_scopes.return_value = True
credentials_mock.invalid = False
credentials_mock.scopes = set([])
OAuth2Credentials.from_json.return_value = credentials_mock
@decorators.oauth_enabled
def test_view(request):
return http.HttpResponse('test')
response = test_view(request)
self.assertEqual(response.status_code, http_client.OK)
self.assertEqual(response.content, b'test')
self.assertTrue(request.oauth.has_credentials())
self.assertIsNotNone(request.oauth.http)
self.assertSetEqual(
request.oauth.scopes,
set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
def test_specified_scopes(self, dictionary_storage_mock):
request = self.factory.get('/test')
request.session = mock.Mock()
credentials_mock = mock.Mock(
scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
credentials_mock.has_scopes = mock.Mock(return_value=True)
credentials_mock.is_valid = True
dictionary_storage_mock.get.return_value = credentials_mock
@decorators.oauth_enabled(scopes=['additional-scope'])
def test_view(request):
return http.HttpResponse('hello world') # pragma: NO COVER
response = test_view(request)
self.assertEqual(response.status_code, http_client.OK)
self.assertIsNotNone(request.oauth)
self.assertFalse(request.oauth.has_credentials())
def test_has_credentials_in_storage(self, UserOAuth2):
request = self.factory.get('/test')
request.session = mock.Mock()
@decorators.oauth_required
def test_view(request):
return http.HttpResponse("test")
my_user_oauth = mock.Mock()
UserOAuth2.return_value = my_user_oauth
my_user_oauth.has_credentials.return_value = True
response = test_view(request)
self.assertEqual(response.status_code, http_client.OK)
self.assertEqual(response.content, b"test")
def test_get_token_success(self, now):
http = request_mock(
http_client.OK,
'application/json',
json.dumps({'access_token': 'a', 'expires_in': 100})
)
token, expiry = _metadata.get_token(http=http)
self.assertEqual(token, 'a')
self.assertEqual(
expiry, datetime.datetime.min + datetime.timedelta(seconds=100))
# Verify mocks.
now.assert_called_once_with()
self.assertEqual(http.requests, 1)
self.assertEqual(http.uri, EXPECTED_URL + '/token')
self.assertEqual(http.method, 'GET')
self.assertIsNone(http.body)
self.assertEqual(http.headers, _metadata.METADATA_HEADERS)
def test_exchange_with_pkce(self):
http = http_mock.HttpMockSequence([
({'status': http_client.OK}, b'access_token=SlAV32hkKG'),
])
flow = client.OAuth2WebServerFlow(
'client_id+1',
scope='foo',
redirect_uri='http://example.com',
pkce=True,
code_verifier=self.good_verifier)
flow.step2_exchange(code='some random code', http=http)
self.assertEqual(len(http.requests), 1)
test_request = http.requests[0]
self.assertIn(
'code_verifier={0}'.format(self.good_verifier.decode()),
test_request['body'])
def test_exchange_using_authorization_header(self):
auth_header = 'Basic Y2xpZW50X2lkKzE6c2Vjexc_managerV0KzE=',
flow = client.OAuth2WebServerFlow(
client_id='client_id+1',
authorization_header=auth_header,
scope='foo',
redirect_uri=client.OOB_CALLBACK_URN,
user_agent='unittest-sample/1.0',
revoke_uri='dummy_revoke_uri',
)
http = http_mock.HttpMockSequence([
({'status': http_client.OK}, b'access_token=SlAV32hkKG'),
])
credentials = flow.step2_exchange(code='some random code', http=http)
self.assertEqual('SlAV32hkKG', credentials.access_token)
self.assertEqual(len(http.requests), 1)
test_request = http.requests[0]
# Did we pass the Authorization header?
self.assertEqual(test_request['headers']['Authorization'], auth_header)
# Did we omit client_secret from POST body?
self.assertTrue('client_secret' not in test_request['body'])
def __init__(self, headers=None, data=None):
"""HttpMock constructor.
Args:
headers: dict, header to return with response
"""
if headers is None:
headers = {'status': http_client.OK}
self.data = data
self.response_headers = headers
self.headers = None
self.uri = None
self.method = None
self.body = None
self.headers = None
self.requests = 0
def test_allocate_node_conflict(self, mock_request, mock_get_url):
"""Test allocate resource conflict when compose node"""
mock_get_url.return_value = '/redfish/v1/Nodes'
# Fake response for getting nodes root
fake_node_root_resp = fakes.mock_request_get(fakes.fake_nodes_root(),
http_client.OK)
# Fake response for allocating node
fake_node_allocation_conflict = \
fakes.mock_request_get(fakes.fake_allocate_node_conflict(),
http_client.CONFLICT)
mock_request.side_effect = [fake_node_root_resp,
fake_node_allocation_conflict]
with self.assertRaises(exception.RedfishException) as context:
redfish.compose_node({"name": "test_node"})
self.assertTrue("There are no computer systems available for this "
"allocation request." in str(context.exception.detail))
def test_set_boot_source_success(self, mock_get_url, mock_request):
"""Test successfully reset node status"""
mock_get_url.return_value = '/redfish/v1/Nodes'
fake_node_detail = fakes.mock_request_get(
fakes.fake_node_detail(), http_client.OK)
fake_node_action_resp = fakes.mock_request_get(
{}, http_client.NO_CONTENT)
mock_request.side_effect = [fake_node_detail, fake_node_action_resp]
result = redfish.set_boot_source(
"1", {"Boot": {"Enabled": "Once", "Target": "Hdd"}})
expected = exception.confirmation(
confirm_code="Set Boot Source of Composed Node",
confirm_detail="The boot source of composed node has been set to "
"'{0}' with enabled state '{1}' successfully."
.format("Hdd", "Once"))
self.assertEqual(expected, result)
def test_podmanager_create(self, pstatus_mock, plist_mock, pcreate_mock):
pstatus_mock.return_value = constants.PODM_STATUS_ONLINE
plist_mock.return_value = []
values = {
"name": "podm_name",
"url": "https://10.240.212.123",
"authentication": [
{
"type": "basic",
"auth_items":
{
"username": "xxxxxxx",
"password": "xxxxxxx"
}
}]
}
result = copy.deepcopy(values)
result['status'] = constants.PODM_STATUS_ONLINE
pcreate_mock.return_value = result
response = self.app.post('/v1/pod_managers',
content_type='application/json',
data=json.dumps(values))
self.assertEqual(http_client.OK, response.status_code)
def test_compose_request_using_properties(self, mock_compose, mock_conn):
req = {
"name": "test_request",
"podm_id": "test-podm",
"properties": {
"memory": {
"capacity_mib": "4000",
"type": "DDR3"
},
"processor": {
"model": "Intel",
"total_cores": "4"
}
}
}
mock_compose.return_value = req
resp = self.app.post('/v1/nodes',
content_type='application/json',
data=json.dumps(req))
mock_conn.assert_called_once_with('test-podm')
self.assertEqual(http_client.OK, resp.status_code)
def show_cpu_details(cpu_url):
"""Get processor details .
:param cpu_url: relative redfish url to processor,
e.g /redfish/v1/Systems/1/Processors/1.
:returns: dict of processor detail.
"""
resp = send_request(cpu_url)
if resp.status_code != http_client.OK:
# Raise exception if don't find processor
raise exception.RedfishException(resp.json(),
status_code=resp.status_code)
respdata = resp.json()
cpu_details = {
"instruction_set": respdata.get("InstructionSet"),
"model": respdata.get("Model"),
"speed_mhz": respdata.get("MaxSpeedMHz"),
"total_core": respdata.get("TotalCores")
}
return cpu_details
def show_ram_details(ram_url):
"""Get memory details .
:param ram_url: relative redfish url to memory,
e.g /redfish/v1/Systems/1/Memory/1.
:returns: dict of memory detail.
"""
resp = send_request(ram_url)
if resp.status_code != http_client.OK:
# Raise exception if don't find memory
raise exception.RedfishException(resp.json(),
status_code=resp.status_code)
respdata = resp.json()
ram_details = {
"data_width_bit": respdata.get("DataWidthBits"),
"speed_mhz": respdata.get("OperatingSpeedMhz"),
"total_memory_mb": respdata.get("CapacityMiB")
}
return ram_details
def test_http_request_client_success(self):
kwargs = {"valence_url": "http://localhost/"}
client = http.HTTPClient(**kwargs)
resp = utils.mockSessionResponse(
{'content-type': 'test/plain'},
'Body',
version=1,
status_code=http_client.OK)
with mock.patch.object(client, 'session',
autospec=True) as mock_session:
mock_session.request.side_effect = iter([resp])
response, body_iter = client.json_request('GET',
'/redfish/v1/Nodes')
self.assertEqual(http_client.OK, response.status_code)
def test_get_run_by_id(self):
response = self.app.get('/runs/1234')
self.assertEqual(http_client.NOT_FOUND, response.status_code)
pb = 'tests/fixtures/playbooks/hello_world.yml'
response = self.app.post(
'/runs',
data=json.dumps(dict(playbook_path=pb,
inventory_file='localhosti,',
options={'connection': 'local'})),
content_type='application/json')
res_dict = json.loads(response.data)
run_id = res_dict['id']
self.assertEqual(http_client.OK, response.status_code)
response = self.app.get('/runs/{}'.format(run_id))
self.assertEqual(http_client.OK, response.status_code)
self._wait_for_run_complete(run_id)
def test_get_run_list(self):
pb = 'tests/fixtures/playbooks/hello_world.yml'
response = self.app.post(
'/runs',
data=json.dumps(dict(playbook_path=pb,
inventory_file='localhost,',
options={'connection': 'local'})),
content_type='application/json')
res_dict = json.loads(response.data)
run_id = res_dict['id']
self.assertEqual(http_client.OK, response.status_code)
response = self.app.get('/runs')
res_list = json.loads(response.data)
found = False
for item in res_list:
if item['id'] == run_id:
found = True
break
self.assertEqual(True, found)
self._wait_for_run_complete(run_id)