python类OK的实例源码

test_cli.py 文件源码 项目:skipper 作者: Stratoscale 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_run_with_existing_remote_build_container(self, skipper_runner_run_mock, requests_get_mock):
        """Check ``run`` when the mocked registry reply lists the build-container tag.

        ``requests_get_mock`` is primed with an OK response whose JSON tag list
        contains 'build-container-tag'; the runner mock must then have been
        called exactly once with the fully qualified image name.
        """
        registry_reply = mock.MagicMock(spec='requests.Response').return_value
        registry_reply.status_code = http_client.OK
        registry_reply.json.return_value = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb', 'build-container-tag'],
        }
        requests_get_mock.return_value = registry_reply

        command = ['ls', '-l']
        self._invoke_cli(global_params=self.global_params,
                         subcmd='run',
                         subcmd_params=command)

        expected_kwargs = {
            'fqdn_image': 'registry.io:5000/build-container-image:build-container-tag',
            'environment': [],
            'interactive': False,
            'name': None,
            'net': 'host',
            'volumes': None,
            'workdir': None,
            'use_cache': False,
        }
        skipper_runner_run_mock.assert_called_once_with(command, **expected_kwargs)
tools.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def do_GET(self):
        """Handle a GET request.

        Sends a 200 response with a fixed HTML page and stores the request's
        query parameters, as a dict, on ``self.server.query_params``. Note
        that we can't detect if an error occurred: the same page is written
        whatever the query contains.
        """
        self.send_response(http_client.OK)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        # Use the URL's real query component. Splitting on '?' and taking the
        # last piece would treat the whole path as the query string whenever
        # the request has no '?' at all.
        query = urllib.parse.urlparse(self.path).query
        self.server.query_params = dict(urllib.parse.parse_qsl(query))
        self.wfile.write(
            b"<html><head><title>Authentication Status</title></head>")
        self.wfile.write(
            b"<body><p>The authentication flow has completed.</p>")
        self.wfile.write(b"</body></html>")
client.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _detect_gce_environment():
    """Report whether the current environment looks like Compute Engine.

    Returns:
        Boolean: True when the metadata request comes back with an OK status
        and the metadata-flavor header carries the desired value; False when
        either check fails or the request raises ``socket.error``.
    """
    # NOTE: The explicit ``timeout`` is a workaround. The underlying
    #       issue is that resolving an unknown host on some networks will take
    #       20-30 seconds; making this timeout short fixes the issue, but
    #       could lead to false negatives in the event that we are on GCE, but
    #       the metadata resolution was particularly slow. The latter case is
    #       "unlikely".
    metadata_http = transport.get_http_object(timeout=GCE_METADATA_TIMEOUT)
    try:
        reply, _ = transport.request(
            metadata_http, _GCE_METADATA_URI, headers=_GCE_HEADERS)
        status_ok = reply.status == http_client.OK
        # The header is only inspected once the status check has passed.
        return (
            status_ok and
            reply.get(_METADATA_FLAVOR_HEADER) == _DESIRED_METADATA_FLAVOR)
    except socket.error:  # socket.timeout or socket.error(64, 'Host is down')
        logger.info('Timeout attempting to reach GCE metadata service.')
        return False
tools.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def do_GET(self):
        """Answer a GET with a fixed HTML page and record the query parameters.

        The query component of ``self.path`` is parsed and stored on
        ``self.server.query_params``. The same 200 page is written whatever
        the query contains, so an error cannot be detected here.
        """
        self.send_response(http_client.OK)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        query_string = urllib.parse.urlparse(self.path).query
        self.server.query_params = _helpers.parse_unique_urlencoded(
            query_string)

        page_chunks = (
            b'<html><head><title>Authentication Status</title></head>',
            b'<body><p>The authentication flow has completed.</p>',
            b'</body></html>',
        )
        for chunk in page_chunks:
            self.wfile.write(chunk)
base.py 文件源码 项目:ironic-tempest-plugin 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _list_request(self, resource, permanent=False, headers=None,
                      extra_headers=False, **kwargs):
        """Fetch the list of objects of the given resource type.

        :param resource: The name of the REST resource, e.g., 'nodes'.
        :param permanent: Passed through to ``self._get_uri`` when the URI
                          is built.
        :param headers: List of headers to use in request.
        :param extra_headers: Specify whether to use headers.
        :param **kwargs: Parameters for the request; when present they are
                         url-encoded and appended to the URI after a '?'.
        :returns: A tuple with the server response and deserialized JSON list
                 of objects

        """
        uri = self._get_uri(resource, permanent=permanent)
        if kwargs:
            encoded_query = urllib.urlencode(kwargs)
            uri = uri + "?" + encoded_query

        resp, body = self.get(uri,
                              headers=headers,
                              extra_headers=extra_headers)
        # A non-OK status is handed to expected_success before deserializing.
        self.expected_success(http_client.OK, resp.status)
        objects = self.deserialize(body)
        return resp, objects
base.py 文件源码 项目:ironic-tempest-plugin 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _show_request(self,
                      resource,
                      uuid=None,
                      permanent=False,
                      **kwargs):
        """Get a single object of the given resource type.

        :param resource: Resource name used to build the URI when no
                         explicit ``uri`` keyword is supplied.
        :param uuid: Unique identifier of the object in UUID format.
        :param permanent: Passed through to ``self._get_uri``.
        :param **kwargs: If it contains ``uri``, that value is requested
                         as-is and the other arguments are not used to
                         build a URI.
        :returns: A tuple with the server response and the deserialized
                  body.

        """
        try:
            uri = kwargs['uri']
        except KeyError:
            uri = self._get_uri(resource, uuid=uuid, permanent=permanent)

        resp, body = self.get(uri)
        self.expected_success(http_client.OK, resp.status)
        shown = self.deserialize(body)
        return resp, shown
run_gce_system_tests.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_token_info(self):
        """Refresh GCE credentials and check the token against the token info URI."""
        credentials = gce.AppAssertionCredentials([])
        http = transport.get_http_object()

        # First refresh to get the access token.
        self.assertIsNone(credentials.access_token)
        credentials.refresh(http)
        self.assertIsNotNone(credentials.access_token)

        # Then check the access token against the token info API.
        encoded_query = urllib.parse.urlencode(
            {'access_token': credentials.access_token})
        token_uri = '{0}?{1}'.format(
            oauth2client.GOOGLE_TOKEN_INFO_URI, encoded_query)
        response, raw_content = transport.request(http, token_uri)
        self.assertEqual(response.status, http_client.OK)

        payload = json.loads(raw_content.decode('utf-8'))
        self.assertEqual(payload['access_type'], 'offline')
        self.assertLessEqual(int(payload['expires_in']), 3600)
test_decorators.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def test_has_credentials_in_storage(self, OAuth2Credentials):
        """``oauth_enabled`` view with credentials returned by ``from_json``.

        Expects an OK response with body b'test', credentials reported as
        present, an http object on ``request.oauth``, and scopes equal to the
        configured GOOGLE_OAUTH2_SCOPES.
        """
        request = self.factory.get('/test')
        request.session = mock.Mock()

        stored_credentials = mock.Mock(
            scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
        # The scopes passed to the constructor are replaced by an empty set.
        stored_credentials.scopes = set()
        stored_credentials.invalid = False
        stored_credentials.has_scopes.return_value = True
        OAuth2Credentials.from_json.return_value = stored_credentials

        @decorators.oauth_enabled
        def test_view(request):
            return http.HttpResponse('test')

        response = test_view(request)
        self.assertEqual(response.status_code, http_client.OK)
        self.assertEqual(response.content, b'test')

        oauth = request.oauth
        self.assertTrue(oauth.has_credentials())
        self.assertIsNotNone(oauth.http)
        self.assertSetEqual(
            oauth.scopes,
            set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
test_decorators.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_specified_scopes(self, dictionary_storage_mock):
        """``oauth_enabled`` with an extra scope: OK response, no credentials reported."""
        request = self.factory.get('/test')
        request.session = mock.Mock()

        configured_scopes = set(django.conf.settings.GOOGLE_OAUTH2_SCOPES)
        stored_credentials = mock.Mock(scopes=configured_scopes)
        stored_credentials.is_valid = True
        stored_credentials.has_scopes = mock.Mock(return_value=True)
        dictionary_storage_mock.get.return_value = stored_credentials

        @decorators.oauth_enabled(scopes=['additional-scope'])
        def test_view(request):
            return http.HttpResponse('hello world')  # pragma: NO COVER

        response = test_view(request)
        self.assertEqual(response.status_code, http_client.OK)

        oauth = request.oauth
        self.assertIsNotNone(oauth)
        self.assertFalse(oauth.has_credentials())
test_metadata.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_get_token_success(self, now):
        """Check the token, the expiry and the single GET recorded by the mock."""
        token_payload = json.dumps({'access_token': 'a', 'expires_in': 100})
        http = request_mock(http_client.OK, 'application/json', token_payload)
        expected_expiry = (
            datetime.datetime.min + datetime.timedelta(seconds=100))

        token, expiry = _metadata.get_token(http=http)

        self.assertEqual(token, 'a')
        self.assertEqual(expiry, expected_expiry)
        # Verify mocks.
        now.assert_called_once_with()
        self.assertEqual(http.requests, 1)
        token_url = EXPECTED_URL + '/token'
        self.assertEqual(http.uri, token_url)
        self.assertEqual(http.method, 'GET')
        self.assertIsNone(http.body)
        self.assertEqual(http.headers, _metadata.METADATA_HEADERS)
test_client.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_exchange_using_authorization_header(self):
        """Exchange a code using a caller-supplied Authorization header.

        Checks that the access token is read from the mocked response, that
        the header value given to the flow appears unchanged in the recorded
        request, and that ``client_secret`` is absent from the request body.
        """
        # Must be a plain string: a trailing comma after the literal would
        # silently turn the header value into a one-element tuple.
        auth_header = 'Basic Y2xpZW50X2lkKzE6c2Vjexc_managerV0KzE='
        flow = client.OAuth2WebServerFlow(
            client_id='client_id+1',
            authorization_header=auth_header,
            scope='foo',
            redirect_uri=client.OOB_CALLBACK_URN,
            user_agent='unittest-sample/1.0',
            revoke_uri='dummy_revoke_uri',
        )
        http = http_mock.HttpMockSequence([
            ({'status': http_client.OK}, b'access_token=SlAV32hkKG'),
        ])

        credentials = flow.step2_exchange(code='some random code', http=http)
        self.assertEqual('SlAV32hkKG', credentials.access_token)

        self.assertEqual(len(http.requests), 1)
        test_request = http.requests[0]
        # Did we pass the Authorization header?
        self.assertEqual(test_request['headers']['Authorization'], auth_header)
        # Did we omit client_secret from POST body?
        self.assertNotIn('client_secret', test_request['body'])
http_mock.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, headers=None, data=None):
        """HttpMock constructor.

        Args:
            headers: dict, header to return with response. When None, a
                dict with only ``'status': http_client.OK`` is used.
            data: stored unchanged on ``self.data`` (presumably the response
                body to return; confirm against the request method).
        """
        if headers is None:
            headers = {'status': http_client.OK}
        self.data = data
        self.response_headers = headers
        # Request-side attributes all start out unset; ``headers`` here is
        # distinct from ``response_headers`` above.
        self.headers = None
        self.uri = None
        self.method = None
        self.body = None
        self.requests = 0
client.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _detect_gce_environment():
    """Report whether the current environment looks like Compute Engine.

    Returns:
        Boolean: True when the metadata request comes back with an OK status
        and the metadata-flavor header carries the desired value; False when
        either check fails or the request raises ``socket.error``.
    """
    # NOTE: The explicit ``timeout`` is a workaround. The underlying
    #       issue is that resolving an unknown host on some networks will take
    #       20-30 seconds; making this timeout short fixes the issue, but
    #       could lead to false negatives in the event that we are on GCE, but
    #       the metadata resolution was particularly slow. The latter case is
    #       "unlikely".
    metadata_http = transport.get_http_object(timeout=GCE_METADATA_TIMEOUT)
    try:
        reply, _ = transport.request(
            metadata_http, _GCE_METADATA_URI, headers=_GCE_HEADERS)
        status_ok = reply.status == http_client.OK
        # The header is only inspected once the status check has passed.
        return (
            status_ok and
            reply.get(_METADATA_FLAVOR_HEADER) == _DESIRED_METADATA_FLAVOR)
    except socket.error:  # socket.timeout or socket.error(64, 'Host is down')
        logger.info('Timeout attempting to reach GCE metadata service.')
        return False
tools.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def do_GET(self):
        """Answer a GET with a fixed HTML page and record the query parameters.

        The query component of ``self.path`` is parsed and stored on
        ``self.server.query_params``. The same 200 page is written whatever
        the query contains, so an error cannot be detected here.
        """
        self.send_response(http_client.OK)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        query_string = urllib.parse.urlparse(self.path).query
        self.server.query_params = _helpers.parse_unique_urlencoded(
            query_string)

        page_chunks = (
            b'<html><head><title>Authentication Status</title></head>',
            b'<body><p>The authentication flow has completed.</p>',
            b'</body></html>',
        )
        for chunk in page_chunks:
            self.wfile.write(chunk)
test_decorators.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_has_credentials_in_storage(self, OAuth2Credentials):
        """``oauth_enabled`` view with credentials returned by ``from_json``.

        Expects an OK response with body b'test', credentials reported as
        present, an http object on ``request.oauth``, and scopes equal to the
        configured GOOGLE_OAUTH2_SCOPES.
        """
        request = self.factory.get('/test')
        request.session = mock.Mock()

        stored_credentials = mock.Mock(
            scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
        # The scopes passed to the constructor are replaced by an empty set.
        stored_credentials.scopes = set()
        stored_credentials.invalid = False
        stored_credentials.has_scopes.return_value = True
        OAuth2Credentials.from_json.return_value = stored_credentials

        @decorators.oauth_enabled
        def test_view(request):
            return http.HttpResponse('test')

        response = test_view(request)
        self.assertEqual(response.status_code, http_client.OK)
        self.assertEqual(response.content, b'test')

        oauth = request.oauth
        self.assertTrue(oauth.has_credentials())
        self.assertIsNotNone(oauth.http)
        self.assertSetEqual(
            oauth.scopes,
            set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
test_decorators.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_specified_scopes(self, dictionary_storage_mock):
        """``oauth_enabled`` with an extra scope: OK response, no credentials reported."""
        request = self.factory.get('/test')
        request.session = mock.Mock()

        configured_scopes = set(django.conf.settings.GOOGLE_OAUTH2_SCOPES)
        stored_credentials = mock.Mock(scopes=configured_scopes)
        stored_credentials.is_valid = True
        stored_credentials.has_scopes = mock.Mock(return_value=True)
        dictionary_storage_mock.get.return_value = stored_credentials

        @decorators.oauth_enabled(scopes=['additional-scope'])
        def test_view(request):
            return http.HttpResponse('hello world')  # pragma: NO COVER

        response = test_view(request)
        self.assertEqual(response.status_code, http_client.OK)

        oauth = request.oauth
        self.assertIsNotNone(oauth)
        self.assertFalse(oauth.has_credentials())
test_decorators.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_has_credentials_in_storage(self, UserOAuth2):
        """``oauth_required`` view is served when the user object reports credentials."""
        request = self.factory.get('/test')
        request.session = mock.Mock()

        @decorators.oauth_required
        def test_view(request):
            return http.HttpResponse("test")

        # UserOAuth2(...) hands back a mock whose has_credentials() is True.
        user_oauth = mock.Mock()
        user_oauth.has_credentials.return_value = True
        UserOAuth2.return_value = user_oauth

        response = test_view(request)
        observed = (response.status_code, response.content)
        self.assertEqual(observed[0], http_client.OK)
        self.assertEqual(observed[1], b"test")
test_metadata.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_get_token_success(self, now):
        """Check the token, the expiry and the single GET recorded by the mock."""
        token_payload = json.dumps({'access_token': 'a', 'expires_in': 100})
        http = request_mock(http_client.OK, 'application/json', token_payload)
        expected_expiry = (
            datetime.datetime.min + datetime.timedelta(seconds=100))

        token, expiry = _metadata.get_token(http=http)

        self.assertEqual(token, 'a')
        self.assertEqual(expiry, expected_expiry)
        # Verify mocks.
        now.assert_called_once_with()
        self.assertEqual(http.requests, 1)
        token_url = EXPECTED_URL + '/token'
        self.assertEqual(http.uri, token_url)
        self.assertEqual(http.method, 'GET')
        self.assertIsNone(http.body)
        self.assertEqual(http.headers, _metadata.METADATA_HEADERS)
test_client.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def test_exchange_with_pkce(self):
        """With PKCE enabled, the exchange request body carries the code verifier."""
        canned_reply = ({'status': http_client.OK}, b'access_token=SlAV32hkKG')
        http = http_mock.HttpMockSequence([canned_reply])
        flow = client.OAuth2WebServerFlow(
            'client_id+1',
            scope='foo',
            redirect_uri='http://example.com',
            pkce=True,
            code_verifier=self.good_verifier)
        flow.step2_exchange(code='some random code', http=http)

        self.assertEqual(len(http.requests), 1)
        sent_body = http.requests[0]['body']
        expected_fragment = 'code_verifier={0}'.format(
            self.good_verifier.decode())
        self.assertIn(expected_fragment, sent_body)
test_client.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_exchange_using_authorization_header(self):
        """Exchange a code using a caller-supplied Authorization header.

        Checks that the access token is read from the mocked response, that
        the header value given to the flow appears unchanged in the recorded
        request, and that ``client_secret`` is absent from the request body.
        """
        # Must be a plain string: a trailing comma after the literal would
        # silently turn the header value into a one-element tuple.
        auth_header = 'Basic Y2xpZW50X2lkKzE6c2Vjexc_managerV0KzE='
        flow = client.OAuth2WebServerFlow(
            client_id='client_id+1',
            authorization_header=auth_header,
            scope='foo',
            redirect_uri=client.OOB_CALLBACK_URN,
            user_agent='unittest-sample/1.0',
            revoke_uri='dummy_revoke_uri',
        )
        http = http_mock.HttpMockSequence([
            ({'status': http_client.OK}, b'access_token=SlAV32hkKG'),
        ])

        credentials = flow.step2_exchange(code='some random code', http=http)
        self.assertEqual('SlAV32hkKG', credentials.access_token)

        self.assertEqual(len(http.requests), 1)
        test_request = http.requests[0]
        # Did we pass the Authorization header?
        self.assertEqual(test_request['headers']['Authorization'], auth_header)
        # Did we omit client_secret from POST body?
        self.assertNotIn('client_secret', test_request['body'])
http_mock.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, headers=None, data=None):
        """HttpMock constructor.

        Args:
            headers: dict, header to return with response. When None, a
                dict with only ``'status': http_client.OK`` is used.
            data: stored unchanged on ``self.data`` (presumably the response
                body to return; confirm against the request method).
        """
        if headers is None:
            headers = {'status': http_client.OK}
        self.data = data
        self.response_headers = headers
        # Request-side attributes all start out unset; ``headers`` here is
        # distinct from ``response_headers`` above.
        self.headers = None
        self.uri = None
        self.method = None
        self.body = None
        self.requests = 0
test_redfish.py 文件源码 项目:valence 作者: openstack 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test_allocate_node_conflict(self, mock_request, mock_get_url):
        """Test allocate resource conflict when compose node"""
        mock_get_url.return_value = '/redfish/v1/Nodes'

        # Queued replies, in order: the nodes root (OK), then the node
        # allocation (CONFLICT).
        nodes_root_ok = fakes.mock_request_get(
            fakes.fake_nodes_root(), http_client.OK)
        allocation_conflict = fakes.mock_request_get(
            fakes.fake_allocate_node_conflict(), http_client.CONFLICT)
        mock_request.side_effect = [nodes_root_ok, allocation_conflict]

        with self.assertRaises(exception.RedfishException) as raised:
            redfish.compose_node({"name": "test_node"})

        expected_text = ("There are no computer systems available for this "
                         "allocation request.")
        detail_text = str(raised.exception.detail)
        self.assertTrue(expected_text in detail_text)
test_redfish.py 文件源码 项目:valence 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_set_boot_source_success(self, mock_get_url, mock_request):
        """Test successfully setting the boot source of a composed node"""
        mock_get_url.return_value = '/redfish/v1/Nodes'

        # Queued replies, in order: node detail (OK), then the action
        # response (NO_CONTENT).
        detail_resp = fakes.mock_request_get(
            fakes.fake_node_detail(), http_client.OK)
        action_resp = fakes.mock_request_get({}, http_client.NO_CONTENT)
        mock_request.side_effect = [detail_resp, action_resp]

        boot_request = {"Boot": {"Enabled": "Once", "Target": "Hdd"}}
        result = redfish.set_boot_source("1", boot_request)

        confirm_detail = ("The boot source of composed node has been set to "
                          "'{0}' with enabled state '{1}' successfully."
                          .format("Hdd", "Once"))
        expected = exception.confirmation(
            confirm_code="Set Boot Source of Composed Node",
            confirm_detail=confirm_detail)

        self.assertEqual(expected, result)
test_validation.py 文件源码 项目:valence 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_podmanager_create(self, pstatus_mock, plist_mock, pcreate_mock):
        """POST a pod manager definition with basic auth and expect an OK response."""
        credentials = {"username": "xxxxxxx", "password": "xxxxxxx"}
        values = {
            "name": "podm_name",
            "url": "https://10.240.212.123",
            "authentication": [{"type": "basic", "auth_items": credentials}],
        }
        # The create mock returns a copy of the posted values plus a status.
        created = copy.deepcopy(values)
        created['status'] = constants.PODM_STATUS_ONLINE

        pstatus_mock.return_value = constants.PODM_STATUS_ONLINE
        plist_mock.return_value = []
        pcreate_mock.return_value = created

        response = self.app.post('/v1/pod_managers',
                                 content_type='application/json',
                                 data=json.dumps(values))
        self.assertEqual(http_client.OK, response.status_code)
test_validation.py 文件源码 项目:valence 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_compose_request_using_properties(self, mock_compose, mock_conn):
        """POST a compose request described by properties and expect an OK response."""
        memory = {"capacity_mib": "4000", "type": "DDR3"}
        processor = {"model": "Intel", "total_cores": "4"}
        req = {
            "name": "test_request",
            "podm_id": "test-podm",
            "properties": {"memory": memory, "processor": processor},
        }
        mock_compose.return_value = req

        resp = self.app.post('/v1/nodes',
                             content_type='application/json',
                             data=json.dumps(req))

        mock_conn.assert_called_once_with('test-podm')
        self.assertEqual(http_client.OK, resp.status_code)
redfish.py 文件源码 项目:valence 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def show_cpu_details(cpu_url):
    """Get processor details.

    :param cpu_url: relative redfish url to processor,
                    e.g /redfish/v1/Systems/1/Processors/1.
    :returns: dict of processor detail with the keys instruction_set,
              model, speed_mhz and total_core; a key is None when the
              response does not carry the matching field.
    :raises exception.RedfishException: when the response status is not OK.
    """
    response = send_request(cpu_url)
    if response.status_code == http_client.OK:
        payload = response.json()
        # (result key, field name in the response)
        field_map = (
            ("instruction_set", "InstructionSet"),
            ("model", "Model"),
            ("speed_mhz", "MaxSpeedMHz"),
            ("total_core", "TotalCores"),
        )
        return {key: payload.get(source) for key, source in field_map}

    # Raise exception if don't find processor
    raise exception.RedfishException(response.json(),
                                     status_code=response.status_code)
redfish.py 文件源码 项目:valence 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def show_ram_details(ram_url):
    """Get memory details.

    :param ram_url: relative redfish url to memory,
                    e.g /redfish/v1/Systems/1/Memory/1.
    :returns: dict of memory detail with the keys data_width_bit,
              speed_mhz and total_memory_mb; a key is None when the
              response does not carry the matching field.
    :raises exception.RedfishException: when the response status is not OK.
    """
    response = send_request(ram_url)
    if response.status_code == http_client.OK:
        payload = response.json()
        # (result key, field name in the response)
        field_map = (
            ("data_width_bit", "DataWidthBits"),
            ("speed_mhz", "OperatingSpeedMhz"),
            ("total_memory_mb", "CapacityMiB"),
        )
        return {key: payload.get(source) for key, source in field_map}

    # Raise exception if don't find memory
    raise exception.RedfishException(response.json(),
                                     status_code=response.status_code)
test_http.py 文件源码 项目:python-valenceclient 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_http_request_client_success(self):
        """A JSON GET through a patched session yields a response with status OK."""
        client = http.HTTPClient(valence_url="http://localhost/")
        canned = utils.mockSessionResponse(
            {'content-type': 'test/plain'},
            'Body',
            version=1,
            status_code=http_client.OK)

        session_patch = mock.patch.object(client, 'session', autospec=True)
        with session_patch as mock_session:
            # One canned reply only; a second request would exhaust it.
            mock_session.request.side_effect = iter([canned])
            response, _body_iter = client.json_request(
                'GET', '/redfish/v1/Nodes')

        self.assertEqual(http_client.OK, response.status_code)
test_run.py 文件源码 项目:column 作者: vmware 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_get_run_by_id(self):
        """Fetch a run by id: an unknown id gives 404, a newly created run gives 200."""
        response = self.app.get('/runs/1234')
        self.assertEqual(http_client.NOT_FOUND, response.status_code)

        pb = 'tests/fixtures/playbooks/hello_world.yml'
        response = self.app.post(
            '/runs',
            data=json.dumps(dict(playbook_path=pb,
                                 inventory_file='localhost,',
                                 options={'connection': 'local'})),
            content_type='application/json')
        # Check the status before reading the body, so a failed POST shows up
        # as a status mismatch instead of a KeyError/ValueError while parsing.
        self.assertEqual(http_client.OK, response.status_code)
        run_id = json.loads(response.data)['id']

        response = self.app.get('/runs/{}'.format(run_id))
        self.assertEqual(http_client.OK, response.status_code)
        self._wait_for_run_complete(run_id)
test_run.py 文件源码 项目:column 作者: vmware 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_get_run_list(self):
        """Create a run, then check that its id appears in the GET /runs listing."""
        pb = 'tests/fixtures/playbooks/hello_world.yml'
        response = self.app.post(
            '/runs',
            data=json.dumps(dict(playbook_path=pb,
                                 inventory_file='localhost,',
                                 options={'connection': 'local'})),
            content_type='application/json')
        # Check the status before reading the body, so a failed POST shows up
        # as a status mismatch instead of a KeyError/ValueError while parsing.
        self.assertEqual(http_client.OK, response.status_code)
        run_id = json.loads(response.data)['id']

        response = self.app.get('/runs')
        res_list = json.loads(response.data)
        self.assertTrue(any(item['id'] == run_id for item in res_list))
        self._wait_for_run_complete(run_id)


问题


面经


文章

微信
公众号

扫码关注公众号