def test_load_ok(self):
in_config = json.dumps({'command': '/bin/true',
'config_files': {}})
mo = mock.mock_open(read_data=in_config)
with mock.patch.object(set_configs, 'open', mo):
config = set_configs.load_config()
set_configs.copy_config(config)
self.assertEqual([
mock.call('/var/lib/kolla/config_files/config.json'),
mock.call().__enter__(),
mock.call().read(),
mock.call().__exit__(None, None, None),
mock.call('/run_command', 'w+'),
mock.call().__enter__(),
mock.call().write(u'/bin/true'),
mock.call().__exit__(None, None, None)], mo.mock_calls)
python类call()的实例源码
def test_create_ovs_vif_port(self):
calls = [
mock.call('ovs-vsctl', '--', '--if-exists',
'del-port', 'fake-dev', '--', 'add-port',
'fake-bridge', 'fake-dev',
'--', 'set', 'Interface', 'fake-dev',
'external-ids:iface-id=fake-iface-id',
'external-ids:iface-status=active',
'external-ids:attached-mac=fake-mac',
'external-ids:vm-uuid=fake-instance-uuid',
run_as_root=True)]
with mock.patch.object(utils, 'execute', return_value=('', '')) as ex:
linux_net.create_ovs_vif_port('fake-bridge', 'fake-dev',
'fake-iface-id', 'fake-mac',
'fake-instance-uuid')
ex.assert_has_calls(calls)
def test_request(self):
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
loadbalancer = mock.sentinel.loadbalancer
obj = mock.sentinel.obj
create = mock.sentinel.create
find = mock.sentinel.find
expected_result = mock.sentinel.expected_result
timer = [mock.sentinel.t0, mock.sentinel.t1]
m_driver._provisioning_timer.return_value = timer
m_driver._ensure.side_effect = [n_exc.StateInvalidClient,
expected_result]
ret = cls._ensure_provisioned(m_driver, loadbalancer, obj, create,
find)
m_driver._wait_for_provisioning.assert_has_calls(
[mock.call(loadbalancer, t) for t in timer])
m_driver._ensure.assert_has_calls(
[mock.call(obj, create, find) for _ in timer])
self.assertEqual(expected_result, ret)
def test_ensure_not_ready(self):
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
loadbalancer = mock.sentinel.loadbalancer
obj = mock.sentinel.obj
create = mock.sentinel.create
find = mock.sentinel.find
timer = [mock.sentinel.t0, mock.sentinel.t1]
m_driver._provisioning_timer.return_value = timer
m_driver._ensure.return_value = None
self.assertRaises(k_exc.ResourceNotReady, cls._ensure_provisioned,
m_driver,
loadbalancer, obj, create, find)
m_driver._wait_for_provisioning.assert_has_calls(
[mock.call(loadbalancer, t) for t in timer])
m_driver._ensure.assert_has_calls(
[mock.call(obj, create, find) for _ in timer])
def test_register(self, m_init, m_wrap_consumer):
consumes = {mock.sentinel.key_fn1: mock.sentinel.key1,
mock.sentinel.key_fn2: mock.sentinel.key2,
mock.sentinel.key_fn3: mock.sentinel.key3}
m_dispatcher = mock.Mock()
m_consumer = mock.Mock()
m_consumer.consumes = consumes
m_wrap_consumer.return_value = mock.sentinel.handler
m_init.return_value = None
pipeline = _TestEventPipeline()
pipeline._dispatcher = m_dispatcher
pipeline.register(m_consumer)
m_wrap_consumer.assert_called_once_with(m_consumer)
m_dispatcher.register.assert_has_calls([
mock.call(key_fn, key, mock.sentinel.handler)
for key_fn, key in consumes.items()], any_order=True)
def test_call_retry(self, m_sleep, m_count):
attempts = 3
timeout = 10
deadline = self.now + timeout
failures = [_EX1()] * (attempts - 1)
event = mock.sentinel.event
m_handler = mock.Mock()
m_handler.side_effect = failures + [None]
m_sleep.return_value = 1
m_count.return_value = list(range(1, 5))
retry = h_retry.Retry(m_handler, timeout=timeout, exceptions=_EX1)
retry(event)
m_handler.assert_has_calls([mock.call(event)] * attempts)
m_sleep.assert_has_calls([
mock.call(deadline, i + 1, failures[i])
for i in range(len(failures))])
def test_send_magic_packets(self, mock_socket):
fake_socket = mock.Mock(spec=socket, spec_set=True)
mock_socket.return_value = fake_socket()
obj_utils.create_test_port(self.context,
uuid=uuidutils.generate_uuid(),
address='aa:bb:cc:dd:ee:ff',
node_id=self.node.id)
with task_manager.acquire(
self.context, self.node.uuid, shared=True) as task:
wol_power._send_magic_packets(task, '255.255.255.255', 9)
expected_calls = [
mock.call(),
mock.call().setsockopt(socket.SOL_SOCKET,
socket.SO_BROADCAST, 1),
mock.call().sendto(mock.ANY, ('255.255.255.255', 9)),
mock.call().sendto(mock.ANY, ('255.255.255.255', 9)),
mock.call().close()]
fake_socket.assert_has_calls(expected_calls)
self.assertEqual(1, mock_socket.call_count)
def test_get_swift_hash_env(self, mock_config, mock_service_name):
mock_config.return_value = None
mock_service_name.return_value = "testsvc"
tmpfile = tempfile.mktemp()
swift_context.SWIFT_HASH_FILE = tmpfile
with mock.patch('lib.swift_context.os.environ.get') as mock_env_get:
mock_env_get.return_value = str(uuid.uuid4())
hash_ = swift_context.get_swift_hash()
mock_env_get.assert_has_calls([
mock.call('JUJU_MODEL_UUID'),
mock.call('JUJU_ENV_UUID',
mock_env_get.return_value)
])
with open(tmpfile, 'r') as fd:
self.assertEqual(hash_, fd.read())
self.assertTrue(mock_config.called)
def test_failure(self):
"""Ensure that action_fail is called on failure."""
self.config.return_value = "swauth"
self.action_get.return_value = "test"
self.determine_api_port.return_value = 8070
self.CalledProcessError = ValueError
self.check_call.side_effect = subprocess.CalledProcessError(
0, "hi", "no")
actions.add_user.add_user()
self.leader_get.assert_called_with("swauth-admin-key")
calls = [call("account"), call("username"), call("password")]
self.action_get.assert_has_calls(calls)
self.action_set.assert_not_called()
self.action_fail.assert_called_once_with(
'Adding user test failed with: "Command \'hi\' returned non-zero '
'exit status 0"')
def test_final_action(
self,
refresh_batch,
temp_name,
write_session,
mock_execute,
database_name
):
refresh_batch.final_action()
calls = [
mock.call(write_session, 'USE {0}'.format(database_name)),
mock.call(
write_session,
'DROP TABLE IF EXISTS {0}'.format(temp_name),
),
]
mock_execute.assert_has_calls(calls)
def test_create_table_from_src_table(
self,
refresh_batch,
fake_original_table,
fake_new_table,
show_table_query,
write_session
):
with mock.patch.object(
refresh_batch,
'_execute_query',
autospec=True
) as mock_execute:
mock_execute.return_value.fetchone.return_value = [
'test_db',
fake_original_table
]
refresh_batch.create_table_from_src_table(write_session)
calls = [
mock.call(write_session, show_table_query),
mock.call(write_session, fake_new_table)
]
mock_execute.assert_has_calls(calls, any_order=True)
def test_bootstrap_files_calls_register_file_for_each_file(
self,
containers
):
file_paths = {'a.test', 'b.test'}
bootstrapper = FileBootstrapperBase(
schema_ref=None,
file_paths=file_paths,
override_metadata=True,
file_extension='test'
)
bootstrapper.register_file = mock.Mock(return_value=None)
bootstrapper.bootstrap_schema_result = mock.Mock()
bootstrapper.bootstrap_files()
assert bootstrapper.register_file.mock_calls == [
mock.call(file_path) for file_path in file_paths
]
assert bootstrapper.bootstrap_schema_result.mock_calls == [
mock.call(None) for _ in file_paths
]
def test_run_converts_existing_if_overwrite_true(
self,
sql_file_path,
avsc_file_path,
globs,
mock_get_file_paths_from_glob_patterns,
mock_os_path_exists,
mock_batch
):
mock_batch.options.overwrite = True
mock_batch.run()
assert mock_get_file_paths_from_glob_patterns.mock_calls == [
mock.call(glob_patterns=globs)
]
assert mock_os_path_exists.mock_calls == [mock.call(avsc_file_path)]
assert mock_batch.convert_sql_to_avsc.mock_calls == [
mock.call(
avsc_file_path=avsc_file_path,
sql_file_path=sql_file_path
)
]
def test_client_manager_setup_with_multiple_bot_tokens(mocked_slack_client):
config = {
'SLACK': {
'BOTS': {
'spam': 'abc-123',
'ham': 'def-456',
}
}
}
client_manager = rtm.SlackRTMClientManager()
client_manager.container = Mock(config=config)
client_manager.setup()
assert 'spam' in client_manager.clients
assert 'ham' in client_manager.clients
assert client_manager.clients['spam'] == mocked_slack_client.return_value
assert client_manager.clients['ham'] == mocked_slack_client.return_value
assert call('abc-123') in mocked_slack_client.call_args_list
assert call('def-456') in mocked_slack_client.call_args_list
def test_handle_event_by_type(self, events, service_runner, tracker):
class Service:
name = 'sample'
@rtm.handle_event('presence_change')
def handle_event(self, event):
tracker.handle_event(event)
service_runner(Service, events)
assert (
tracker.handle_event.call_args_list ==
[
call(event) for event in events
if event.get('type') == 'presence_change'
])
def test_handle_any_message(self, events, service_runner, tracker):
class Service:
name = 'sample'
@rtm.handle_message
def handle_event(self, event, message):
tracker.handle_event(event, message)
service_runner(Service, events)
assert (
tracker.handle_event.call_args_list ==
[
call(event, event.get('text')) for event in events
if event.get('type') == 'message'
])
def test_replies_on_handle_message(events, service_runner):
class Service:
name = 'sample'
@rtm.handle_message
def handle_message(self, event, message):
return 'sure, {}'.format(message)
reply_calls = service_runner(Service, events)
assert reply_calls == [
call('D11', 'sure, spam ham'),
call('D11', 'sure, ham spam'),
call('D11', 'sure, spam egg'),
]
def test_make_without_build_container_tag_with_context(self, skipper_runner_run_mock):
global_params = self.global_params[:-2]
makefile = 'Makefile'
target = 'all'
make_params = ['-f', makefile, target]
self._invoke_cli(
defaults=config.load_defaults(),
global_params=global_params,
subcmd='make',
subcmd_params=make_params
)
expected_commands = [
mock.call(['docker', 'build', '-t', 'build-container-image', '-f', 'Dockerfile.build-container-image',
SKIPPER_CONF_CONTAINER_CONTEXT]),
mock.call(['make'] + make_params, fqdn_image='build-container-image', environment=[],
interactive=False, name=None, net='host', volumes=None, workdir=None, use_cache=False),
]
skipper_runner_run_mock.assert_has_calls(expected_commands)
def test_push(self, skipper_runner_run_mock, requests_get_mock):
skipper_runner_run_mock.side_effect = [0, 0]
push_params = ['my_image']
with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
requests_response_mock = requests_response_class_mock.return_value
requests_response_mock.json.return_value = {
'name': 'my_image',
'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
}
requests_get_mock.return_value = requests_response_mock
self._invoke_cli(
global_params=self.global_params,
subcmd='push',
subcmd_params=push_params
)
expected_commands = [
mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
]
skipper_runner_run_mock.assert_has_calls(expected_commands)
def test_push_already_in_registry(self, skipper_runner_run_mock, requests_get_mock):
skipper_runner_run_mock.side_effect = [0, 0]
push_params = ['my_image']
with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
requests_response_mock = requests_response_class_mock.return_value
requests_response_mock.json.return_value = {
'name': 'my_image',
'tags': ['latest', 'aaaaaaa', 'bbbbbbb', "1234567"]
}
requests_get_mock.return_value = requests_response_mock
self._invoke_cli(
global_params=self.global_params,
subcmd='push',
subcmd_params=push_params
)
expected_commands = [
mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
]
skipper_runner_run_mock.assert_has_calls(expected_commands)
def test_push_fail(self, skipper_runner_run_mock, requests_get_mock):
skipper_runner_run_mock.side_effect = [0, 1]
push_params = ['my_image']
with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
requests_response_mock = requests_response_class_mock.return_value
requests_response_mock.json.return_value = {
'name': 'my_image',
'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
}
requests_get_mock.return_value = requests_response_mock
result = self._invoke_cli(
global_params=self.global_params,
subcmd='push',
subcmd_params=push_params
)
self.assertEqual(result.exit_code, 1)
expected_commands = [
mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
]
skipper_runner_run_mock.assert_has_calls(expected_commands)
def test_push_rmi_fail(self, skipper_runner_run_mock, requests_get_mock):
skipper_runner_run_mock.side_effect = [0, 0, 1]
push_params = ['my_image']
with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
requests_response_mock = requests_response_class_mock.return_value
requests_response_mock.json.return_value = {
'name': 'my_image',
'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
}
requests_get_mock.return_value = requests_response_mock
result = self._invoke_cli(
global_params=self.global_params,
subcmd='push',
subcmd_params=push_params
)
self.assertEqual(result.exit_code, 0)
expected_commands = [
mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
]
skipper_runner_run_mock.assert_has_calls(expected_commands)
def test_push_to_namespace(self, skipper_runner_run_mock, requests_get_mock):
skipper_runner_run_mock.side_effect = [0, 0]
push_params = ['--namespace', 'my_namespace', 'my_image']
with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
requests_response_mock = requests_response_class_mock.return_value
requests_response_mock.json.return_value = {
'name': 'my_image',
'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
}
requests_get_mock.return_value = requests_response_mock
self._invoke_cli(
global_params=self.global_params,
subcmd='push',
subcmd_params=push_params
)
expected_commands = [
mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_namespace/my_image:1234567']),
mock.call(['docker', 'push', 'registry.io:5000/my_namespace/my_image:1234567']),
mock.call(['docker', 'rmi', 'registry.io:5000/my_namespace/my_image:1234567']),
]
skipper_runner_run_mock.assert_has_calls(expected_commands)
def test_push_with_defaults_from_config_file(self, skipper_runner_run_mock, requests_get_mock):
skipper_runner_run_mock.side_effect = [0, 0]
push_params = ['my_image']
with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
requests_response_mock = requests_response_class_mock.return_value
requests_response_mock.json.return_value = {
'name': 'my_image',
'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
}
requests_get_mock.return_value = requests_response_mock
self._invoke_cli(
defaults=config.load_defaults(),
subcmd='push',
subcmd_params=push_params
)
expected_commands = [
mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
]
skipper_runner_run_mock.assert_has_calls(expected_commands)
def test_make_without_build_container_tag(self, skipper_runner_run_mock):
global_params = self.global_params[:-2]
makefile = 'Makefile'
target = 'all'
make_params = ['-f', makefile, target]
self._invoke_cli(
global_params=global_params,
subcmd='make',
subcmd_params=make_params
)
expected_commands = [
mock.call(['docker', 'build', '-t', 'build-container-image', '-f', 'Dockerfile.build-container-image', '.']),
mock.call(['make'] + make_params, fqdn_image='build-container-image', environment=[],
interactive=False, name=None, net='host', volumes=None, workdir=None, use_cache=False),
]
skipper_runner_run_mock.assert_has_calls(expected_commands)
def test_resume_compute_exception_wait_slave_available(self, mock_sleep):
side_effect_xenapi_failure = FakeXenAPIException
side_effect_plugin_error = [self.pluginlib.PluginError(
"Wait for the slave to become available"),
None]
self.mock_patch_object(self.session.xenapi.VM,
'start')
self.session.xenapi.VM.start.side_effect = \
side_effect_xenapi_failure
self.host._run_command.side_effect = side_effect_plugin_error
self.host.XenAPI.Failure = FakeXenAPIException
expected = [call(["xe", "vm-start", "uuid=%s" % 'fake_compute_uuid']),
call(["xe", "vm-start", "uuid=%s" % 'fake_compute_uuid'])]
self.host._resume_compute(self.session,
'fake_compute_ref',
'fake_compute_uuid')
self.session.xenapi.VM.start.assert_called_with(
'fake_compute_ref', False, True)
self.assertEqual(expected, self.host._run_command.call_args_list)
mock_sleep.assert_called_once()
def test_power_action_input_cmd_result_not_empty(self):
side_effects = [None, None, 'not_empty']
temp_arg_dict = {'host_uuid': 'fake_host_uuid'}
self.host._run_command.side_effect = side_effects
cmds = {"reboot": "host-reboot",
"startup": "host-power-on",
"shutdown": "host-shutdown"}
fake_action = 'reboot' # 'statup' and 'shutdown' should be same
expected_cmd_arg_list = [call(["xe", "host-disable", "uuid=%s"
% 'fake_host_uuid']),
call(["xe", "vm-shutdown", "--multiple",
"resident-on=%s" % 'fake_host_uuid']),
call(["xe", cmds[fake_action], "uuid=%s" %
'fake_host_uuid'])]
self.assertRaises(self.pluginlib.PluginError,
self.host._power_action,
fake_action, temp_arg_dict)
self.assertEqual(self.host._run_command.call_args_list,
expected_cmd_arg_list)
def test_power_action(self):
temp_arg_dict = {'host_uuid': 'fake_host_uuid'}
self.host._run_command.return_value = None
cmds = {"reboot": "host-reboot",
"startup": "host-power-on",
"shutdown": "host-shutdown"}
fake_action = 'reboot' # 'statup' and 'shutdown' should be same
expected_cmd_arg_list = [call(["xe", "host-disable", "uuid=%s"
% 'fake_host_uuid']),
call(["xe", "vm-shutdown", "--multiple",
"resident-on=%s" % 'fake_host_uuid']),
call(["xe", cmds[fake_action], "uuid=%s" %
'fake_host_uuid'])]
expected_result = {"power_action": fake_action}
action_result = self.host._power_action(fake_action, temp_arg_dict)
self.assertEqual(self.host._run_command.call_args_list,
expected_cmd_arg_list)
self.assertEqual(action_result, expected_result)
def test_ovs_add_patch_port(self):
brige_name = 'fake_brige_name'
port_name = 'fake_port_name'
peer_port_name = 'fake_peer_port_name'
side_effects = [brige_name, port_name, peer_port_name]
self.mock_patch_object(self.pluginlib,
'exists')
self.pluginlib.exists.side_effect = side_effects
expected_cmd_args = ['ovs-vsctl', '--', '--if-exists', 'del-port',
port_name, '--', 'add-port', brige_name,
'fake_port_name', '--', 'set', 'interface',
'fake_port_name', 'type=patch',
'options:peer=%s' % peer_port_name]
expected_pluginlib_arg_list = [call('fake_args', 'bridge_name'),
call('fake_args', 'port_name'),
call('fake_args', 'peer_port_name')]
self.host._ovs_add_patch_port('fake_args')
self.host._run_command.assert_called_with(expected_cmd_args)
self.assertEqual(self.pluginlib.exists.call_args_list,
expected_pluginlib_arg_list)
def test_ovs_del_port(self):
bridge_name = 'fake_brige_name'
port_name = 'fake_port_name'
side_effects = [bridge_name, port_name]
self.mock_patch_object(self.pluginlib,
'exists')
self.pluginlib.exists.side_effect = side_effects
expected_cmd_args = ['ovs-vsctl', '--', '--if-exists', 'del-port',
bridge_name, port_name]
expected_pluginlib_arg_list = [call('fake_args', 'bridge_name'),
call('fake_args', 'port_name')]
self.host._ovs_del_port('fake_args')
self.host._run_command.assert_called_with(expected_cmd_args)
self.assertEqual(self.pluginlib.exists.call_args_list,
expected_pluginlib_arg_list)