python类call()的实例源码

test_set_config.py 文件源码 项目:kolla-kubernetes-personal 作者: rthallisey 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_load_ok(self):
        """Load a minimal JSON config and verify the resulting file I/O.

        ``set_configs.open`` is swapped for a ``mock_open`` that serves a
        config holding a command and no config files. The mock must then
        record one read of ``config.json`` followed by one write of the
        command to ``/run_command``.
        """
        raw_config = json.dumps({'command': '/bin/true',
                                 'config_files': {}})
        fake_open = mock.mock_open(read_data=raw_config)

        expected_calls = [
            # Reading the configuration file.
            mock.call('/var/lib/kolla/config_files/config.json'),
            mock.call().__enter__(),
            mock.call().read(),
            mock.call().__exit__(None, None, None),
            # Writing the command file.
            mock.call('/run_command', 'w+'),
            mock.call().__enter__(),
            mock.call().write(u'/bin/true'),
            mock.call().__exit__(None, None, None),
        ]

        with mock.patch.object(set_configs, 'open', fake_open):
            loaded = set_configs.load_config()
            set_configs.copy_config(loaded)
            self.assertEqual(expected_calls, fake_open.mock_calls)
test_linux_net_utils.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_create_ovs_vif_port(self):
        """Check the ``ovs-vsctl`` command issued when creating a VIF port.

        ``utils.execute`` is patched; it must be called as root with a
        command that removes any existing port, re-adds it to the bridge and
        sets the interface's external ids.
        """
        ovs_args = (
            'ovs-vsctl', '--', '--if-exists',
            'del-port', 'fake-dev', '--', 'add-port',
            'fake-bridge', 'fake-dev',
            '--', 'set', 'Interface', 'fake-dev',
            'external-ids:iface-id=fake-iface-id',
            'external-ids:iface-status=active',
            'external-ids:attached-mac=fake-mac',
            'external-ids:vm-uuid=fake-instance-uuid',
        )
        expected = [mock.call(*ovs_args, run_as_root=True)]

        with mock.patch.object(utils, 'execute',
                               return_value=('', '')) as m_execute:
            linux_net.create_ovs_vif_port(
                'fake-bridge', 'fake-dev', 'fake-iface-id', 'fake-mac',
                'fake-instance-uuid')
            m_execute.assert_has_calls(expected)
test_lbaasv2.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_request(self):
        """``_ensure_provisioned`` retries ``_ensure`` after an invalid state.

        The first ``_ensure`` call raises ``StateInvalidClient`` and the
        second returns the expected result, so both timer ticks are consumed
        and the second result is returned.
        """
        driver_cls = d_lbaasv2.LBaaSv2Driver
        m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
        loadbalancer = mock.sentinel.loadbalancer
        obj = mock.sentinel.obj
        create = mock.sentinel.create
        find = mock.sentinel.find
        expected_result = mock.sentinel.expected_result
        ticks = [mock.sentinel.t0, mock.sentinel.t1]
        m_driver._provisioning_timer.return_value = ticks
        m_driver._ensure.side_effect = [n_exc.StateInvalidClient,
                                        expected_result]

        result = driver_cls._ensure_provisioned(
            m_driver, loadbalancer, obj, create, find)

        expected_waits = [mock.call(loadbalancer, tick) for tick in ticks]
        # One _ensure attempt is expected per timer tick.
        expected_ensures = [mock.call(obj, create, find)] * len(ticks)
        m_driver._wait_for_provisioning.assert_has_calls(expected_waits)
        m_driver._ensure.assert_has_calls(expected_ensures)
        self.assertEqual(expected_result, result)
test_lbaasv2.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_ensure_not_ready(self):
        """``_ensure_provisioned`` raises when ``_ensure`` never yields a result.

        ``_ensure`` always returns ``None``; after both timer ticks have been
        used, ``ResourceNotReady`` must be raised.
        """
        driver_cls = d_lbaasv2.LBaaSv2Driver
        m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
        loadbalancer = mock.sentinel.loadbalancer
        obj = mock.sentinel.obj
        create = mock.sentinel.create
        find = mock.sentinel.find
        ticks = [mock.sentinel.t0, mock.sentinel.t1]
        m_driver._provisioning_timer.return_value = ticks
        m_driver._ensure.return_value = None

        with self.assertRaises(k_exc.ResourceNotReady):
            driver_cls._ensure_provisioned(
                m_driver, loadbalancer, obj, create, find)

        expected_waits = [mock.call(loadbalancer, tick) for tick in ticks]
        # One _ensure attempt is expected per timer tick.
        expected_ensures = [mock.call(obj, create, find)] * len(ticks)
        m_driver._wait_for_provisioning.assert_has_calls(expected_waits)
        m_driver._ensure.assert_has_calls(expected_ensures)
test_dispatch.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_register(self, m_init, m_wrap_consumer):
        """Registering a consumer wires every consumed key to the wrapped handler.

        The consumer is wrapped exactly once, and the dispatcher receives one
        ``register`` call per ``(key_fn, key)`` pair, in any order.
        """
        handler = mock.sentinel.handler
        m_init.return_value = None
        m_wrap_consumer.return_value = handler

        consumes = {mock.sentinel.key_fn1: mock.sentinel.key1,
                    mock.sentinel.key_fn2: mock.sentinel.key2,
                    mock.sentinel.key_fn3: mock.sentinel.key3}
        m_consumer = mock.Mock()
        m_consumer.consumes = consumes

        m_dispatcher = mock.Mock()
        pipeline = _TestEventPipeline()
        pipeline._dispatcher = m_dispatcher

        pipeline.register(m_consumer)

        m_wrap_consumer.assert_called_once_with(m_consumer)
        expected_registrations = [
            mock.call(key_fn, key, handler)
            for key_fn, key in consumes.items()
        ]
        m_dispatcher.register.assert_has_calls(expected_registrations,
                                               any_order=True)
test_retry.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_call_retry(self, m_sleep, m_count):
        """The handler is retried until it succeeds, sleeping after each failure.

        The handler fails ``attempts - 1`` times and then succeeds. Each
        failure must trigger one sleep call carrying the deadline, the
        1-based attempt number and the exception raised.
        """
        attempts = 3
        timeout = 10
        deadline = self.now + timeout
        event = mock.sentinel.event
        failures = [_EX1()] * (attempts - 1)

        m_handler = mock.Mock()
        m_handler.side_effect = failures + [None]
        m_sleep.return_value = 1
        m_count.return_value = list(range(1, 5))
        retry = h_retry.Retry(m_handler, timeout=timeout, exceptions=_EX1)

        retry(event)

        m_handler.assert_has_calls([mock.call(event)] * attempts)
        expected_sleeps = [
            mock.call(deadline, attempt, failure)
            for attempt, failure in enumerate(failures, start=1)
        ]
        m_sleep.assert_has_calls(expected_sleeps)
test_power.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_send_magic_packets(self, mock_socket):
        """Magic packets are broadcast through one socket that is then closed.

        The socket factory is mocked. The resulting socket must be put in
        broadcast mode, used for two ``sendto`` calls to the target address,
        and closed; the factory itself is called exactly once.
        """
        target = ('255.255.255.255', 9)
        fake_socket = mock.Mock(spec=socket, spec_set=True)
        mock_socket.return_value = fake_socket()
        obj_utils.create_test_port(self.context,
                                   uuid=uuidutils.generate_uuid(),
                                   address='aa:bb:cc:dd:ee:ff',
                                   node_id=self.node.id)

        with task_manager.acquire(
                self.context, self.node.uuid, shared=True) as task:
            wol_power._send_magic_packets(task, target[0], target[1])

            send_packet = mock.call().sendto(mock.ANY, target)
            expected_calls = [
                mock.call(),
                mock.call().setsockopt(socket.SOL_SOCKET,
                                       socket.SO_BROADCAST, 1),
                send_packet,
                send_packet,
                mock.call().close(),
            ]

            fake_socket.assert_has_calls(expected_calls)
            self.assertEqual(1, mock_socket.call_count)
test_swift_context.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_get_swift_hash_env(self, mock_config, mock_service_name):
        """The swift hash comes from the environment and is written to disk.

        With no configured value (``mock_config`` returns ``None``),
        ``get_swift_hash`` must query ``JUJU_MODEL_UUID`` and then
        ``JUJU_ENV_UUID`` via ``os.environ.get``, and the hash it returns
        must equal the contents of ``SWIFT_HASH_FILE``.
        """
        import os
        import shutil

        mock_config.return_value = None
        mock_service_name.return_value = "testsvc"

        # Use a private temporary directory instead of tempfile.mktemp():
        # the hash file path still does not exist before the call (as with
        # mktemp), but no other process can claim it, and the directory and
        # the file written into it are removed after the test.
        tmpdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tmpdir, ignore_errors=True)
        tmpfile = os.path.join(tmpdir, 'swift-hash')

        # Restore the module-level path so the override does not leak into
        # other tests.
        self.addCleanup(setattr, swift_context, 'SWIFT_HASH_FILE',
                        swift_context.SWIFT_HASH_FILE)
        swift_context.SWIFT_HASH_FILE = tmpfile

        with mock.patch('lib.swift_context.os.environ.get') as mock_env_get:
            mock_env_get.return_value = str(uuid.uuid4())
            hash_ = swift_context.get_swift_hash()
            mock_env_get.assert_has_calls([
                mock.call('JUJU_MODEL_UUID'),
                mock.call('JUJU_ENV_UUID',
                          mock_env_get.return_value)
            ])

        with open(tmpfile, 'r') as fd:
            self.assertEqual(hash_, fd.read())

        self.assertTrue(mock_config.called)
test_actions.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_failure(self):
        """A failing subprocess makes ``add_user`` report via ``action_fail``.

        ``check_call`` raises ``CalledProcessError``; the action must still
        read the admin key and the three action parameters, set no result,
        and fail exactly once with the formatted error message.
        """
        self.config.return_value = "swauth"
        self.action_get.return_value = "test"
        self.determine_api_port.return_value = 8070
        self.CalledProcessError = ValueError
        self.check_call.side_effect = subprocess.CalledProcessError(
            0, "hi", "no")

        actions.add_user.add_user()

        self.leader_get.assert_called_with("swauth-admin-key")
        expected_gets = [
            call(param) for param in ("account", "username", "password")
        ]
        self.action_get.assert_has_calls(expected_gets)
        self.action_set.assert_not_called()

        self.action_fail.assert_called_once_with(
            'Adding user test failed with: "Command \'hi\' returned non-zero '
            'exit status 0"')
copy_table_to_blackhole_table_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_final_action(
        self,
        refresh_batch,
        temp_name,
        write_session,
        mock_execute,
        database_name
    ):
        """``final_action`` selects the database and drops the temp table."""
        refresh_batch.final_action()

        use_db_query = 'USE {0}'.format(database_name)
        drop_table_query = 'DROP TABLE IF EXISTS {0}'.format(temp_name)
        mock_execute.assert_has_calls([
            mock.call(write_session, use_db_query),
            mock.call(write_session, drop_table_query),
        ])
copy_table_to_blackhole_table_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_create_table_from_src_table(
        self,
        refresh_batch,
        fake_original_table,
        fake_new_table,
        show_table_query,
        write_session
    ):
        """Creating from the source table runs both expected queries.

        ``_execute_query`` is patched so that ``fetchone`` yields the
        database name and the original table definition. The show-table
        query and the new-table statement must both be executed, in any
        order.
        """
        fetched_row = ['test_db', fake_original_table]
        patcher = mock.patch.object(
            refresh_batch,
            '_execute_query',
            autospec=True
        )
        with patcher as mock_execute:
            mock_execute.return_value.fetchone.return_value = fetched_row
            refresh_batch.create_table_from_src_table(write_session)
            expected_queries = [
                mock.call(write_session, query)
                for query in (show_table_query, fake_new_table)
            ]
            mock_execute.assert_has_calls(expected_queries, any_order=True)
bootstrapper_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_bootstrap_files_calls_register_file_for_each_file(
            self,
            containers
    ):
        """``bootstrap_files`` registers each file and bootstraps each result.

        ``register_file`` is stubbed to return ``None``, so
        ``bootstrap_schema_result`` is expected to receive ``None`` once per
        file.
        """
        file_paths = {'a.test', 'b.test'}
        bootstrapper = FileBootstrapperBase(
            schema_ref=None,
            file_paths=file_paths,
            override_metadata=True,
            file_extension='test'
        )
        bootstrapper.register_file = mock.Mock(return_value=None)
        bootstrapper.bootstrap_schema_result = mock.Mock()

        bootstrapper.bootstrap_files()

        # NOTE(review): file_paths is a set, so this ordered comparison
        # presumably relies on the bootstrapper iterating the same set in the
        # same order -- confirm against FileBootstrapperBase.
        expected_registrations = [mock.call(path) for path in file_paths]
        expected_results = [mock.call(None)] * len(file_paths)
        assert bootstrapper.register_file.mock_calls == expected_registrations
        assert bootstrapper.bootstrap_schema_result.mock_calls == expected_results
redshift_sql_to_avsc_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_run_converts_existing_if_overwrite_true(
            self,
            sql_file_path,
            avsc_file_path,
            globs,
            mock_get_file_paths_from_glob_patterns,
            mock_os_path_exists,
            mock_batch
    ):
        """With ``overwrite`` enabled, ``run`` converts the SQL file.

        The batch must resolve the glob patterns once, check once whether the
        avsc file exists, and convert the SQL file to that avsc path once.
        """
        mock_batch.options.overwrite = True

        mock_batch.run()

        expected_glob_calls = [mock.call(glob_patterns=globs)]
        expected_exists_calls = [mock.call(avsc_file_path)]
        expected_convert_calls = [
            mock.call(
                avsc_file_path=avsc_file_path,
                sql_file_path=sql_file_path
            )
        ]
        assert (mock_get_file_paths_from_glob_patterns.mock_calls ==
                expected_glob_calls)
        assert mock_os_path_exists.mock_calls == expected_exists_calls
        assert (mock_batch.convert_sql_to_avsc.mock_calls ==
                expected_convert_calls)
test_rtm.py 文件源码 项目:nameko-slack 作者: iky 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_client_manager_setup_with_multiple_bot_tokens(mocked_slack_client):
    """``setup`` creates one Slack client per bot configured under ``BOTS``.

    Each bot name must map to the mocked client instance, and the client
    class must have been called with each bot's token.
    """
    bots = {
        'spam': 'abc-123',
        'ham': 'def-456',
    }
    config = {'SLACK': {'BOTS': bots}}

    client_manager = rtm.SlackRTMClientManager()
    client_manager.container = Mock(config=config)

    client_manager.setup()

    bot_names = ('spam', 'ham')
    for bot_name in bot_names:
        assert bot_name in client_manager.clients
    for bot_name in bot_names:
        assert (client_manager.clients[bot_name] ==
                mocked_slack_client.return_value)

    for token in ('abc-123', 'def-456'):
        assert call(token) in mocked_slack_client.call_args_list
test_rtm.py 文件源码 项目:nameko-slack 作者: iky 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_handle_event_by_type(self, events, service_runner, tracker):
        """A handler bound to ``presence_change`` receives only those events."""

        class Service:

            name = 'sample'

            @rtm.handle_event('presence_change')
            def handle_event(self, event):
                tracker.handle_event(event)

        service_runner(Service, events)

        # The handler should have been called once per matching event, in
        # the order the events were supplied.
        expected_calls = []
        for event in events:
            if event.get('type') == 'presence_change':
                expected_calls.append(call(event))

        assert tracker.handle_event.call_args_list == expected_calls
test_rtm.py 文件源码 项目:nameko-slack 作者: iky 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_handle_any_message(self, events, service_runner, tracker):
        """A message handler receives each ``message`` event with its text."""

        class Service:

            name = 'sample'

            @rtm.handle_message
            def handle_event(self, event, message):
                tracker.handle_event(event, message)

        service_runner(Service, events)

        # One call per message event, pairing the event with its 'text'.
        expected_calls = []
        for event in events:
            if event.get('type') == 'message':
                expected_calls.append(call(event, event.get('text')))

        assert tracker.handle_event.call_args_list == expected_calls
test_rtm.py 文件源码 项目:nameko-slack 作者: iky 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_replies_on_handle_message(events, service_runner):
        """A value returned by a message handler is sent back as a reply."""

        class Service:

            name = 'sample'

            @rtm.handle_message
            def handle_message(self, event, message):
                return 'sure, {}'.format(message)

        reply_calls = service_runner(Service, events)

        expected_replies = (
            'sure, spam ham',
            'sure, ham spam',
            'sure, spam egg',
        )
        assert reply_calls == [call('D11', reply) for reply in expected_replies]
test_cli.py 文件源码 项目:skipper 作者: Stratoscale 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_make_without_build_container_tag_with_context(self, skipper_runner_run_mock):
        """``make`` builds the container from the configured context first.

        The last two global parameters are dropped and the CLI is invoked
        with the loaded defaults. The runner must build the image using
        ``SKIPPER_CONF_CONTAINER_CONTEXT`` and then run ``make`` inside it.
        """
        global_params = self.global_params[:-2]
        makefile = 'Makefile'
        target = 'all'
        make_params = ['-f', makefile, target]

        self._invoke_cli(
            defaults=config.load_defaults(),
            global_params=global_params,
            subcmd='make',
            subcmd_params=make_params
        )

        build_command = ['docker', 'build', '-t', 'build-container-image',
                         '-f', 'Dockerfile.build-container-image',
                         SKIPPER_CONF_CONTAINER_CONTEXT]
        make_command = ['make'] + make_params
        expected_commands = [
            mock.call(build_command),
            mock.call(make_command, fqdn_image='build-container-image',
                      environment=[], interactive=False, name=None,
                      net='host', volumes=None, workdir=None,
                      use_cache=False),
        ]
        skipper_runner_run_mock.assert_has_calls(expected_commands)
test_cli.py 文件源码 项目:skipper 作者: Stratoscale 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_push(self, skipper_runner_run_mock, requests_get_mock):
        """Pushing an image that is not yet in the registry tags, pushes, cleans up.

        The registry response lists tags that do not include ``1234567``, so
        the runner is expected to run ``docker tag``, ``docker push`` and
        ``docker rmi`` in that order.
        """
        # Three runner invocations are asserted below, so supply three return
        # codes. With only two, the third call raises StopIteration from the
        # exhausted side_effect iterator instead of returning a result.
        skipper_runner_run_mock.side_effect = [0, 0, 0]
        push_params = ['my_image']
        with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
            # The patch is only used to obtain an autospec'd Response
            # instance to return from the mocked requests.get.
            requests_response_mock = requests_response_class_mock.return_value
            requests_response_mock.json.return_value = {
                'name': 'my_image',
                'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
            }
            requests_get_mock.return_value = requests_response_mock

        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=push_params
        )
        expected_commands = [
            mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
            mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
            mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
        ]
        skipper_runner_run_mock.assert_has_calls(expected_commands)
test_cli.py 文件源码 项目:skipper 作者: Stratoscale 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def test_push_already_in_registry(self, skipper_runner_run_mock, requests_get_mock):
        """An image whose tag is already in the registry is not pushed again.

        The registry response lists tag ``1234567``, so only ``docker tag``
        and ``docker rmi`` are expected from the runner.
        """
        skipper_runner_run_mock.side_effect = [0, 0]
        push_params = ['my_image']
        registry_listing = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb', "1234567"]
        }
        with mock.patch('requests.Response', autospec=True) as response_cls_mock:
            # The patch is only used to obtain an autospec'd Response
            # instance to return from the mocked requests.get.
            response_mock = response_cls_mock.return_value
            response_mock.json.return_value = registry_listing
            requests_get_mock.return_value = response_mock

        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=push_params
        )

        local_image = 'my_image:1234567'
        registry_image = 'registry.io:5000/my_image:1234567'
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', local_image, registry_image]),
            mock.call(['docker', 'rmi', registry_image]),
        ])
test_cli.py 文件源码 项目:skipper 作者: Stratoscale 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_push_fail(self, skipper_runner_run_mock, requests_get_mock):
        """A failing ``docker push`` makes the CLI exit with code 1.

        The runner returns 0 for the tag step and 1 for the push step; only
        those two commands are asserted.
        """
        skipper_runner_run_mock.side_effect = [0, 1]
        push_params = ['my_image']
        registry_listing = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        with mock.patch('requests.Response', autospec=True) as response_cls_mock:
            # The patch is only used to obtain an autospec'd Response
            # instance to return from the mocked requests.get.
            response_mock = response_cls_mock.return_value
            response_mock.json.return_value = registry_listing
            requests_get_mock.return_value = response_mock

        result = self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=push_params
        )

        self.assertEqual(result.exit_code, 1)
        local_image = 'my_image:1234567'
        registry_image = 'registry.io:5000/my_image:1234567'
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', local_image, registry_image]),
            mock.call(['docker', 'push', registry_image]),
        ])
test_cli.py 文件源码 项目:skipper 作者: Stratoscale 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def test_push_rmi_fail(self, skipper_runner_run_mock, requests_get_mock):
        """A failing ``docker rmi`` after a successful push still exits with 0.

        The runner returns 0 for tag and push and 1 for the final ``rmi``;
        all three commands are asserted and the exit code must be 0.
        """
        skipper_runner_run_mock.side_effect = [0, 0, 1]
        push_params = ['my_image']
        registry_listing = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        with mock.patch('requests.Response', autospec=True) as response_cls_mock:
            # The patch is only used to obtain an autospec'd Response
            # instance to return from the mocked requests.get.
            response_mock = response_cls_mock.return_value
            response_mock.json.return_value = registry_listing
            requests_get_mock.return_value = response_mock

        result = self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=push_params
        )

        self.assertEqual(result.exit_code, 0)
        local_image = 'my_image:1234567'
        registry_image = 'registry.io:5000/my_image:1234567'
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', local_image, registry_image]),
            mock.call(['docker', 'push', registry_image]),
            mock.call(['docker', 'rmi', registry_image]),
        ])
test_cli.py 文件源码 项目:skipper 作者: Stratoscale 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_push_to_namespace(self, skipper_runner_run_mock, requests_get_mock):
        """``--namespace`` is inserted into the registry image name.

        The runner is expected to tag, push and remove
        ``registry.io:5000/my_namespace/my_image:1234567``.
        """
        # Three runner invocations are asserted below, so supply three return
        # codes. With only two, the third call raises StopIteration from the
        # exhausted side_effect iterator instead of returning a result.
        skipper_runner_run_mock.side_effect = [0, 0, 0]
        push_params = ['--namespace', 'my_namespace', 'my_image']
        with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
            # The patch is only used to obtain an autospec'd Response
            # instance to return from the mocked requests.get.
            requests_response_mock = requests_response_class_mock.return_value
            requests_response_mock.json.return_value = {
                'name': 'my_image',
                'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
            }
            requests_get_mock.return_value = requests_response_mock

        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=push_params
        )
        expected_commands = [
            mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_namespace/my_image:1234567']),
            mock.call(['docker', 'push', 'registry.io:5000/my_namespace/my_image:1234567']),
            mock.call(['docker', 'rmi', 'registry.io:5000/my_namespace/my_image:1234567']),
        ]
        skipper_runner_run_mock.assert_has_calls(expected_commands)
test_cli.py 文件源码 项目:skipper 作者: Stratoscale 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_push_with_defaults_from_config_file(self, skipper_runner_run_mock, requests_get_mock):
        """``push`` works when settings come from the loaded defaults.

        No global parameters are passed; the CLI is invoked with
        ``config.load_defaults()`` and must still tag, push and remove the
        registry image.
        """
        # Three runner invocations are asserted below, so supply three return
        # codes. With only two, the third call raises StopIteration from the
        # exhausted side_effect iterator instead of returning a result.
        skipper_runner_run_mock.side_effect = [0, 0, 0]
        push_params = ['my_image']
        with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
            # The patch is only used to obtain an autospec'd Response
            # instance to return from the mocked requests.get.
            requests_response_mock = requests_response_class_mock.return_value
            requests_response_mock.json.return_value = {
                'name': 'my_image',
                'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
            }
            requests_get_mock.return_value = requests_response_mock

        self._invoke_cli(
            defaults=config.load_defaults(),
            subcmd='push',
            subcmd_params=push_params
        )
        expected_commands = [
            mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
            mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
            mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
        ]
        skipper_runner_run_mock.assert_has_calls(expected_commands)
test_cli.py 文件源码 项目:skipper 作者: Stratoscale 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_make_without_build_container_tag(self, skipper_runner_run_mock):
        """``make`` builds the container image from ``.`` before running make.

        The last two global parameters are dropped. The runner must build
        the image from the current directory and then run ``make`` inside
        it.
        """
        global_params = self.global_params[:-2]
        makefile = 'Makefile'
        target = 'all'
        make_params = ['-f', makefile, target]

        self._invoke_cli(
            global_params=global_params,
            subcmd='make',
            subcmd_params=make_params
        )

        build_command = ['docker', 'build', '-t', 'build-container-image',
                         '-f', 'Dockerfile.build-container-image', '.']
        make_command = ['make'] + make_params
        expected_commands = [
            mock.call(build_command),
            mock.call(make_command, fqdn_image='build-container-image',
                      environment=[], interactive=False, name=None,
                      net='host', volumes=None, workdir=None,
                      use_cache=False),
        ]
        skipper_runner_run_mock.assert_has_calls(expected_commands)
test_xenhost.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def test_resume_compute_exception_wait_slave_available(self, mock_sleep):
        """``_resume_compute`` retries ``xe vm-start`` while the slave is unavailable.

        ``VM.start`` raises the XenAPI failure, the first ``xe vm-start``
        raises ``PluginError("Wait for the slave to become available")`` and
        the second returns ``None``. The command must therefore run twice,
        with exactly one sleep in between.
        """
        self.mock_patch_object(self.session.xenapi.VM,
                               'start')
        self.session.xenapi.VM.start.side_effect = FakeXenAPIException
        self.host._run_command.side_effect = [
            self.pluginlib.PluginError(
                "Wait for the slave to become available"),
            None,
        ]
        self.host.XenAPI.Failure = FakeXenAPIException
        vm_start_cmd = ["xe", "vm-start", "uuid=%s" % 'fake_compute_uuid']
        expected = [call(vm_start_cmd), call(vm_start_cmd)]

        self.host._resume_compute(self.session,
                                  'fake_compute_ref',
                                  'fake_compute_uuid')

        self.session.xenapi.VM.start.assert_called_with(
            'fake_compute_ref', False, True)
        self.assertEqual(expected, self.host._run_command.call_args_list)
        # Compare call_count directly: on mock releases that predate
        # assert_called_once(), that name is auto-created as a child mock
        # and the "assertion" silently passes.
        self.assertEqual(1, mock_sleep.call_count)
test_xenhost.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_power_action_input_cmd_result_not_empty(self):
        """``_power_action`` raises when the power command returns output.

        The disable and shutdown commands return ``None`` but the final
        power command returns a non-empty value, which must raise
        ``PluginError`` after all three commands have been issued.
        """
        host_uuid = 'fake_host_uuid'
        temp_arg_dict = {'host_uuid': 'fake_host_uuid'}
        self.host._run_command.side_effect = [None, None, 'not_empty']
        cmds = {"reboot": "host-reboot",
                "startup": "host-power-on",
                "shutdown": "host-shutdown"}
        fake_action = 'reboot'  # 'startup' and 'shutdown' should be same
        expected_cmd_arg_list = [
            call(["xe", "host-disable", "uuid=%s" % host_uuid]),
            call(["xe", "vm-shutdown", "--multiple",
                  "resident-on=%s" % host_uuid]),
            call(["xe", cmds[fake_action], "uuid=%s" % host_uuid]),
        ]

        with self.assertRaises(self.pluginlib.PluginError):
            self.host._power_action(fake_action, temp_arg_dict)

        self.assertEqual(self.host._run_command.call_args_list,
                         expected_cmd_arg_list)
test_xenhost.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_power_action(self):
        """``_power_action`` disables the host, shuts VMs down, then acts.

        Every command returns ``None``; the three ``xe`` commands must be
        issued in order and the result must name the action performed.
        """
        host_uuid = 'fake_host_uuid'
        temp_arg_dict = {'host_uuid': 'fake_host_uuid'}
        self.host._run_command.return_value = None
        cmds = {"reboot": "host-reboot",
                "startup": "host-power-on",
                "shutdown": "host-shutdown"}
        fake_action = 'reboot'  # 'startup' and 'shutdown' should be same
        expected_cmd_arg_list = [
            call(["xe", "host-disable", "uuid=%s" % host_uuid]),
            call(["xe", "vm-shutdown", "--multiple",
                  "resident-on=%s" % host_uuid]),
            call(["xe", cmds[fake_action], "uuid=%s" % host_uuid]),
        ]
        expected_result = {"power_action": fake_action}

        action_result = self.host._power_action(fake_action, temp_arg_dict)

        self.assertEqual(self.host._run_command.call_args_list,
                         expected_cmd_arg_list)
        self.assertEqual(action_result, expected_result)
test_xenhost.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_ovs_add_patch_port(self):
        """``_ovs_add_patch_port`` builds the patch-port ``ovs-vsctl`` command.

        ``pluginlib.exists`` is mocked to return the bridge, port and peer
        port names in turn. The command must delete any existing port, add
        it to the bridge and configure it as a patch port to the peer.
        """
        bridge_name = 'fake_brige_name'
        port_name = 'fake_port_name'
        peer_port_name = 'fake_peer_port_name'
        self.mock_patch_object(self.pluginlib,
                               'exists')
        self.pluginlib.exists.side_effect = [bridge_name, port_name,
                                             peer_port_name]
        expected_cmd_args = ['ovs-vsctl', '--', '--if-exists', 'del-port',
                             port_name, '--', 'add-port', bridge_name,
                             'fake_port_name', '--', 'set', 'interface',
                             'fake_port_name', 'type=patch',
                             'options:peer=%s' % peer_port_name]
        expected_pluginlib_arg_list = [
            call('fake_args', arg_name)
            for arg_name in ('bridge_name', 'port_name', 'peer_port_name')
        ]

        self.host._ovs_add_patch_port('fake_args')

        self.host._run_command.assert_called_with(expected_cmd_args)
        self.assertEqual(self.pluginlib.exists.call_args_list,
                         expected_pluginlib_arg_list)
test_xenhost.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_ovs_del_port(self):
        """``_ovs_del_port`` builds the ``ovs-vsctl del-port`` command.

        ``pluginlib.exists`` is mocked to return the bridge name and then the
        port name; both must appear in the command that is run.
        """
        bridge_name = 'fake_brige_name'
        port_name = 'fake_port_name'
        self.mock_patch_object(self.pluginlib,
                               'exists')
        self.pluginlib.exists.side_effect = [bridge_name, port_name]
        expected_cmd_args = ['ovs-vsctl', '--', '--if-exists', 'del-port',
                             bridge_name, port_name]
        expected_pluginlib_arg_list = [
            call('fake_args', arg_name)
            for arg_name in ('bridge_name', 'port_name')
        ]

        self.host._ovs_del_port('fake_args')

        self.host._run_command.assert_called_with(expected_cmd_args)
        self.assertEqual(self.pluginlib.exists.call_args_list,
                         expected_pluginlib_arg_list)


问题


面经


文章

微信
公众号

扫码关注公众号