def test_ensure_loadbalancer(self):
    """Check that ``ensure_loadbalancer`` delegates to ``_ensure``.

    The driver is a spec'd mock, so the test only inspects the request
    object handed to ``_ensure`` and confirms that the value returned by
    ``_ensure`` is passed back to the caller unchanged.
    """
    namespace = 'TEST_NAMESPACE'
    name = 'TEST_NAME'
    endpoints = {'metadata': {'namespace': namespace, 'name': name}}
    project_id = 'TEST_PROJECT'
    subnet_id = 'D3FA400A-F543-4B91-9CD3-047AF0CE42D1'
    ip = '1.2.3.4'
    # TODO(ivc): handle security groups
    sg_ids = []

    driver_cls = d_lbaasv2.LBaaSv2Driver
    m_driver = mock.Mock(spec=driver_cls)
    expected_resp = mock.sentinel.expected_resp
    m_driver._ensure.return_value = expected_resp

    resp = driver_cls.ensure_loadbalancer(
        m_driver, endpoints, project_id, subnet_id, ip, sg_ids)

    m_driver._ensure.assert_called_once_with(
        mock.ANY,
        m_driver._create_loadbalancer,
        m_driver._find_loadbalancer)

    # First positional argument of the recorded call is the request object.
    positional_args = m_driver._ensure.call_args[0]
    req = positional_args[0]
    self.assertEqual("%s/%s" % (namespace, name), req.name)
    self.assertEqual(project_id, req.project_id)
    self.assertEqual(subnet_id, req.subnet_id)
    self.assertEqual(ip, str(req.ip))
    self.assertEqual(expected_resp, resp)
# Example source code using Python's mock ``ANY`` class
def test_ensure_pool(self):
    """Check that ``ensure_pool`` delegates to ``_ensure_provisioned``.

    The pool object passed as the second positional argument must take its
    name, listener id and protocol from the listener and its project id
    from the load balancer; the helper's return value must be passed back.
    """
    driver_cls = d_lbaasv2.LBaaSv2Driver
    m_driver = mock.Mock(spec=driver_cls)
    expected_resp = mock.sentinel.expected_resp
    endpoints = mock.sentinel.endpoints
    loadbalancer = obj_lbaas.LBaaSLoadBalancer(
        id='00EE9E11-91C2-41CF-8FD4-7970579E5C4C',
        project_id='TEST_PROJECT')
    listener = obj_lbaas.LBaaSListener(
        id='A57B7771-6050-4CA8-A63C-443493EC98AB',
        name='TEST_LISTENER_NAME',
        protocol='TCP')
    m_driver._ensure_provisioned.return_value = expected_resp

    resp = driver_cls.ensure_pool(
        m_driver, endpoints, loadbalancer, listener)

    m_driver._ensure_provisioned.assert_called_once_with(
        loadbalancer,
        mock.ANY,
        m_driver._create_pool,
        m_driver._find_pool)

    # Second positional argument of the recorded call is the pool object.
    positional_args = m_driver._ensure_provisioned.call_args[0]
    pool = positional_args[1]
    self.assertEqual(listener.name, pool.name)
    self.assertEqual(loadbalancer.project_id, pool.project_id)
    self.assertEqual(listener.id, pool.listener_id)
    self.assertEqual(listener.protocol, pool.protocol)
    self.assertEqual(expected_resp, resp)
def test_annotate(self, m_patch, m_count):
    """Check that ``annotate`` PATCHes the serialized metadata.

    The mocked PATCH response echoes the metadata back; the client is
    expected to return the annotations and to have issued exactly one
    PATCH to ``base_url + path`` with the sorted-key JSON body.
    """
    m_count.return_value = list(range(1, 5))
    path = '/test'
    resource_version = "123"
    annotations = {'a1': 'v1', 'a2': 'v2'}
    metadata_doc = {
        'metadata': {
            'annotations': annotations,
            "resourceVersion": resource_version,
        },
    }
    # Serialized before the call, exactly as the expected request body.
    data = jsonutils.dumps(metadata_doc, sort_keys=True)

    m_resp = mock.MagicMock()
    m_resp.ok = True
    m_resp.json.return_value = metadata_doc
    m_patch.return_value = m_resp

    returned = self.client.annotate(
        path, annotations, resource_version=resource_version)

    self.assertEqual(annotations, returned)
    m_patch.assert_called_once_with(
        self.base_url + path,
        data=data,
        headers=mock.ANY,
        cert=(None, None),
        verify=False)
def test_execute_clean_step_no_success_log(
        self, log_mock, run_mock, utils_mock, parse_driver_info_mock):
    """Check logging when the clean step run raises a deploy failure.

    With the run mock raising ``InstanceDeployFailure('Boom')``, the test
    expects one error log entry, one call to the mocked utility with the
    failure text, and no info-level log entry.
    """
    run_mock.side_effect = exception.InstanceDeployFailure('Boom')
    step = {
        'priority': 10,
        'interface': 'deploy',
        'step': 'erase_devices',
        'args': {'tags': ['clean']},
    }

    # Store an agent URL in the node's driver_internal_info and persist it.
    internal_info = self.node.driver_internal_info
    internal_info['agent_url'] = 'http://127.0.0.1'
    self.node.driver_internal_info = internal_info
    self.node.save()

    with task_manager.acquire(self.context, self.node.uuid) as task:
        self.driver.execute_clean_step(task, step)

        expected_log_args = {'node': task.node['uuid'],
                             'step': 'erase_devices'}
        log_mock.error.assert_called_once_with(mock.ANY, expected_log_args)
        utils_mock.assert_called_once_with(task, 'Boom')
        self.assertFalse(log_mock.info.called)
def test_send_magic_packets(self, mock_socket):
    """Check the socket calls made by ``_send_magic_packets``.

    One test port is created for the node. The expected sequence on the
    fake socket is: creation, enabling broadcast, two ``sendto`` calls to
    the broadcast address and a final ``close``.
    """
    fake_socket = mock.Mock(spec=socket, spec_set=True)
    mock_socket.return_value = fake_socket()
    obj_utils.create_test_port(
        self.context,
        uuid=uuidutils.generate_uuid(),
        address='aa:bb:cc:dd:ee:ff',
        node_id=self.node.id)

    with task_manager.acquire(
            self.context, self.node.uuid, shared=True) as task:
        wol_power._send_magic_packets(task, '255.255.255.255', 9)

        enable_broadcast = mock.call().setsockopt(
            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        send_packet = mock.call().sendto(
            mock.ANY, ('255.255.255.255', 9))
        expected_calls = [
            mock.call(),
            enable_broadcast,
            send_packet,
            send_packet,
            mock.call().close(),
        ]

        fake_socket.assert_has_calls(expected_calls)
        self.assertEqual(1, mock_socket.call_count)
def test__set_boot_device_order_fail(self, mock_aw, mock_client_pywsman):
    """Check both failure modes of ``_set_boot_device_order``.

    A response carrying ``ReturnValue`` '2' is expected to raise
    ``AMTFailure``; a ``None`` response is expected to raise
    ``AMTConnectFailure``.
    """
    device = boot_devices.PXE
    namespace = resource_uris.CIM_BootConfigSetting
    wsman_client = mock_client_pywsman.Client.return_value

    # Case 1: the invoke call answers with ReturnValue '2'.
    failure_xml = test_utils.build_soap_xml(
        [{'ReturnValue': '2'}], namespace)
    wsman_client.invoke.return_value = test_utils.mock_wsman_root(
        failure_xml)
    self.assertRaises(
        exception.AMTFailure,
        amt_mgmt._set_boot_device_order, self.node, device)
    wsman_client.invoke.assert_called_once_with(
        mock.ANY, namespace, 'ChangeBootOrder', mock.ANY)

    # Case 2: the invoke call yields no response at all.
    wsman_client.invoke.return_value = None
    self.assertRaises(
        exception.AMTConnectFailure,
        amt_mgmt._set_boot_device_order, self.node, device)

    self.assertTrue(mock_aw.called)
def test__enable_boot_config_fail(self, mock_aw, mock_client_pywsman):
    """Check both failure modes of ``_enable_boot_config``.

    A response carrying ``ReturnValue`` '2' is expected to raise
    ``AMTFailure``; a ``None`` response is expected to raise
    ``AMTConnectFailure``.
    """
    namespace = resource_uris.CIM_BootService
    wsman_client = mock_client_pywsman.Client.return_value

    # Case 1: the invoke call answers with ReturnValue '2'.
    failure_xml = test_utils.build_soap_xml(
        [{'ReturnValue': '2'}], namespace)
    wsman_client.invoke.return_value = test_utils.mock_wsman_root(
        failure_xml)
    self.assertRaises(
        exception.AMTFailure,
        amt_mgmt._enable_boot_config, self.node)
    wsman_client.invoke.assert_called_once_with(
        mock.ANY, namespace, 'SetBootConfigRole', mock.ANY)

    # Case 2: the invoke call yields no response at all.
    wsman_client.invoke.return_value = None
    self.assertRaises(
        exception.AMTConnectFailure,
        amt_mgmt._enable_boot_config, self.node)

    self.assertTrue(mock_aw.called)
def test__get_nm_address_raw_fail(self, parse_mock, dump_mock, raw_mock,
                                  unlink_mock):
    """Check ``_get_nm_address`` when the raw command raises.

    The ``IPMIFailure`` must propagate, both ``intel_nm_*`` entries in the
    node's driver_internal_info must equal ``False``, and the parse, dump
    and unlink helpers must each have been called once with the temp file.
    """
    raw_mock.side_effect = exception.IPMIFailure('raw error')
    parse_mock.return_value = ('0x0A', '0x0B')

    with task_manager.acquire(self.context, self.node.uuid,
                              shared=False) as task:
        self.assertRaises(
            exception.IPMIFailure, nm_vendor._get_nm_address, task)

        # Reload the node before reading driver_internal_info.
        self.node.refresh()
        stored_info = self.node.driver_internal_info
        self.assertEqual(False, stored_info['intel_nm_address'])
        self.assertEqual(False, stored_info['intel_nm_channel'])

        temp_file = self.temp_filename
        parse_mock.assert_called_once_with(temp_file)
        dump_mock.assert_called_once_with(task, temp_file)
        unlink_mock.assert_called_once_with(temp_file)
        raw_mock.assert_called_once_with(task, mock.ANY)
def test_rpc(
    self, container, entrypoint_tracker, rpc_proxy, wait_for_result,
    backoff_count
):
    """RPC entrypoint supports backoff.

    The tracker is expected to record ``backoff_count`` runs that raised
    ``Backoff`` followed by one run that returned "result".
    """
    waiter = entrypoint_waiter(
        container, 'method', callback=wait_for_result
    )
    with waiter as result:
        res = rpc_proxy.service.method("arg")

    assert res == result.get() == "result"

    # entrypoint fired backoff_count + 1 times
    expected_results = [None] * backoff_count + ["result"]
    assert entrypoint_tracker.get_results() == expected_results

    # entrypoint raised `Backoff` for all but the last execution
    expected_exceptions = [(Backoff, ANY, ANY)] * backoff_count + [None]
    assert entrypoint_tracker.get_exceptions() == expected_exceptions
def test_events(
    self, container, entrypoint_tracker, dispatch_event, wait_for_result,
    backoff_count
):
    """Event handler supports backoff.

    The tracker is expected to record ``backoff_count`` runs that raised
    ``Backoff`` followed by one run that returned "result".
    """
    waiter = entrypoint_waiter(
        container, 'method', callback=wait_for_result
    )
    with waiter as result:
        dispatch_event("src_service", "event_type", {})

    assert result.get() == "result"

    expected_results = [None] * backoff_count + ["result"]
    assert entrypoint_tracker.get_results() == expected_results

    expected_exceptions = [(Backoff, ANY, ANY)] * backoff_count + [None]
    assert entrypoint_tracker.get_exceptions() == expected_exceptions
def test_messaging(
    self, container, entrypoint_tracker, publish_message, exchange, queue,
    wait_for_result, backoff_count
):
    """Message consumption supports backoff.

    The tracker is expected to record ``backoff_count`` runs that raised
    ``Backoff`` followed by one run that returned "result".
    """
    waiter = entrypoint_waiter(
        container, 'method', callback=wait_for_result
    )
    with waiter as result:
        publish_message(exchange, "msg", routing_key=queue.routing_key)

    assert result.get() == "result"

    expected_results = [None] * backoff_count + ["result"]
    assert entrypoint_tracker.get_results() == expected_results

    expected_exceptions = [(Backoff, ANY, ANY)] * backoff_count + [None]
    assert entrypoint_tracker.get_exceptions() == expected_exceptions
def test_expiry(
    self, container, entrypoint_tracker, publish_message, exchange, queue,
    limited_backoff, wait_for_backoff_expired
):
    """Message consumption supports backoff expiry.

    After ``limited_backoff`` runs raising ``Backoff``, the final run is
    expected to raise ``Backoff.Expired``; no run returns a result.
    """
    waiter = entrypoint_waiter(
        container, 'method', callback=wait_for_backoff_expired
    )
    with waiter as result:
        publish_message(exchange, "msg", routing_key=queue.routing_key)

    # Fetching the result re-raises the expiry error.
    with pytest.raises(Backoff.Expired) as raised:
        result.get()

    expected_message = "Backoff aborted after '{}' retries".format(
        limited_backoff
    )
    assert expected_message in str(raised.value)

    expected_results = [None] * limited_backoff + [None]
    assert entrypoint_tracker.get_results() == expected_results

    expected_exceptions = (
        [(Backoff, ANY, ANY)] * limited_backoff +
        [(Backoff.Expired, ANY, ANY)]
    )
    assert entrypoint_tracker.get_exceptions() == expected_exceptions
# Source file: test_drv_opencontrail.py (project: networking-opencontrail, author: openstack)
# Page links: project source, file source. Page stats: 26 views, 0 favorites, 0 likes, 0 comments.
def test_create_resource(self, options, config):
    """Check that ``_create_resource`` issues a 'CREATE' backend request.

    The backend is mocked to answer with an OK status and the resource
    data; the driver is expected to return that data.
    """
    res_type = 'network'
    res_data = {
        'id': 'network-123',
        'tenant_id': ATTR_NOT_SPECIFIED,
    }
    context = mock.MagicMock()

    driver = self._get_driver(options, config)
    driver._request_backend = mock.MagicMock()
    driver._request_backend.return_value = (requests.codes.ok, res_data)

    response = driver._create_resource(
        res_type, context, {res_type: res_data})

    self.assertEqual(response, res_data)
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'CREATE')
# Source file: test_drv_opencontrail.py (project: networking-opencontrail, author: openstack)
# Page links: project source, file source. Page stats: 25 views, 0 favorites, 0 likes, 0 comments.
def test_get_resource(self, options, config):
    """Check that ``_get_resource`` issues a 'READ' backend request.

    The backend is mocked to answer with an OK status and the resource
    data; the driver is expected to return that data.
    """
    res_type = 'network'
    res_data = {
        'id': 'network-123',
    }
    fields = ['id']
    context = mock.MagicMock()

    driver = self._get_driver(options, config)
    driver._request_backend = mock.MagicMock()
    driver._request_backend.return_value = (requests.codes.ok, res_data)

    response = driver._get_resource(
        res_type, context, res_data['id'], fields)

    self.assertEqual(response, res_data)
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'READ')
# Source file: test_drv_opencontrail.py (project: networking-opencontrail, author: openstack)
# Page links: project source, file source. Page stats: 35 views, 0 favorites, 0 likes, 0 comments.
def test_update_resource(self, options, config):
    """Check that ``_update_resource`` issues an 'UPDATE' backend request.

    The backend is mocked to answer with an OK status and the resource
    data; the driver is expected to return that data.
    """
    res_type = 'network'
    res_data = {
        'id': 'network-123',
    }
    context = mock.MagicMock()

    driver = self._get_driver(options, config)
    driver._request_backend = mock.MagicMock()
    driver._request_backend.return_value = (requests.codes.ok, res_data)

    response = driver._update_resource(
        res_type, context, res_data['id'], {res_type: res_data})

    self.assertEqual(response, res_data)
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'UPDATE')
# Source file: test_drv_opencontrail.py (project: networking-opencontrail, author: openstack)
# Page links: project source, file source. Page stats: 25 views, 0 favorites, 0 likes, 0 comments.
def test_list_resource(self, options, config):
    """Check that ``_list_resource`` issues a 'READALL' backend request.

    The backend is mocked to answer with an OK status and a one-element
    list; the driver is expected to return that list.
    """
    res_type = 'network'
    res_data = {
        'id': 'network-123',
    }
    fields = ['id']
    context = mock.MagicMock()

    driver = self._get_driver(options, config)
    driver._request_backend = mock.MagicMock()
    driver._request_backend.return_value = (requests.codes.ok, [res_data])

    response = driver._list_resource(res_type, context, None, fields)

    self.assertEqual(response, [res_data])
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'READALL')
# Source file: test_drv_opencontrail.py (project: networking-opencontrail, author: openstack)
# Page links: project source, file source. Page stats: 27 views, 0 favorites, 0 likes, 0 comments.
def test_add_router_interface(self, options, config):
    """Check that ``add_router_interface`` issues an 'ADDINTERFACE' request.

    The backend is mocked to answer with an OK status and the router data;
    the driver is expected to return that data.
    """
    res_type = 'router'
    res_data = {
        'id': 'router-123',
    }
    res_id = res_data['id']
    context = mock.MagicMock()

    driver = self._get_driver(options, config)
    driver._request_backend = mock.MagicMock()
    driver._request_backend.return_value = (requests.codes.ok, res_data)

    response = driver.add_router_interface(context, res_id, res_data)

    self.assertEqual(response, res_data)
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'ADDINTERFACE')
# Source file: test_drv_opencontrail.py (project: networking-opencontrail, author: openstack)
# Page links: project source, file source. Page stats: 29 views, 0 favorites, 0 likes, 0 comments.
def test_remove_router_interface(self, options, config):
    """Check that ``remove_router_interface`` issues a 'DELINTERFACE' request.

    The backend is mocked to answer with an OK status and the router data;
    the driver is expected to return that data.
    """
    res_type = 'router'
    res_data = {
        'id': 'router-123',
    }
    res_id = res_data['id']
    context = mock.MagicMock()

    driver = self._get_driver(options, config)
    driver._request_backend = mock.MagicMock()
    driver._request_backend.return_value = (requests.codes.ok, res_data)

    response = driver.remove_router_interface(context, res_id, res_data)

    self.assertEqual(response, res_data)
    driver._request_backend.assert_called_with(
        context, mock.ANY, res_type, 'DELINTERFACE')
def test_bad_protocol_version(self, *args):
    """Check that a header built with version 0x7f defuncts the connection.

    ``defunct`` must be called exactly once, with a ``ProtocolError``.
    """
    conn = self.make_connection()
    conn._requests = Mock()
    conn.defunct = Mock()

    # read in a SupportedMessage response
    header = self.make_header_prefix(SupportedMessage, version=0x7f)
    options = self.make_options_body()
    conn._iobuf = BytesIO()
    conn._iobuf.write(self.make_msg(header, options))
    conn.process_io_buffer()

    # make sure it errored correctly
    conn.defunct.assert_called_once_with(ANY)
    defunct_args = conn.defunct.call_args[0]
    self.assertIsInstance(defunct_args[0], ProtocolError)
def test_negative_body_length(self, *args):
    """Check that a header followed by a packed -13 defuncts the connection.

    ``defunct`` must be called exactly once, with a ``ProtocolError``.
    """
    conn = self.make_connection()
    conn._requests = Mock()
    conn.defunct = Mock()

    # read in a SupportedMessage response
    header = self.make_header_prefix(SupportedMessage)
    conn._iobuf = BytesIO()
    conn._iobuf.write(header + int32_pack(-13))
    conn.process_io_buffer()

    # make sure it errored correctly
    conn.defunct.assert_called_once_with(ANY)
    defunct_args = conn.defunct.call_args[0]
    self.assertIsInstance(defunct_args[0], ProtocolError)
def test_unsupported_cql_version(self, *args):
    """Check that a mismatched CQL version defuncts the connection.

    The connection is set to CQL "3.0.3" while the supported-options body
    lists only '7.8.9'; ``defunct`` must be called exactly once, with a
    ``ProtocolError``.
    """
    conn = self.make_connection()
    conn._requests = {
        0: (conn._handle_options_response,
            ProtocolHandler.decode_message,
            []),
    }
    conn.defunct = Mock()
    conn.cql_version = "3.0.3"

    # read in a SupportedMessage response
    header = self.make_header_prefix(SupportedMessage)

    options_buf = BytesIO()
    write_stringmultimap(options_buf, {
        'CQL_VERSION': ['7.8.9'],
        'COMPRESSION': []
    })
    options = options_buf.getvalue()

    frame = _Frame(
        version=4,
        flags=0,
        stream=0,
        opcode=SupportedMessage.opcode,
        body_offset=9,
        end_pos=9 + len(options))
    conn.process_msg(frame, options)

    # make sure it errored correctly
    conn.defunct.assert_called_once_with(ANY)
    defunct_args = conn.defunct.call_args[0]
    self.assertIsInstance(defunct_args[0], ProtocolError)
# Source file: test_control_connection.py (project: deb-python-cassandra-driver, author: openstack)
# Page links: project source, file source. Page stats: 21 views, 0 favorites, 0 likes, 0 comments.
def test_handle_topology_change(self):
    """Check what ``_handle_topology_change`` schedules per change type.

    NEW_NODE and MOVED_NODE are expected to schedule
    ``_refresh_nodes_if_not_up`` with the event's host address, while
    REMOVED_NODE is expected to schedule ``cluster.remove_host`` with None.
    """
    scenarios = (
        ('NEW_NODE',
         self.control_connection._refresh_nodes_if_not_up, '1.2.3.4'),
        ('REMOVED_NODE',
         self.cluster.remove_host, None),
        ('MOVED_NODE',
         self.control_connection._refresh_nodes_if_not_up, '1.2.3.4'),
    )

    for change_type, scheduled_callable, scheduled_arg in scenarios:
        event = {
            'change_type': change_type,
            'address': ('1.2.3.4', 9000)
        }
        # Start every scenario from a clean scheduler mock.
        self.cluster.scheduler.reset_mock()
        self.control_connection._handle_topology_change(event)
        self.cluster.scheduler.schedule_unique.assert_called_once_with(
            ANY, scheduled_callable, scheduled_arg)
# Source file: test_control_connection.py (project: deb-python-cassandra-driver, author: openstack)
# Page links: project source, file source. Page stats: 24 views, 0 favorites, 0 likes, 0 comments.
def test_handle_schema_change(self):
    """Check that schema change events schedule a schema refresh.

    For every public attribute of ``SchemaChangeType``, a TABLE event and
    then a KEYSPACE event (the same dict with 'table' removed) must each
    schedule ``refresh_schema`` with the event's items as keyword args.
    """
    public_names = [
        attr for attr in vars(SchemaChangeType) if attr[0] != '_'
    ]
    change_types = [getattr(SchemaChangeType, attr) for attr in public_names]
    scheduler = self.cluster.scheduler

    for change_type in change_types:
        event = {
            'target_type': SchemaTargetType.TABLE,
            'change_type': change_type,
            'keyspace': 'ks1',
            'table': 'table1'
        }
        scheduler.reset_mock()
        self.control_connection._handle_schema_change(event)
        scheduler.schedule_unique.assert_called_once_with(
            ANY, self.control_connection.refresh_schema, **event)

        # Reuse the event as a keyspace-level change.
        scheduler.reset_mock()
        event['target_type'] = SchemaTargetType.KEYSPACE
        del event['table']
        self.control_connection._handle_schema_change(event)
        scheduler.schedule_unique.assert_called_once_with(
            ANY, self.control_connection.refresh_schema, **event)
# Source file: test_response_future.py (project: deb-python-cassandra-driver, author: openstack)
# Page links: project source, file source. Page stats: 25 views, 0 favorites, 0 likes, 0 comments.
def test_result_message(self):
    """Check the request path and result delivery of a response future.

    With a query plan of ['ip1', 'ip2'], the future is expected to look up
    the pool for 'ip1', borrow a connection and send its message on it.
    After ``_set_result`` is fed a mock response, ``result()`` must equal
    the rows used to build that response.
    """
    session = self.make_basic_session()
    policy = session.cluster._default_load_balancing_policy
    policy.make_query_plan.return_value = ['ip1', 'ip2']

    pool = session._pools.get.return_value
    pool.is_shutdown = False
    connection = Mock(spec=Connection)
    pool.borrow_connection.return_value = (connection, 1)

    rf = self.make_response_future(session)
    rf.send_request()

    rf.session._pools.get.assert_called_once_with('ip1')
    pool.borrow_connection.assert_called_once_with(timeout=ANY)
    connection.send_msg.assert_called_once_with(
        rf.message,
        1,
        cb=ANY,
        encoder=ProtocolHandler.encode_message,
        decoder=ProtocolHandler.decode_message,
        result_metadata=[])

    response = self.make_mock_response([{'col': 'val'}])
    rf._set_result(None, None, None, response)
    self.assertEqual(rf.result(), [{'col': 'val'}])
def test_update_host(self, mock_host, mock_get_device_type):
    """Check a PUT to ``/v1/hosts/1`` with a new name and parent id.

    The response must echo the updated fields with status 200, the mocked
    update must receive the payload, and the response links must include
    an "up" link to network device 2.
    """
    mock_get_device_type.return_value = "network_devices"
    payload = {'name': 'Host_New', 'parent_id': 2}
    db_data = payload.copy()
    # The mocked update returns HOST1's items overlaid with the payload.
    updated_record = dict(fake_resources.HOST1.items())
    updated_record.update(payload)
    mock_host.return_value = updated_record

    resp = self.put('/v1/hosts/1', data=payload)

    body = resp.json
    self.assertEqual(body['name'], db_data['name'])
    self.assertEqual(body['parent_id'], db_data['parent_id'])
    self.assertEqual(200, resp.status_code)
    mock_host.assert_called_once_with(mock.ANY, '1', db_data)
    mock_get_device_type.assert_called_once()

    up_link = {
        "rel": "up",
        "href": "http://localhost/v1/network-devices/2"
    }
    self.assertIn(up_link, resp.json["links"])
def test_get_host_by_ip_address_filter(self, fake_hosts):
    """Check listing hosts filtered by region id and IP address.

    The mocked lookup returns ``HOSTS_LIST_R2``; the response must contain
    one host with the matching name, and the lookup must have received the
    expected filters plus the pagination arguments asserted below.
    """
    region_id = 1
    ip_address = '10.10.0.1'
    expected_filters = {
        'region_id': 1,
        'ip_address': ip_address,
        'resolved-values': True,
    }
    expected_pagination = {'limit': 30, 'marker': None}
    fake_hosts.return_value = (fake_resources.HOSTS_LIST_R2, {})

    url = '/v1/hosts?region_id={}&ip_address={}'.format(
        region_id, ip_address
    )
    resp = self.get(url)

    expected_hosts = fake_resources.HOSTS_LIST_R2
    returned_hosts = resp.json['hosts']
    self.assertEqual(len(returned_hosts), 1)
    self.assertEqual(returned_hosts[0]["name"], expected_hosts[0].name)
    fake_hosts.assert_called_once_with(
        mock.ANY, expected_filters, expected_pagination,
    )
def test_projects_put_variables(self, mock_project):
    """Check a PUT of one variable onto a project.

    The mocked update returns a deep copy of ``PROJECT1`` with the new
    variable added; the response must be 200 and list all three variables.
    """
    project = fake_resources.PROJECT1
    project_id = str(project.id)
    payload = {"a": "b"}
    db_data = payload.copy()

    # Deep copy so the shared fixture is not modified.
    updated_project = copy.deepcopy(project)
    updated_project.variables["a"] = "b"
    mock_project.return_value = updated_project

    url = 'v1/projects/{}/variables'.format(project_id)
    resp = self.put(url, data=payload)

    self.assertEqual(resp.status_code, 200)
    mock_project.assert_called_once_with(
        mock.ANY, "projects", project_id, db_data)

    expected = {
        "variables": {"key1": "value1", "key2": "value2", "a": "b"},
    }
    self.assertDictEqual(expected, resp.json)
def test_put_network_device(self, fake_device, mock_get_device_type):
    """Check a PUT to ``v1/network-devices/1`` with a new name and parent.

    The response must echo the updated fields with status 200, the mocked
    update must receive the payload, and the response links must include
    an "up" link to network device 2.
    """
    mock_get_device_type.return_value = "network_devices"
    payload = {"name": "NetDev_New1", "parent_id": 2}
    # The mocked update returns NETWORK_DEVICE1's items overlaid with the
    # payload.
    fake_device.return_value = dict(
        fake_resources.NETWORK_DEVICE1.items(), **payload)

    resp = self.put('v1/network-devices/1', data=payload)

    self.assertEqual(resp.status_code, 200)
    body = resp.json
    self.assertEqual(body['name'], "NetDev_New1")
    self.assertEqual(body['parent_id'], 2)
    expected_update = {"name": "NetDev_New1", "parent_id": 2}
    fake_device.assert_called_once_with(mock.ANY, '1', expected_update)
    mock_get_device_type.assert_called_once()

    up_link = {
        "rel": "up",
        "href": "http://localhost/v1/network-devices/2"
    }
    self.assertIn(up_link, resp.json["links"])
def test_get_network_devices_by_ip_address_filter(self, fake_devices):
    """Check listing network devices filtered by region id and IP address.

    The mocked lookup returns ``NETWORK_DEVICE_LIST1``; the response must
    contain one device with the matching IP address, and the lookup must
    have received the expected filters plus the pagination arguments.
    """
    region_id = '1'
    ip_address = '10.10.0.1'
    expected_filters = {
        'region_id': region_id,
        'ip_address': ip_address,
        'resolved-values': True,
    }
    expected_pagination = {'limit': 30, 'marker': None}
    fake_devices.return_value = (fake_resources.NETWORK_DEVICE_LIST1, {})

    url = '/v1/network-devices?region_id={}&ip_address={}'.format(
        region_id, ip_address
    )
    resp = self.get(url)

    expected_devices = fake_resources.NETWORK_DEVICE_LIST1
    returned_devices = resp.json['network_devices']
    self.assertEqual(len(returned_devices), 1)
    self.assertEqual(returned_devices[0]["ip_address"],
                     expected_devices[0].ip_address)
    fake_devices.assert_called_once_with(
        mock.ANY, expected_filters, expected_pagination,
    )
def test_get_netinterfaces_by_ip_address_filter(self, fake_interfaces):
    """Check listing network interfaces filtered by device id and IP.

    The mocked lookup returns ``NETWORK_INTERFACE_LIST1``; the response
    must contain one interface with the matching name, and the lookup must
    have received the expected filters plus the pagination arguments.
    """
    device_id = 1
    ip_address = '10.10.0.1'
    expected_filters = {'device_id': device_id, 'ip_address': ip_address}
    expected_pagination = {'limit': 30, 'marker': None}
    fake_interfaces.return_value = (
        fake_resources.NETWORK_INTERFACE_LIST1, {})

    url = '/v1/network-interfaces?device_id={}&ip_address={}'.format(
        device_id, ip_address
    )
    resp = self.get(url)

    expected_interfaces = fake_resources.NETWORK_INTERFACE_LIST1
    returned_interfaces = resp.json['network_interfaces']
    self.assertEqual(len(returned_interfaces), 1)
    self.assertEqual(returned_interfaces[0]["name"],
                     expected_interfaces[0].name)
    fake_interfaces.assert_called_once_with(
        mock.ANY, expected_filters, expected_pagination,
    )