python类ANY的实例源码

test_lbaasv2.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_ensure_loadbalancer(self):
        cls = d_lbaasv2.LBaaSv2Driver
        m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
        expected_resp = mock.sentinel.expected_resp
        namespace = 'TEST_NAMESPACE'
        name = 'TEST_NAME'
        project_id = 'TEST_PROJECT'
        subnet_id = 'D3FA400A-F543-4B91-9CD3-047AF0CE42D1'
        ip = '1.2.3.4'
        # TODO(ivc): handle security groups
        sg_ids = []
        endpoints = {'metadata': {'namespace': namespace, 'name': name}}

        m_driver._ensure.return_value = expected_resp
        resp = cls.ensure_loadbalancer(m_driver, endpoints, project_id,
                                       subnet_id, ip, sg_ids)
        m_driver._ensure.assert_called_once_with(mock.ANY,
                                                 m_driver._create_loadbalancer,
                                                 m_driver._find_loadbalancer)
        req = m_driver._ensure.call_args[0][0]
        self.assertEqual("%s/%s" % (namespace, name), req.name)
        self.assertEqual(project_id, req.project_id)
        self.assertEqual(subnet_id, req.subnet_id)
        self.assertEqual(ip, str(req.ip))
        self.assertEqual(expected_resp, resp)
test_lbaasv2.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_ensure_pool(self):
        cls = d_lbaasv2.LBaaSv2Driver
        m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
        expected_resp = mock.sentinel.expected_resp
        endpoints = mock.sentinel.endpoints
        loadbalancer = obj_lbaas.LBaaSLoadBalancer(
            id='00EE9E11-91C2-41CF-8FD4-7970579E5C4C',
            project_id='TEST_PROJECT')
        listener = obj_lbaas.LBaaSListener(
            id='A57B7771-6050-4CA8-A63C-443493EC98AB',
            name='TEST_LISTENER_NAME',
            protocol='TCP')
        m_driver._ensure_provisioned.return_value = expected_resp

        resp = cls.ensure_pool(m_driver, endpoints, loadbalancer, listener)

        m_driver._ensure_provisioned.assert_called_once_with(
            loadbalancer, mock.ANY, m_driver._create_pool,
            m_driver._find_pool)
        pool = m_driver._ensure_provisioned.call_args[0][1]
        self.assertEqual(listener.name, pool.name)
        self.assertEqual(loadbalancer.project_id, pool.project_id)
        self.assertEqual(listener.id, pool.listener_id)
        self.assertEqual(listener.protocol, pool.protocol)
        self.assertEqual(expected_resp, resp)
test_k8s_client.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_annotate(self, m_patch, m_count):
        m_count.return_value = list(range(1, 5))
        path = '/test'
        annotations = {'a1': 'v1', 'a2': 'v2'}
        resource_version = "123"
        ret = {'metadata': {'annotations': annotations,
                            "resourceVersion": resource_version}}
        data = jsonutils.dumps(ret, sort_keys=True)

        m_resp = mock.MagicMock()
        m_resp.ok = True
        m_resp.json.return_value = ret
        m_patch.return_value = m_resp

        self.assertEqual(annotations, self.client.annotate(
            path, annotations, resource_version=resource_version))
        m_patch.assert_called_once_with(self.base_url + path,
                                        data=data, headers=mock.ANY,
                                        cert=(None, None), verify=False)
test_deploy.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_execute_clean_step_no_success_log(
            self, log_mock, run_mock, utils_mock, parse_driver_info_mock):

        run_mock.side_effect = exception.InstanceDeployFailure('Boom')
        step = {'priority': 10, 'interface': 'deploy',
                'step': 'erase_devices', 'args': {'tags': ['clean']}}
        di_info = self.node.driver_internal_info
        di_info['agent_url'] = 'http://127.0.0.1'
        self.node.driver_internal_info = di_info
        self.node.save()
        with task_manager.acquire(self.context, self.node.uuid) as task:
            self.driver.execute_clean_step(task, step)
            log_mock.error.assert_called_once_with(
                mock.ANY, {'node': task.node['uuid'],
                           'step': 'erase_devices'})
            utils_mock.assert_called_once_with(task, 'Boom')
            self.assertFalse(log_mock.info.called)
test_power.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_send_magic_packets(self, mock_socket):
        fake_socket = mock.Mock(spec=socket, spec_set=True)
        mock_socket.return_value = fake_socket()
        obj_utils.create_test_port(self.context,
                                   uuid=uuidutils.generate_uuid(),
                                   address='aa:bb:cc:dd:ee:ff',
                                   node_id=self.node.id)
        with task_manager.acquire(
                self.context, self.node.uuid, shared=True) as task:
            wol_power._send_magic_packets(task, '255.255.255.255', 9)

            expected_calls = [
                mock.call(),
                mock.call().setsockopt(socket.SOL_SOCKET,
                                       socket.SO_BROADCAST, 1),
                mock.call().sendto(mock.ANY, ('255.255.255.255', 9)),
                mock.call().sendto(mock.ANY, ('255.255.255.255', 9)),
                mock.call().close()]

            fake_socket.assert_has_calls(expected_calls)
            self.assertEqual(1, mock_socket.call_count)
test_management.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test__set_boot_device_order_fail(self, mock_aw, mock_client_pywsman):
        namespace = resource_uris.CIM_BootConfigSetting
        device = boot_devices.PXE
        result_xml = test_utils.build_soap_xml([{'ReturnValue': '2'}],
                                               namespace)
        mock_xml = test_utils.mock_wsman_root(result_xml)
        mock_pywsman = mock_client_pywsman.Client.return_value
        mock_pywsman.invoke.return_value = mock_xml

        self.assertRaises(exception.AMTFailure,
                          amt_mgmt._set_boot_device_order, self.node, device)
        mock_pywsman.invoke.assert_called_once_with(
            mock.ANY, namespace, 'ChangeBootOrder', mock.ANY)

        mock_pywsman = mock_client_pywsman.Client.return_value
        mock_pywsman.invoke.return_value = None

        self.assertRaises(exception.AMTConnectFailure,
                          amt_mgmt._set_boot_device_order, self.node, device)
        self.assertTrue(mock_aw.called)
test_management.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test__enable_boot_config_fail(self, mock_aw, mock_client_pywsman):
        namespace = resource_uris.CIM_BootService
        result_xml = test_utils.build_soap_xml([{'ReturnValue': '2'}],
                                               namespace)
        mock_xml = test_utils.mock_wsman_root(result_xml)
        mock_pywsman = mock_client_pywsman.Client.return_value
        mock_pywsman.invoke.return_value = mock_xml

        self.assertRaises(exception.AMTFailure,
                          amt_mgmt._enable_boot_config, self.node)
        mock_pywsman.invoke.assert_called_once_with(
            mock.ANY, namespace, 'SetBootConfigRole', mock.ANY)

        mock_pywsman = mock_client_pywsman.Client.return_value
        mock_pywsman.invoke.return_value = None

        self.assertRaises(exception.AMTConnectFailure,
                          amt_mgmt._enable_boot_config, self.node)
        self.assertTrue(mock_aw.called)
test_vendor.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test__get_nm_address_raw_fail(self, parse_mock, dump_mock, raw_mock,
                                      unlink_mock):
        parse_mock.return_value = ('0x0A', '0x0B')
        raw_mock.side_effect = exception.IPMIFailure('raw error')
        with task_manager.acquire(self.context, self.node.uuid,
                                  shared=False) as task:
            self.assertRaises(exception.IPMIFailure, nm_vendor._get_nm_address,
                              task)
            self.node.refresh()
            internal_info = self.node.driver_internal_info
            self.assertEqual(False, internal_info['intel_nm_address'])
            self.assertEqual(False, internal_info['intel_nm_channel'])
            parse_mock.assert_called_once_with(self.temp_filename)
            dump_mock.assert_called_once_with(task, self.temp_filename)
            unlink_mock.assert_called_once_with(self.temp_filename)
            raw_mock.assert_called_once_with(task, mock.ANY)
test_rpc.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_rpc(
        self, container, entrypoint_tracker, rpc_proxy, wait_for_result,
        backoff_count
    ):
        """ RPC entrypoint supports backoff
        """
        with entrypoint_waiter(
            container, 'method', callback=wait_for_result
        ) as result:
            res = rpc_proxy.service.method("arg")

        assert res == result.get() == "result"

        # entrypoint fired backoff_count + 1 times
        assert entrypoint_tracker.get_results() == (
            [None] * backoff_count + ["result"]
        )
        # entrypoint raised `Backoff` for all but the last execution
        assert entrypoint_tracker.get_exceptions() == (
            [(Backoff, ANY, ANY)] * backoff_count + [None]
        )
test_events.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_events(
        self, container, entrypoint_tracker, dispatch_event, wait_for_result,
        backoff_count
    ):
        """ Event handler supports backoff
        """
        with entrypoint_waiter(
            container, 'method', callback=wait_for_result
        ) as result:
            dispatch_event("src_service", "event_type", {})

        assert result.get() == "result"

        assert entrypoint_tracker.get_results() == (
            [None] * backoff_count + ["result"]
        )
        assert entrypoint_tracker.get_exceptions() == (
            [(Backoff, ANY, ANY)] * backoff_count + [None]
        )
test_messaging.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_messaging(
        self, container, entrypoint_tracker, publish_message, exchange, queue,
        wait_for_result, backoff_count
    ):
        """ Message consumption supports backoff
        """
        with entrypoint_waiter(
            container, 'method', callback=wait_for_result
        ) as result:
            publish_message(exchange, "msg", routing_key=queue.routing_key)

        assert result.get() == "result"

        assert entrypoint_tracker.get_results() == (
            [None] * backoff_count + ["result"]
        )
        assert entrypoint_tracker.get_exceptions() == (
            [(Backoff, ANY, ANY)] * backoff_count + [None]
        )
test_messaging.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_expiry(
        self, container, entrypoint_tracker, publish_message, exchange, queue,
        limited_backoff, wait_for_backoff_expired
    ):
        """ Message consumption supports backoff expiry
        """
        with entrypoint_waiter(
            container, 'method', callback=wait_for_backoff_expired
        ) as result:
            publish_message(exchange, "msg", routing_key=queue.routing_key)

        with pytest.raises(Backoff.Expired) as raised:
            result.get()
        assert (
            "Backoff aborted after '{}' retries".format(limited_backoff)
        ) in str(raised.value)

        assert entrypoint_tracker.get_results() == (
            [None] * limited_backoff + [None]
        )
        assert entrypoint_tracker.get_exceptions() == (
            [(Backoff, ANY, ANY)] * limited_backoff +
            [(Backoff.Expired, ANY, ANY)]
        )
test_drv_opencontrail.py 文件源码 项目:networking-opencontrail 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_create_resource(self, options, config):
        driver = self._get_driver(options, config)
        driver._request_backend = mock.MagicMock()
        status_code = requests.codes.ok
        context = mock.MagicMock()
        res_type = 'network'
        res_data = {
            'id': 'network-123',
            'tenant_id': ATTR_NOT_SPECIFIED,
        }
        driver._request_backend.return_value = (status_code, res_data)

        response = driver._create_resource(res_type,
                                           context,
                                           {res_type: res_data})

        self.assertEqual(response, res_data)
        driver._request_backend.assert_called_with(context, mock.ANY,
                                                   res_type, 'CREATE')
test_drv_opencontrail.py 文件源码 项目:networking-opencontrail 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_get_resource(self, options, config):
        driver = self._get_driver(options, config)
        driver._request_backend = mock.MagicMock()
        status_code = requests.codes.ok
        context = mock.MagicMock()
        res_type = 'network'
        res_data = {
            'id': 'network-123',
        }
        fields = ['id']
        driver._request_backend.return_value = (status_code, res_data)

        response = driver._get_resource(res_type,
                                        context, res_data['id'], fields)

        self.assertEqual(response, res_data)
        driver._request_backend.assert_called_with(context, mock.ANY,
                                                   res_type, 'READ')
test_drv_opencontrail.py 文件源码 项目:networking-opencontrail 作者: openstack 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_update_resource(self, options, config):
        driver = self._get_driver(options, config)
        driver._request_backend = mock.MagicMock()
        status_code = requests.codes.ok
        context = mock.MagicMock()
        res_type = 'network'
        res_data = {
            'id': 'network-123',
        }
        driver._request_backend.return_value = (status_code, res_data)

        response = driver._update_resource(res_type, context, res_data['id'],
                                           {res_type: res_data})

        self.assertEqual(response, res_data)
        driver._request_backend.assert_called_with(context, mock.ANY,
                                                   res_type, 'UPDATE')
test_drv_opencontrail.py 文件源码 项目:networking-opencontrail 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_list_resource(self, options, config):
        driver = self._get_driver(options, config)
        driver._request_backend = mock.MagicMock()
        status_code = requests.codes.ok
        context = mock.MagicMock()
        res_type = 'network'
        res_data = {
            'id': 'network-123',
        }
        fields = ['id']
        driver._request_backend.return_value = (status_code, [res_data])

        response = driver._list_resource(res_type, context, None, fields)

        self.assertEqual(response, [res_data])
        driver._request_backend.assert_called_with(context, mock.ANY,
                                                   res_type, 'READALL')
test_drv_opencontrail.py 文件源码 项目:networking-opencontrail 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_add_router_interface(self, options, config):
        driver = self._get_driver(options, config)
        driver._request_backend = mock.MagicMock()
        status_code = requests.codes.ok
        context = mock.MagicMock()
        res_type = 'router'
        res_data = {
            'id': 'router-123',
        }
        res_id = res_data['id']
        driver._request_backend.return_value = (status_code, res_data)

        response = driver.add_router_interface(context, res_id, res_data)

        self.assertEqual(response, res_data)
        driver._request_backend.assert_called_with(context, mock.ANY,
                                                   res_type, 'ADDINTERFACE')
test_drv_opencontrail.py 文件源码 项目:networking-opencontrail 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_remove_router_interface(self, options, config):
        driver = self._get_driver(options, config)
        driver._request_backend = mock.MagicMock()
        status_code = requests.codes.ok
        context = mock.MagicMock()
        res_type = 'router'
        res_data = {
            'id': 'router-123',
        }
        res_id = res_data['id']
        driver._request_backend.return_value = (status_code, res_data)

        response = driver.remove_router_interface(context, res_id, res_data)

        self.assertEqual(response, res_data)
        driver._request_backend.assert_called_with(context, mock.ANY,
                                                   res_type, 'DELINTERFACE')
test_connection.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_bad_protocol_version(self, *args):
        c = self.make_connection()
        c._requests = Mock()
        c.defunct = Mock()

        # read in a SupportedMessage response
        header = self.make_header_prefix(SupportedMessage, version=0x7f)
        options = self.make_options_body()
        message = self.make_msg(header, options)
        c._iobuf = BytesIO()
        c._iobuf.write(message)
        c.process_io_buffer()

        # make sure it errored correctly
        c.defunct.assert_called_once_with(ANY)
        args, kwargs = c.defunct.call_args
        self.assertIsInstance(args[0], ProtocolError)
test_connection.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_negative_body_length(self, *args):
        c = self.make_connection()
        c._requests = Mock()
        c.defunct = Mock()

        # read in a SupportedMessage response
        header = self.make_header_prefix(SupportedMessage)
        message = header + int32_pack(-13)
        c._iobuf = BytesIO()
        c._iobuf.write(message)
        c.process_io_buffer()

        # make sure it errored correctly
        c.defunct.assert_called_once_with(ANY)
        args, kwargs = c.defunct.call_args
        self.assertIsInstance(args[0], ProtocolError)
test_connection.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_unsupported_cql_version(self, *args):
        c = self.make_connection()
        c._requests = {0: (c._handle_options_response, ProtocolHandler.decode_message, [])}
        c.defunct = Mock()
        c.cql_version = "3.0.3"

        # read in a SupportedMessage response
        header = self.make_header_prefix(SupportedMessage)

        options_buf = BytesIO()
        write_stringmultimap(options_buf, {
            'CQL_VERSION': ['7.8.9'],
            'COMPRESSION': []
        })
        options = options_buf.getvalue()

        c.process_msg(_Frame(version=4, flags=0, stream=0, opcode=SupportedMessage.opcode, body_offset=9, end_pos=9 + len(options)), options)

        # make sure it errored correctly
        c.defunct.assert_called_once_with(ANY)
        args, kwargs = c.defunct.call_args
        self.assertIsInstance(args[0], ProtocolError)
test_control_connection.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_handle_topology_change(self):
        event = {
            'change_type': 'NEW_NODE',
            'address': ('1.2.3.4', 9000)
        }
        self.cluster.scheduler.reset_mock()
        self.control_connection._handle_topology_change(event)
        self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.control_connection._refresh_nodes_if_not_up, '1.2.3.4')

        event = {
            'change_type': 'REMOVED_NODE',
            'address': ('1.2.3.4', 9000)
        }
        self.cluster.scheduler.reset_mock()
        self.control_connection._handle_topology_change(event)
        self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.cluster.remove_host, None)

        event = {
            'change_type': 'MOVED_NODE',
            'address': ('1.2.3.4', 9000)
        }
        self.cluster.scheduler.reset_mock()
        self.control_connection._handle_topology_change(event)
        self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.control_connection._refresh_nodes_if_not_up, '1.2.3.4')
test_control_connection.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_handle_schema_change(self):

        change_types = [getattr(SchemaChangeType, attr) for attr in vars(SchemaChangeType) if attr[0] != '_']
        for change_type in change_types:
            event = {
                'target_type': SchemaTargetType.TABLE,
                'change_type': change_type,
                'keyspace': 'ks1',
                'table': 'table1'
            }
            self.cluster.scheduler.reset_mock()
            self.control_connection._handle_schema_change(event)
            self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.control_connection.refresh_schema, **event)

            self.cluster.scheduler.reset_mock()
            event['target_type'] = SchemaTargetType.KEYSPACE
            del event['table']
            self.control_connection._handle_schema_change(event)
            self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.control_connection.refresh_schema, **event)
test_response_future.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_result_message(self):
        session = self.make_basic_session()
        session.cluster._default_load_balancing_policy.make_query_plan.return_value = ['ip1', 'ip2']
        pool = session._pools.get.return_value
        pool.is_shutdown = False

        connection = Mock(spec=Connection)
        pool.borrow_connection.return_value = (connection, 1)

        rf = self.make_response_future(session)
        rf.send_request()

        rf.session._pools.get.assert_called_once_with('ip1')
        pool.borrow_connection.assert_called_once_with(timeout=ANY)

        connection.send_msg.assert_called_once_with(rf.message, 1, cb=ANY, encoder=ProtocolHandler.encode_message, decoder=ProtocolHandler.decode_message, result_metadata=[])

        rf._set_result(None, None, None, self.make_mock_response([{'col': 'val'}]))
        result = rf.result()
        self.assertEqual(result, [{'col': 'val'}])
test_api.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_update_host(self, mock_host, mock_get_device_type):
        mock_get_device_type.return_value = "network_devices"
        record = dict(fake_resources.HOST1.items())
        payload = {'name': 'Host_New', 'parent_id': 2}
        db_data = payload.copy()
        record.update(payload)
        mock_host.return_value = record

        resp = self.put('/v1/hosts/1', data=payload)

        self.assertEqual(resp.json['name'], db_data['name'])
        self.assertEqual(resp.json['parent_id'], db_data['parent_id'])
        self.assertEqual(200, resp.status_code)
        mock_host.assert_called_once_with(mock.ANY, '1', db_data)
        mock_get_device_type.assert_called_once()
        up_link = {
            "rel": "up",
            "href": "http://localhost/v1/network-devices/2"
        }
        self.assertIn(up_link, resp.json["links"])
test_api.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_get_host_by_ip_address_filter(self, fake_hosts):
        region_id = 1
        ip_address = '10.10.0.1'
        filters = {
            'region_id': 1, 'ip_address': ip_address,
            'resolved-values': True,
        }
        path_query = '/v1/hosts?region_id={}&ip_address={}'.format(
            region_id, ip_address
        )
        fake_hosts.return_value = (fake_resources.HOSTS_LIST_R2, {})
        resp = self.get(path_query)
        host_resp = fake_resources.HOSTS_LIST_R2
        self.assertEqual(len(resp.json['hosts']), 1)
        self.assertEqual(resp.json['hosts'][0]["name"], host_resp[0].name)

        fake_hosts.assert_called_once_with(
            mock.ANY, filters, {'limit': 30, 'marker': None},
        )
test_api.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_projects_put_variables(self, mock_project):
        proj1 = fake_resources.PROJECT1
        proj1_id = str(proj1.id)
        db_return_value = copy.deepcopy(proj1)
        db_return_value.variables["a"] = "b"
        mock_project.return_value = db_return_value
        payload = {"a": "b"}
        db_data = payload.copy()
        resp = self.put(
            'v1/projects/{}/variables'.format(proj1_id),
            data=payload
        )
        self.assertEqual(resp.status_code, 200)
        mock_project.assert_called_once_with(mock.ANY, "projects", proj1_id,
                                             db_data)
        expected = {
            "variables": {"key1": "value1", "key2": "value2", "a": "b"},
        }
        self.assertDictEqual(expected, resp.json)
test_api.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_put_network_device(self, fake_device, mock_get_device_type):
        mock_get_device_type.return_value = "network_devices"
        payload = {"name": "NetDev_New1", "parent_id": 2}
        fake_device.return_value = dict(fake_resources.NETWORK_DEVICE1.items(),
                                        **payload)
        resp = self.put('v1/network-devices/1', data=payload)
        self.assertEqual(resp.status_code, 200)
        self.assertEqual(resp.json['name'], "NetDev_New1")
        self.assertEqual(resp.json['parent_id'], 2)
        fake_device.assert_called_once_with(
            mock.ANY, '1', {"name": "NetDev_New1", "parent_id": 2}
        )
        mock_get_device_type.assert_called_once()
        up_link = {
            "rel": "up",
            "href": "http://localhost/v1/network-devices/2"
        }
        self.assertIn(up_link, resp.json["links"])
test_api.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_get_network_devices_by_ip_address_filter(self, fake_devices):
        region_id = '1'
        ip_address = '10.10.0.1'
        filters = {'region_id': region_id, 'ip_address': ip_address,
                   'resolved-values': True}
        path_query = '/v1/network-devices?region_id={}&ip_address={}'.format(
            region_id, ip_address
        )
        fake_devices.return_value = (fake_resources.NETWORK_DEVICE_LIST1, {})
        resp = self.get(path_query)
        device_resp = fake_resources.NETWORK_DEVICE_LIST1
        self.assertEqual(len(resp.json['network_devices']), 1)
        self.assertEqual(resp.json['network_devices'][0]["ip_address"],
                         device_resp[0].ip_address)

        fake_devices.assert_called_once_with(
            mock.ANY, filters, {'limit': 30, 'marker': None},
        )
test_api.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_get_netinterfaces_by_ip_address_filter(self, fake_interfaces):
        device_id = 1
        ip_address = '10.10.0.1'
        filters = {'device_id': device_id, 'ip_address': ip_address}
        path_query = (
            '/v1/network-interfaces?device_id={}&ip_address={}'.format(
                device_id, ip_address
            )
        )
        fake_interfaces.return_value = (fake_resources.NETWORK_INTERFACE_LIST1,
                                        {})
        resp = self.get(path_query)
        interface_resp = fake_resources.NETWORK_INTERFACE_LIST1
        self.assertEqual(len(resp.json['network_interfaces']), 1)
        self.assertEqual(resp.json['network_interfaces'][0]["name"],
                         interface_resp[0].name)

        fake_interfaces.assert_called_once_with(
            mock.ANY, filters, {'limit': 30, 'marker': None},
        )


问题


面经


文章

微信
公众号

扫码关注公众号