python类parse_qs()的实例源码

oauth2.py 文件源码 项目:flask-api-skeleton 作者: ianunruh 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def do_GET(self):
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain')
        self.end_headers()

        parts = urlparse(self.path)
        query = parse_qs(parts.query)

        result = {}
        for k, v in query.items():
            if len(v) == 1:
                result[k] = v[0]
            elif len(v) == 0:
                result[k] = None
            else:
                result[k] = v

        self.server.callback_result = result

        self.wfile.write(b'You can close this window now')
base.py 文件源码 项目:panko 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _parse_connection_url(url):
        """Parse connection parameters from a database url.

        .. note::

          HBase Thrift does not support authentication and there is no
          database name, so we are not looking for these in the url.
        """
        opts = {}
        result = netutils.urlsplit(url)
        opts['table_prefix'] = urlparse.parse_qs(
            result.query).get('table_prefix', [None])[0]
        opts['table_prefix_separator'] = urlparse.parse_qs(
            result.query).get('table_prefix_separator', ['_'])[0]
        opts['dbtype'] = result.scheme
        if ':' in result.netloc:
            opts['host'], port = result.netloc.split(':')
        else:
            opts['host'] = result.netloc
            port = 9090
        opts['port'] = port and int(port) or 9090
        return opts
base.py 文件源码 项目:novajoin 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_body(self, request):

        params = urlparse.parse_qs(request.query_string)
        if len(request.body) == 0:
            LOG.debug("Empty body provided in request")
            return None, params

        try:
            content_type = request.get_content_type()
        except exception.InvalidContentType:
            LOG.debug("Unrecognized Content-Type provided in request")
            return None, ''

        if not content_type:
            LOG.debug("No Content-Type provided in request")
            return None, ''

        return content_type, request.body
validators.py 文件源码 项目:callisto-core 作者: project-callisto 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def facebook_validation_function(url):
    try:
        url_parts = _get_url_parts(url)
        # check if acceptable domain
        domain = url_parts[1]
        if not (domain == 'facebook.com' or domain.endswith('.facebook.com')):
            return None
        path = _get_initial_path(url_parts)

        # old style numeric profiles
        if path == "profile.php":  # ex. https://www.facebook.com/profile.php?id=100010279981469
            path = parse_qs(url_parts[3]).get('id')[0]
        if path == 'people':  # ex. https://www.facebook.com/people/John-Doe/100013326345115
            path = url_parts[2].strip('/').split('/')[2].lower()

        # TODO: validate against allowed username characteristics
        # https://github.com/project-callisto/callisto-core/issues/181
        if not path or path == "" or path.endswith(
                '.php') or path in generic_fb_urls:
            return None
        else:
            return path
    except ValidationError:
        return None
test_flask_util.py 文件源码 项目:deb-python-oauth2client 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_authorize_view(self):
        with self.app.test_client() as client:
            response = client.get('/oauth2authorize')
            location = response.headers['Location']
            q = urlparse.parse_qs(location.split('?', 1)[1])
            state = json.loads(q['state'][0])

            self.assertIn(oauth2client.GOOGLE_AUTH_URI, location)
            self.assertNotIn(self.oauth2.client_secret, location)
            self.assertIn(self.oauth2.client_id, q['client_id'])
            self.assertEqual(
                flask.session['google_oauth2_csrf_token'], state['csrf_token'])
            self.assertEqual(state['return_url'], '/')

        with self.app.test_client() as client:
            response = client.get('/oauth2authorize?return_url=/test')
            location = response.headers['Location']
            q = urlparse.parse_qs(location.split('?', 1)[1])
            state = json.loads(q['state'][0])
            self.assertEqual(state['return_url'], '/test')

        with self.app.test_client() as client:
            response = client.get('/oauth2authorize?extra_param=test')
            location = response.headers['Location']
            self.assertIn('extra_param=test', location)
test_flask_util.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_authorize_view(self):
        with self.app.test_client() as client:
            response = client.get('/oauth2authorize')
            location = response.headers['Location']
            q = urlparse.parse_qs(location.split('?', 1)[1])
            state = json.loads(q['state'][0])

            self.assertIn(oauth2client.GOOGLE_AUTH_URI, location)
            self.assertNotIn(self.oauth2.client_secret, location)
            self.assertIn(self.oauth2.client_id, q['client_id'])
            self.assertEqual(
                flask.session['google_oauth2_csrf_token'], state['csrf_token'])
            self.assertEqual(state['return_url'], '/')

        with self.app.test_client() as client:
            response = client.get('/oauth2authorize?return_url=/test')
            location = response.headers['Location']
            q = urlparse.parse_qs(location.split('?', 1)[1])
            state = json.loads(q['state'][0])
            self.assertEqual(state['return_url'], '/test')

        with self.app.test_client() as client:
            response = client.get('/oauth2authorize?extra_param=test')
            location = response.headers['Location']
            self.assertIn('extra_param=test', location)
client.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _pagination(self, collection, path, **params):
        if params.get('page_reverse', False):
            linkrel = 'previous'
        else:
            linkrel = 'next'
        next = True
        while next:
            res = self.get(path, params=params)
            yield res
            next = False
            try:
                for link in res['%s_links' % collection]:
                    if link['rel'] == linkrel:
                        query_str = urlparse.urlparse(link['href']).query
                        params = urlparse.parse_qs(query_str)
                        next = True
                        break
            except KeyError:
                break
client.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _pagination(self, collection, path, **params):
        if params.get('page_reverse', False):
            linkrel = 'previous'
        else:
            linkrel = 'next'
        next = True
        while next:
            res = self.get(path, params=params)
            yield res
            next = False
            try:
                for link in res['%s_links' % collection]:
                    if link['rel'] == linkrel:
                        query_str = urlparse.urlparse(link['href']).query
                        params = urlparse.parse_qs(query_str)
                        next = True
                        break
            except KeyError:
                break
client.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _pagination(self, collection, path, **params):
        if params.get('page_reverse', False):
            linkrel = 'previous'
        else:
            linkrel = 'next'
        next = True
        while next:
            res = self.get(path, params=params)
            yield res
            next = False
            try:
                for link in res['%s_links' % collection]:
                    if link['rel'] == linkrel:
                        query_str = urlparse.urlparse(link['href']).query
                        params = urlparse.parse_qs(query_str)
                        next = True
                        break
            except KeyError:
                break
test_flask_util.py 文件源码 项目:eclipse2017 作者: google 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_authorize_view(self):
        with self.app.test_client() as client:
            response = client.get('/oauth2authorize')
            location = response.headers['Location']
            q = urlparse.parse_qs(location.split('?', 1)[1])
            state = json.loads(q['state'][0])

            self.assertIn(GOOGLE_AUTH_URI, location)
            self.assertNotIn(self.oauth2.client_secret, location)
            self.assertIn(self.oauth2.client_id, q['client_id'])
            self.assertEqual(
                flask.session['google_oauth2_csrf_token'], state['csrf_token'])
            self.assertEqual(state['return_url'], '/')

        with self.app.test_client() as client:
            response = client.get('/oauth2authorize?return_url=/test')
            location = response.headers['Location']
            q = urlparse.parse_qs(location.split('?', 1)[1])
            state = json.loads(q['state'][0])
            self.assertEqual(state['return_url'], '/test')

        with self.app.test_client() as client:
            response = client.get('/oauth2authorize?extra_param=test')
            location = response.headers['Location']
            self.assertIn('extra_param=test', location)
request.py 文件源码 项目:aspen.py 作者: AspenWeb 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, raw, errors='replace'):
        """Takes a string of type application/x-www-form-urlencoded.
        """
        # urllib needs bytestrings in py2 and unicode strings in py3
        raw_str = raw.encode('ascii') if PY2 else raw

        self.decoded = _decode(unquote_plus(raw_str), errors=errors)
        self.raw = raw

        common_kw = dict(keep_blank_values=True, strict_parsing=False)
        if PY2:
            # in python 2 parse_qs does its own unquote_plus'ing ...
            as_dict = parse_qs(raw_str, **common_kw)
            # ... but doesn't decode to unicode.
            for k, vals in list(as_dict.items()):
                as_dict[_decode(k, errors=errors)] = [
                    _decode(v, errors=errors) for v in vals
                ]
        else:
            # in python 3 parse_qs does the decoding
            as_dict = parse_qs(raw_str, errors=errors, **common_kw)

        Mapping.__init__(self, as_dict)
utils.py 文件源码 项目:edx-enterprise 作者: edx 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def update_query_parameters(url, query_parameters):
    """
    Return url with updated query parameters.

    Arguments:
        url (str): Original url whose query parameters need to be updated.
        query_parameters (dict): A dictionary containing query parameters to be added to course selection url.

    Returns:
        (slug): slug identifier for the identity provider that can be used for identity verification of
            users associated the enterprise customer of the given user.

    """
    scheme, netloc, path, query_string, fragment = urlsplit(url)
    url_params = parse_qs(query_string)

    # Update url query parameters
    url_params.update(query_parameters)

    return urlunsplit(
        (scheme, netloc, path, urlencode(url_params, doseq=True), fragment),
    )
utils.py 文件源码 项目:edx-enterprise 作者: edx 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def traverse_pagination(response, endpoint):
    """
    Traverse a paginated API response.

    Extracts and concatenates "results" (list of dict) returned by DRF-powered
    APIs.

    Arguments:
        response (Dict): Current response dict from service API
        endpoint (slumber Resource object): slumber Resource object from edx-rest-api-client

    Returns:
        list of dict.

    """
    results = response.get('results', [])

    next_page = response.get('next')
    while next_page:
        querystring = parse_qs(urlparse(next_page).query, keep_blank_values=True)
        response = endpoint.get(**querystring)
        results += response.get('results', [])
        next_page = response.get('next')

    return results
test_65_authn_query.py 文件源码 项目:deb-python-pysaml2 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_msg(hinfo, binding):
    if binding == BINDING_SOAP:
        xmlstr = hinfo["data"]
    elif binding == BINDING_HTTP_POST:
        _inp = hinfo["data"][3]
        i = _inp.find(TAG1)
        i += len(TAG1) + 1
        j = _inp.find('"', i)
        xmlstr = _inp[i:j]
    else:  # BINDING_HTTP_REDIRECT
        parts = urlparse(hinfo["headers"][0][1])
        xmlstr = parse_qs(parts.query)["SAMLRequest"][0]

    return xmlstr

# ------------------------------------------------------------------------
test_64_artifact.py 文件源码 项目:deb-python-pysaml2 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_msg(hinfo, binding, response=False):
    if binding == BINDING_SOAP:
        msg = hinfo["data"]
    elif binding == BINDING_HTTP_POST:
        _inp = hinfo["data"][3]
        i = _inp.find(TAG1)
        i += len(TAG1) + 1
        j = _inp.find('"', i)
        msg = _inp[i:j]
    elif binding == BINDING_HTTP_ARTIFACT:
        # either by POST or by redirect
        if hinfo["data"]:
            _inp = hinfo["data"][3]
            i = _inp.find(TAG1)
            i += len(TAG1) + 1
            j = _inp.find('"', i)
            msg = _inp[i:j]
        else:
            parts = urlparse(hinfo["url"])
            msg = parse_qs(parts.query)["SAMLart"][0]
    else: # BINDING_HTTP_REDIRECT
        parts = urlparse(hinfo["headers"][0][1])
        msg = parse_qs(parts.query)["SAMLRequest"][0]

    return msg
test_68_assertion_id.py 文件源码 项目:deb-python-pysaml2 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_msg(hinfo, binding, response=False):
    if binding == BINDING_SOAP:
        msg = hinfo["data"]
    elif binding == BINDING_HTTP_POST:
        _inp = hinfo["data"][3]
        i = _inp.find(TAG1)
        i += len(TAG1) + 1
        j = _inp.find('"', i)
        msg = _inp[i:j]
    elif binding == BINDING_URI:
        if response:
            msg = hinfo["data"]
        else:
            msg = ""
            return parse_qs(hinfo["url"].split("?")[1])["ID"][0]
    else:  # BINDING_HTTP_REDIRECT
        parts = urlparse(hinfo["headers"][0][1])
        msg = parse_qs(parts.query)["SAMLRequest"][0]

    return msg
test_50_server.py 文件源码 项目:deb-python-pysaml2 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_parse_faulty_request_to_err_status(self):
        req_id, authn_request = self.client.create_authn_request(
            destination="http://www.example.com")

        binding = BINDING_HTTP_REDIRECT
        htargs = self.client.apply_binding(binding, "%s" % authn_request,
                                           "http://www.example.com", "abcd")
        _dict = parse_qs(htargs["headers"][0][1].split('?')[1])
        print(_dict)

        try:
            self.server.parse_authn_request(_dict["SAMLRequest"][0], binding)
            status = None
        except OtherError as oe:
            print(oe.args)
            status = s_utils.error_status_factory(oe)

        assert status
        print(status)
        assert _eq(status.keyswv(), ["status_code", "status_message"])
        assert status.status_message.text == 'Not destined for me!'
        status_code = status.status_code
        assert _eq(status_code.keyswv(), ["status_code", "value"])
        assert status_code.value == samlp.STATUS_RESPONDER
        assert status_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL
test_50_server.py 文件源码 项目:deb-python-pysaml2 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_parse_ok_request(self):
        req_id, authn_request = self.client.create_authn_request(
            message_id="id1", destination="http://localhost:8088/sso")

        print(authn_request)
        binding = BINDING_HTTP_REDIRECT
        htargs = self.client.apply_binding(binding, "%s" % authn_request,
                                           "http://www.example.com", "abcd")
        _dict = parse_qs(htargs["headers"][0][1].split('?')[1])
        print(_dict)

        req = self.server.parse_authn_request(_dict["SAMLRequest"][0], binding)
        # returns a dictionary
        print(req)
        resp_args = self.server.response_args(req.message, [BINDING_HTTP_POST])
        assert resp_args["destination"] == "http://lingon.catalogix.se:8087/"
        assert resp_args["in_response_to"] == "id1"
        name_id_policy = resp_args["name_id_policy"]
        assert _eq(name_id_policy.keyswv(), ["format", "allow_create"])
        assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT
        assert resp_args[
                   "sp_entity_id"] == "urn:mace:example.com:saml:roland:sp"
client_base.py 文件源码 项目:deb-python-pysaml2 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def parse_discovery_service_response(url="", query="",
            returnIDParam="entityID"):
        """
        Deal with the response url from a Discovery Service

        :param url: the url the user was redirected back to or
        :param query: just the query part of the URL.
        :param returnIDParam: This is where the identifier of the IdP is
            place if it was specified in the query. Default is 'entityID'
        :return: The IdP identifier or "" if none was given
        """

        if url:
            part = urlparse(url)
            qsd = parse_qs(part[4])
        elif query:
            qsd = parse_qs(query)
        else:
            qsd = {}

        try:
            return qsd[returnIDParam][0]
        except KeyError:
            return ""
sp.py 文件源码 项目:deb-python-pysaml2 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def disco(environ, start_response, _sp):
    query = parse_qs(environ["QUERY_STRING"])
    entity_id = query["entityID"][0]
    _sid = query["sid"][0]
    came_from = CACHE.outstanding_queries[_sid]
    _sso = SSO(_sp, environ, start_response, cache=CACHE, **ARGS)
    resp = _sso.redirect_to_auth(_sso.sp, entity_id, came_from)

    # Add cookie
    kaka = make_cookie("ve_disco", entity_id, "SEED_SAW")
    resp.headers.append(kaka)
    return resp(environ, start_response)


# ----------------------------------------------------------------------------


# noinspection PyUnusedLocal
mock.py 文件源码 项目:cloak-server 作者: encryptme 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _post_servers(self, request):
        # type: (requests.PreparedRequest) -> requests.Response
        data = parse_qs(force_text(request.body))

        self.server_id = self._public_id('srv')
        self.auth_token = ''.join(random.choice(mixed_alphabet) for i in xrange(20))
        self.api_version = force_text(request.headers['X-Cloak-API-Version'])
        self.name = data['name'][0]
        self.target_id = data['target'][0]

        # Make sure these exist
        data['email'][0]
        data['password'][0]

        result = {
            'server_id': self.server_id,
            'auth_token': self.auth_token,
            'server': self._server_result(),
        }

        return self._response(request, 201, result)
mock.py 文件源码 项目:cloak-server 作者: encryptme 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _post_server_csr(self, request):
        # type: (requests.PreparedRequest) -> requests.Response
        data = parse_qs(force_text(request.body))

        if self._authenticate(request):
            self.csr = data['csr'][0]
            self.pki_tag = ''.join(random.choice(mixed_alphabet) for i in xrange(16))
            response = self._response(request, 202)
        else:
            response = self._response(request, 401)

        return response

    #
    # Utils
    #
cloudformation_listener.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def forward_request(self, method, path, data, headers):
        req_data = None
        if method == 'POST' and path == '/':
            req_data = urlparse.parse_qs(to_str(data))
            action = req_data.get('Action')[0]

        if req_data:
            if action == 'CreateChangeSet':
                return create_change_set(req_data)
            elif action == 'DescribeChangeSet':
                return describe_change_set(req_data)
            elif action == 'ExecuteChangeSet':
                return execute_change_set(req_data)
            elif action == 'UpdateStack' and req_data.get('TemplateURL'):
                # Temporary fix until the moto CF backend can handle TemplateURL (currently fails)
                url = re.sub(r'https?://s3\.amazonaws\.com', aws_stack.get_local_service_url('s3'),
                    req_data.get('TemplateURL')[0])
                req_data['TemplateBody'] = requests.get(url).content
                modified_data = urlparse.urlencode(req_data, doseq=True)
                return Request(data=modified_data, headers=headers, method=method)
            elif action == 'ValidateTemplate':
                return validate_template(req_data)

        return True
mediawiki.py 文件源码 项目:social-core 作者: python-social-auth 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def oauth_authorization_request(self, token):
        """
        Generates the URL for the authorization link
        """
        if not isinstance(token, dict):
            token = parse_qs(token)

        oauth_token = token.get(self.OAUTH_TOKEN_PARAMETER_NAME)[0]
        state = self.get_or_create_state()
        base_url = self.setting('MEDIAWIKI_URL')

        return '{0}?{1}'.format(base_url, urlencode({
            'title': 'Special:Oauth/authenticate',
            self.OAUTH_TOKEN_PARAMETER_NAME: oauth_token,
            self.REDIRECT_URI_PARAMETER_NAME: self.get_redirect_uri(state)
        }))
mediawiki.py 文件源码 项目:social-core 作者: python-social-auth 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def access_token(self, token):
        """
        Fetches the Mediawiki access token.
        """
        auth_token = self.oauth_auth(token)

        response = requests.post(
            url=self.setting('MEDIAWIKI_URL'),
            params={'title': 'Special:Oauth/token'},
            auth=auth_token
        )
        credentials = parse_qs(response.content)
        oauth_token_key = credentials.get(b('oauth_token'))[0]
        oauth_token_secret = credentials.get(b('oauth_token_secret'))[0]
        oauth_token_key = oauth_token_key.decode()
        oauth_token_secret = oauth_token_secret.decode()

        return {
            'oauth_token': oauth_token_key,
            'oauth_token_secret': oauth_token_secret
        }
spider.py 文件源码 项目:collectors 作者: opentrials 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _process_url(page_from, page_to, url):

    # Get url page
    query = urlparse(url).query
    query = parse_qs(query)
    page = query.get('page')

    # Preserve if match
    if page:
        page_from = int(page_from)
        page_to = int(page_to)
        page = int(page[0])
        if page >= page_from and page <= page_to:
            return url

    return None
test_serversV21.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_get_servers_with_limit(self):
        req = self.req('/fake/servers?limit=3')
        res_dict = self.controller.index(req)

        servers = res_dict['servers']
        self.assertEqual([s['id'] for s in servers],
                [fakes.get_fake_uuid(i) for i in range(len(servers))])

        servers_links = res_dict['servers_links']
        self.assertEqual(servers_links[0]['rel'], 'next')
        href_parts = urlparse.urlparse(servers_links[0]['href'])
        self.assertEqual('/v2/fake/servers', href_parts.path)
        params = urlparse.parse_qs(href_parts.query)
        expected_params = {'limit': ['3'],
                           'marker': [fakes.get_fake_uuid(2)]}
        self.assertThat(params, matchers.DictMatches(expected_params))
test_serversV21.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_get_server_details_with_limit(self):
        req = self.req('/fake/servers/detail?limit=3')
        res = self.controller.detail(req)

        servers = res['servers']
        self.assertEqual([s['id'] for s in servers],
                [fakes.get_fake_uuid(i) for i in range(len(servers))])

        servers_links = res['servers_links']
        self.assertEqual(servers_links[0]['rel'], 'next')

        href_parts = urlparse.urlparse(servers_links[0]['href'])
        self.assertEqual('/v2/fake/servers/detail', href_parts.path)
        params = urlparse.parse_qs(href_parts.query)
        expected = {'limit': ['3'], 'marker': [fakes.get_fake_uuid(2)]}
        self.assertThat(params, matchers.DictMatches(expected))
test_serversV21.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_get_server_details_with_limit_and_other_params(self):
        req = self.req('/fake/servers/detail'
                                      '?limit=3&blah=2:t'
                                      '&sort_key=id1&sort_dir=asc')
        res = self.controller.detail(req)

        servers = res['servers']
        self.assertEqual([s['id'] for s in servers],
                [fakes.get_fake_uuid(i) for i in range(len(servers))])

        servers_links = res['servers_links']
        self.assertEqual(servers_links[0]['rel'], 'next')

        href_parts = urlparse.urlparse(servers_links[0]['href'])
        self.assertEqual('/v2/fake/servers/detail', href_parts.path)
        params = urlparse.parse_qs(href_parts.query)
        expected = {'limit': ['3'], 'blah': ['2:t'],
                    'sort_key': ['id1'], 'sort_dir': ['asc'],
                    'marker': [fakes.get_fake_uuid(2)]}
        self.assertThat(params, matchers.DictMatches(expected))
test_servers.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_get_servers_with_limit(self):
        req = fakes.HTTPRequest.blank('/fake/servers?limit=3')
        res_dict = self.controller.index(req)

        servers = res_dict['servers']
        self.assertEqual([fakes.get_fake_uuid(i) for i in range(len(servers))],
                         [s['id'] for s in servers])

        servers_links = res_dict['servers_links']
        self.assertEqual('next', servers_links[0]['rel'])
        href_parts = urlparse.urlparse(servers_links[0]['href'])
        self.assertEqual('/v2/fake/servers', href_parts.path)
        params = urlparse.parse_qs(href_parts.query)
        expected_params = {'limit': ['3'],
                           'marker': [fakes.get_fake_uuid(2)]}
        self.assertThat(params, matchers.DictMatches(expected_params))


问题


面经


文章

微信
公众号

扫码关注公众号