python类parse_qsl()的实例源码

utils.py 文件源码 项目:python-bileanclient 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def sort_url_by_query_keys(url):
    """Return ``url`` with its query parameters ordered by key.

    For example, '/v2/tasks?sort_key=id&sort_dir=asc&limit=10' becomes
    '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'. A deterministic order
    keeps unit tests from depending on how the query string was built.

    :param url: url whose query string should be ordered
    :returns: the same url with its query parameters sorted by key
    """
    pieces = urlparse.urlparse(url)

    # Second positional argument is keep_blank_values: parameters with an
    # empty value are kept rather than dropped.
    pairs = urlparse.parse_qsl(pieces.query, True)
    # list.sort is stable, so repeated keys keep their relative order.
    pairs.sort(key=lambda pair: pair[0])

    # Only the query component is replaced; all other components are reused.
    reordered = pieces._replace(query=urlparse.urlencode(pairs, True))
    return urlparse.urlunparse(reordered)
tpda.py 文件源码 项目:dnsknife 作者: Gandi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def params(arg):
    """
    Build a ``Params`` object from `arg` and return its ``params()`` result.

    The source of the (name, value) pairs is chosen in this order:
        - `arg` is a string: it is parsed as an URL and the pairs come
          from its query string (IncompleteURI is raised when the URL
          has no query)
        - `arg` has an items() method: the pairs are `arg.items()`
          (works for multidicts, ..)
        - `arg` is iterable: it is used as-is, as a
          ((name, value), (name, value), ..) sequence

    Any other `arg` yields None.
    """
    if isinstance(arg, str):
        query = parse.urlparse(arg).query
        if not query:
            raise exceptions.IncompleteURI
        pairs = parse.parse_qsl(query)
    elif hasattr(arg, 'items'):
        pairs = arg.items()
    elif hasattr(arg, '__iter__'):
        pairs = arg
    else:
        # Unsupported input: nothing to parse.
        return None
    return Params(pairs).params()
test_flask_pyoidc.py 文件源码 项目:Flask-pyoidc 作者: zamzterz 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_logout(self):
        """Logging out clears the session and redirects to the provider.

        Checks that ``_logout()`` removes the token/userinfo keys from the
        flask session and returns a 303 redirect to the end-session endpoint
        whose query carries the state, the id token hint and the post-logout
        redirect URI.
        """
        end_session_endpoint = 'https://provider.example.com/end_session'
        post_logout_uri = 'https://client.example.com/post_logout'
        provider_info = {'issuer': ISSUER,
                         'end_session_endpoint': end_session_endpoint}
        client_info = {'client_id': 'foo',
                       'post_logout_redirect_uris': [post_logout_uri]}
        authn = OIDCAuthentication(self.app,
                                   provider_configuration_info=provider_info,
                                   client_registration_info=client_info)
        id_token = IdToken(sub='sub1', nonce='nonce')

        with self.app.test_request_context('/logout'):
            # Seed the session as if a user were logged in.
            flask.session['access_token'] = 'abcde'
            flask.session['userinfo'] = {'foo': 'bar', 'abc': 'xyz'}
            flask.session['id_token'] = id_token.to_dict()
            flask.session['id_token_jwt'] = id_token.to_jwt()

            end_session_redirect = authn._logout()

            for cleared_key in ['access_token', 'userinfo', 'id_token', 'id_token_jwt']:
                assert cleared_key not in flask.session

            assert end_session_redirect.status_code == 303
            location = end_session_redirect.headers['Location']
            assert location.startswith(end_session_endpoint)

            parsed_request = dict(parse_qsl(urlparse(location).query))
            assert parsed_request['state'] == flask.session['end_session_state']
            assert parsed_request['id_token_hint'] == id_token.to_jwt()
            assert parsed_request['post_logout_redirect_uri'] == post_logout_uri
netutils.py 文件源码 项目:deb-oslo.utils 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def params(self, collapse=True):
        """Return the query parameters of the split url as a dictionary.

        :param collapse: Boolean controlling what happens when the same
        parameter name appears more than once in the query. When True the
        last value given for a name wins. When False every value is kept:
        a name that occurs several times maps to a list of its values (in
        order of appearance), while a name that occurs once still maps to
        a single (non-list) value.
        :returns: dict of parameter names to values; empty when the url
        has no query.
        """
        if not self.query:
            return {}

        pairs = parse.parse_qsl(self.query)
        if collapse:
            # dict() keeps the last value seen for each repeated name.
            return dict(pairs)

        merged = {}
        for name, val in pairs:
            try:
                seen = merged[name]
            except KeyError:
                # First occurrence: store the bare value.
                merged[name] = val
                continue
            if isinstance(seen, list):
                seen.append(val)
            else:
                # Second occurrence: promote the stored value to a list.
                merged[name] = [seen, val]
        return merged
auth.py 文件源码 项目:fanfou-py 作者: akgnah 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def request_token(self):
        """Fetch a request token and remember it on the instance.

        Issues a GET to ``self.request_token_url`` via ``oauth_request``,
        parses the form-encoded response body, fills ``self.authorize_url``
        with the token and ``self.callback``, and stores the token as
        ``self.oauth_token``.

        :returns: dict with 'key' and 'secret' entries for the token
        """
        resp = self.oauth_request(self.request_token_url, 'GET')
        fields = dict(parse.parse_qsl(resp.read().decode()))
        token = fields['oauth_token']
        # NOTE: the formatted URL replaces the template stored in
        # self.authorize_url.
        self.authorize_url = self.authorize_url % (token, self.callback)
        self.oauth_token = {'key': token, 'secret': fields['oauth_token_secret']}
        return self.oauth_token
auth.py 文件源码 项目:fanfou-py 作者: akgnah 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def access_token(self, oauth_token=None, oauth_verifier=None):
        """Exchange the current token for an access token.

        :param oauth_token: token to use for the exchange; when falsy the
            token already stored on the instance is kept
        :param oauth_verifier: verifier sent as 'oauth_verifier'; omitted
            from the request when falsy
        :returns: dict with 'key' and 'secret' entries, also stored as
            ``self.oauth_token``
        """
        self.oauth_token = oauth_token or self.oauth_token
        args = {}
        if oauth_verifier:  # only present when the callback is oob
            args['oauth_verifier'] = oauth_verifier
        resp = self.oauth_request(self.access_token_url, 'GET', args)
        fields = dict(parse.parse_qsl(resp.read().decode()))
        self.oauth_token = {'key': fields['oauth_token'],
                            'secret': fields['oauth_token_secret']}
        return self.oauth_token
auth.py 文件源码 项目:fanfou-py 作者: akgnah 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def xauth(self, username, password):
        """Obtain an access token from a username and password.

        Sends the credentials with ``x_auth_mode='client_auth'`` in a GET
        to ``self.access_token_url`` and parses the form-encoded reply.
        Unlike ``access_token`` the result is not stored on the instance.

        :returns: dict with 'key' and 'secret' entries for the token
        """
        credentials = dict(
            x_auth_username=username,
            x_auth_password=password,
            x_auth_mode='client_auth',
        )
        resp = self.oauth_request(self.access_token_url, 'GET', credentials)
        fields = dict(parse.parse_qsl(resp.read().decode()))
        return {'key': fields['oauth_token'], 'secret': fields['oauth_token_secret']}
test_services.py 文件源码 项目:mixmatch 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, url):
        """Parse ``url`` into components normalised for comparison.

        The query string is stored as a frozenset of (name, value) pairs,
        so parameter order is irrelevant, and the path is stored with
        percent-escapes and '+' decoded.
        """
        split_url = parse.urlparse(url)
        self.parts = split_url._replace(
            query=frozenset(parse.parse_qsl(split_url.query)),
            path=parse.unquote_plus(split_url.path),
        )
base.py 文件源码 项目:mixmatch 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, url):
        """Parse ``url`` into components normalised for comparison.

        The query string is stored as a frozenset of (name, value) pairs,
        so parameter order is irrelevant, and the path is stored with
        percent-escapes and '+' decoded.
        """
        split_url = parse.urlparse(url)
        self.parts = split_url._replace(
            query=frozenset(parse.parse_qsl(split_url.query)),
            path=parse.unquote_plus(split_url.path),
        )
fake_clients.py 文件源码 项目:python-distilclient 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _cs_request_with_retries(self, url, method, **kwargs):
        """Fake a request by dispatching to a canned handler method.

        The URL, minus its query string, is turned into a method name of
        the form ``<verb>_<path>`` with '/', '.' and '-' replaced by '_'.
        That method is called with the keyword arguments plus the parsed
        query parameters and must return ``(status, headers, body)``.

        :param url: request URL, optionally carrying a query string
        :param method: HTTP verb, e.g. 'GET'
        :param kwargs: request options; 'body' is recorded in the call stack
        :returns: tuple of (utils.TestResponse, body)
        :raises AssertionError: if 'body' is given for GET/DELETE, missing
            for PUT, or no handler method exists for the request
        """
        # Check that certain things are called correctly
        has_body = 'body' in kwargs
        if method in ('GET', 'DELETE'):
            assert not has_body
        elif method == 'PUT':
            assert has_body

        # Query parameters are handed to the handler as keyword arguments.
        kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

        # Derive the handler name from the path part of the URL.
        munged_url = url.rsplit('?', 1)[0].strip('/')
        for separator in '/.-':
            munged_url = munged_url.replace(separator, '_')
        callback = "%s_%s" % (method.lower(), munged_url)

        if not hasattr(self, callback):
            raise AssertionError('Called unknown API method: %s %s, '
                                 'expected fakes method name: %s' %
                                 (method, url, callback))

        # Note the call
        self.callstack.append((method, url, kwargs.get('body')))
        handler = getattr(self, callback)
        status, headers, body = handler(**kwargs)
        response = utils.TestResponse({
            "status_code": status,
            "text": body,
            "headers": headers,
        })
        return response, body

    #
    # Quotas
    #
tpda.py 文件源码 项目:dnsknife 作者: Gandi 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _qsl_get_one(qstring, param):
    """Return the value of `param`, which must occur exactly once.

    Raises exceptions.InvalidArgument when `param` is absent from the
    query string or appears more than once.
    """
    matches = [value
               for name, value in parse.parse_qsl(qstring)
               if name == param]

    if len(matches) == 1:
        return matches[0]

    raise exceptions.InvalidArgument(param)



# Uses RDAP
resolver.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_s3_uri(self, uri):
        """Fetch the object an S3 uri points at and return its body.

        The uri's network location is used as the bucket and its path,
        without the first character, as the key. Query parameters, if
        any, are passed to ``get_object`` as additional keyword arguments.

        :param uri: uri of the object to read
        :returns: whatever ``read()`` on the response 'Body' returns
        """
        parsed = urlparse(uri)
        client = self.session_factory().client('s3')
        request = {'Bucket': parsed.netloc, 'Key': parsed.path[1:]}
        if parsed.query:
            # Later duplicates win, as with dict().
            request.update(parse_qsl(parsed.query))
        response = client.get_object(**request)
        return response['Body'].read()
main.py 文件源码 项目:github2gitlab 作者: chauffer 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get(self, url, query, cache):
        """Return all JSON payloads for ``url``, following pagination.

        Pages are fetched with ``requests.get`` starting from ``query``;
        after each page the ``Link`` response header is searched for a
        ``rel="next"`` entry and, if one is found, its query parameters
        are merged over ``query`` to form the next request. Nothing is
        fetched when ``query`` is empty.

        Results are cached in ``<tmpdir>/<sha1 of url>.json``. When
        ``cache`` is true and that file exists and is at most 24 hours
        old, its content is returned without any request being made.

        :param url: URL to fetch
        :param query: dict of query parameters for the first page; it is
            not modified
        :param cache: whether to read from / write to the on-disk cache
        :returns: list of the decoded payloads from all pages
        """
        payloads_file = (self.tmpdir + "/" +
                         hashlib.sha1(url.encode('utf-8')).hexdigest() +
                         ".json")
        cache_is_fresh = (
            cache and os.access(payloads_file, os.F_OK) and
            time.time() - os.stat(payloads_file).st_mtime <= 24 * 60 * 60)
        if cache_is_fresh:
            with open(payloads_file, 'r') as f:
                return json.load(f)

        payloads = []
        next_query = query
        while next_query:
            log.debug(str(next_query))
            result = requests.get(url, params=next_query)
            payloads += result.json()
            next_query = None
            for link in result.headers.get('Link', '').split(','):
                if 'rel="next"' not in link:
                    continue
                m = re.search('<(.*)>', link)
                if m:
                    parsed_url = parse.urlparse(m.group(1))
                    # Start from the original query in case the server did
                    # not preserve it in the link (gitlab has that
                    # problem). Work on a copy so the caller's dict is
                    # left untouched.
                    next_query = dict(query)
                    next_query.update(parse.parse_qsl(parsed_url.query))
        if cache:
            with open(payloads_file, 'w') as f:
                json.dump(payloads, f)
        return payloads
test_flask_pyoidc.py 文件源码 项目:Flask-pyoidc 作者: zamzterz 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_authenticatate_with_extra_request_parameters(self):
        """Extra request args end up in the authentication redirect URL."""
        extra_params = {"foo": "bar", "abc": "xyz"}
        authn = OIDCAuthentication(
            self.app,
            provider_configuration_info={'issuer': ISSUER},
            client_registration_info={'client_id': 'foo'},
            extra_request_args=extra_params)

        with self.app.test_request_context('/'):
            a = authn._authenticate()

        # Every extra (name, value) pair must appear in the redirect query.
        request_params = dict(parse_qsl(urlparse(a.location).query))
        expected_pairs = set(extra_params.items())
        assert expected_pairs <= set(request_params.items())
test_kong_consumer.py 文件源码 项目:ansible-kong-module 作者: toast38coza 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_configure_for_plugin(self):
        """configure_for_plugin POSTs the plugin data for the consumer."""
        expected_url = "{}/consumers/joe/auth-key".format(mock_kong_admin_url)
        responses.add(responses.POST, expected_url, status=201)

        payload = {"key": "123"}
        response = self.api.configure_for_plugin("joe", "auth-key", payload)

        assert response.status_code == 201

        # Inspect the form-encoded body of the first recorded request.
        sent_body = responses.calls[0].request.body
        body = parse_qs(sent_body)
        body_exactly = parse_qsl(sent_body)
        assert body['key'][0] == "123", \
            "Expect correct. data to be sent. Got: {}".format(body_exactly)
providers.py 文件源码 项目:django-allauth-cas 作者: aureplop 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def get_auth_params(self, request, action):
        """Return the auth parameters for this request.

        Starts from a copy of the 'AUTH_PARAMS' setting and overlays the
        parameters encoded as a query string in the request's
        'auth_params' GET parameter, when that is present and non-empty.

        :param request: object whose ``GET`` mapping is consulted
        :param action: not used by this method
        :returns: dict of auth parameters
        """
        merged = dict(self.get_settings().get('AUTH_PARAMS', {}))
        extra = request.GET.get('auth_params')
        if extra:
            merged.update(parse_qsl(extra))
        return merged
s3_listener.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def expand_redirect_url(starting_url, key, bucket):
    """ Return starting URL with key and bucket set in its query string.

    Existing query parameters keep their position; 'key' and 'bucket'
    are overwritten in place if already present and appended otherwise.
    The fragment of the starting URL is not carried over.
    """
    pieces = urlparse.urlparse(starting_url)

    query = collections.OrderedDict(urlparse.parse_qsl(pieces.query))
    query['key'] = key
    query['bucket'] = bucket
    encoded_query = urlparse.urlencode(query)

    components = (pieces.scheme, pieces.netloc, pieces.path,
                  pieces.params, encoded_query, None)
    return urlparse.urlunparse(components)
deezer.py 文件源码 项目:social-core 作者: python-social-auth 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def request_access_token(self, *args, **kwargs):
        """Perform the request and parse its form-encoded text into a dict.

        All arguments are forwarded unchanged to ``self.request``.
        """
        body = self.request(*args, **kwargs).text
        return {name: value for name, value in parse_qsl(body)}
fixture_app.py 文件源码 项目:wsgiprox 作者: webrecorder 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __call__(self, env, start_response):
        """WSGI fixture app that echoes details of the request.

        Behaviour is driven by query parameters:
          - ``ignore_ws``: when set, a websocket in the environ is ignored
          - ``addproxyhost=true``: append the 'wsgiprox.proxy_host' value
          - ``chunked=true``: send no Content-Length header
          - ``write=true``: send the body through the ``write`` callable
            returned by ``start_response`` instead of the return value

        For websocket requests one message is received and echoed back,
        and ``start_response`` is not called.
        """
        params = dict(parse_qsl(env.get('QUERY_STRING')))
        request_uri = env.get('REQUEST_URI', '')

        websocket = env.get('wsgi.websocket')
        if websocket and not params.get('ignore_ws'):
            msg = 'WS Request Url: ' + request_uri
            msg += ' Echo: ' + websocket.receive()
            websocket.send(msg)
            return []

        result = 'Requested Url: ' + request_uri
        if env['REQUEST_METHOD'] == 'POST':
            post_data = env['wsgi.input'].read(int(env['CONTENT_LENGTH']))
            result += ' Post Data: ' + post_data.decode('utf-8')

        if params.get('addproxyhost') == 'true':
            result += ' Proxy Host: ' + env.get('wsgiprox.proxy_host', '')

        result = result.encode('iso-8859-1')

        # Without a Content-Length the response length is left unspecified.
        headers = []
        if params.get('chunked') != 'true':
            headers.append(('Content-Length', str(len(result))))

        write = start_response('200 OK', headers)

        if params.get('write') != 'true':
            return iter([result])

        write(result)
        return iter([])


# ============================================================================
fake_client.py 文件源码 项目:python-bileanclient 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def client_request(self, client, method, url, **kwargs):
        """Fake a client request by dispatching to a canned handler.

        The call is appended to ``self.callstack``. When ``self.fixtures``
        has an entry for ``url`` and ``method``, a response built from that
        fixture is returned. Otherwise the URL, minus its query string, is
        turned into a method name of the form ``<verb>_<path>`` with '/',
        '.' and '-' replaced by '_', and that method is called with the
        keyword arguments plus the parsed query parameters. The handler
        returns ``(status, headers, body)`` or ``(status, body)``.

        The 'x-openstack-request-id' header of a handler's reply (or
        'req-test' when absent) is stored as ``self.last_request_id``.

        :param client: not used by this method
        :param method: HTTP verb, e.g. "GET"
        :param url: request URL, optionally carrying a query string
        :param kwargs: request options; "headers", "json" and "data" are
            recorded in the call stack
        :returns: a TestResponse describing the faked reply
        :raises AssertionError: if a GET/DELETE carries "json", or if no
            handler method exists for the request
        """
        # Check that certain things are called correctly
        if method in ("GET", "DELETE"):
            assert "json" not in kwargs

        # Note the call
        sent_headers = kwargs.get("headers") or {}
        sent_payload = kwargs.get("json") or kwargs.get("data")
        self.callstack.append((method, url, sent_headers, sent_payload))

        # An exact (url, method) fixture short-circuits the dispatch.
        try:
            fixture = self.fixtures[url][method]
        except KeyError:
            pass
        else:
            return TestResponse({"headers": fixture[0],
                                 "text": fixture[1]})

        # Query parameters are handed to the handler as keyword arguments.
        kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

        # Derive the handler name from the path part of the URL.
        munged_url = url.rsplit('?', 1)[0].strip('/')
        for separator in '/.-':
            munged_url = munged_url.replace(separator, '_')
        callback = "%s_%s" % (method.lower(), munged_url)

        if not hasattr(self, callback):
            raise AssertionError('Called unknown API method: %s %s, '
                                 'expected fakes method name: %s' %
                                 (method, url, callback))

        resp = getattr(self, callback)(**kwargs)
        if len(resp) != 3:
            status, body = resp
            headers = {}
        else:
            status, headers, body = resp
        self.last_request_id = headers.get('x-openstack-request-id',
                                           'req-test')
        return TestResponse({
            "status_code": status,
            "text": body,
            "headers": headers,
        })
fake_client.py 文件源码 项目:python-kingbirdclient 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def client_request(self, client, method, url, **kwargs):
        """Fake a client request by dispatching to a canned handler.

        The call is appended to ``self.callstack``. When ``self.fixtures``
        has an entry for ``url`` and ``method``, a response built from that
        fixture is returned. Otherwise the URL, minus its query string, is
        turned into a method name of the form ``<verb>_<path>`` with '/',
        '.' and '-' replaced by '_', and that method is called with the
        keyword arguments plus the parsed query parameters. The handler
        returns ``(status, headers, body)`` or ``(status, body)``.

        :param client: not used by this method
        :param method: HTTP verb, e.g. "GET"
        :param url: request URL, optionally carrying a query string
        :param kwargs: request options; "headers", "json" and "data" are
            recorded in the call stack
        :returns: a TestResponse describing the faked reply
        :raises AssertionError: if a GET/DELETE carries "json", or if no
            handler method exists for the request
        """
        # Check that certain things are called correctly
        if method in ("GET", "DELETE"):
            assert "json" not in kwargs

        # Note the call
        sent_headers = kwargs.get("headers") or {}
        sent_payload = kwargs.get("json") or kwargs.get("data")
        self.callstack.append((method, url, sent_headers, sent_payload))

        # An exact (url, method) fixture short-circuits the dispatch.
        try:
            fixture = self.fixtures[url][method]
        except KeyError:
            pass
        else:
            return TestResponse({"headers": fixture[0],
                                 "text": fixture[1]})

        # Query parameters are handed to the handler as keyword arguments.
        kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

        # Derive the handler name from the path part of the URL.
        munged_url = url.rsplit('?', 1)[0].strip('/')
        for separator in '/.-':
            munged_url = munged_url.replace(separator, '_')
        callback = "%s_%s" % (method.lower(), munged_url)

        if not hasattr(self, callback):
            raise AssertionError('Called unknown API method: %s %s, '
                                 'expected fakes method name: %s' %
                                 (method, url, callback))

        resp = getattr(self, callback)(**kwargs)
        if len(resp) != 3:
            status, body = resp
            headers = {}
        else:
            status, headers, body = resp
        return TestResponse({
            "status_code": status,
            "text": body,
            "headers": headers,
        })
swift.py 文件源码 项目:stor 作者: counsyl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def temp_url(self, lifetime=300, method='GET', inline=True, filename=None):
        """Obtains a temporary URL to an object.

        Args:
            lifetime (int): The time (in seconds) the temporary
                URL will be valid
            method (str): The HTTP method that can be used on
                the temporary URL
            inline (bool, default True): When true, an 'inline' flag is
                added to the URL's query string.
            filename (str, optional): When given, it is quoted and added
                to the URL's query string as 'filename'.

        Returns:
            str: The temporary URL, built on the scheme and host of the
                configured auth url.

        Raises:
            ValueError: If the path has no resource, or the temp url key
                or the auth url is not configured.
        """
        swift_options = settings.get()['swift']
        auth_url = swift_options.get('auth_url')
        temp_url_key = swift_options.get('temp_url_key')

        if not self.resource:
            raise ValueError('can only create temporary URL on object')
        if not temp_url_key:
            raise ValueError(
                'a temporary url key must be set with settings.update '
                'or by setting the OS_TEMP_URL_KEY environment variable')
        if not auth_url:
            raise ValueError(
                'an auth url must be set with settings.update '
                'or by setting the OS_AUTH_URL environment variable')

        obj_path = '/v1/%s' % self[len(self.drive):]
        # The swift helper is ONLY used to obtain the temp_url_sig and
        # temp_url_expires parameters; the final, properly-escaped URL is
        # assembled below.
        signed_url = generate_temp_url(obj_path, lifetime, temp_url_key, method)
        signed_query = signed_url[signed_url.rfind('temp_url_sig'):]
        signed_params = dict(parse.parse_qsl(signed_query))

        query = ['%s=%s' % (name, signed_params[name])
                 for name in ('temp_url_sig', 'temp_url_expires')]
        if inline:
            query.append('inline')
        if filename:
            query.append('filename=%s' % parse.quote(filename))

        # Reuse scheme, netloc, params and fragment of the auth url; only
        # the path and the query are replaced.
        auth_url_parts = parse.urlparse(auth_url)
        temp_url_parts = auth_url_parts._replace(path=parse.quote(obj_path),
                                                 query='&'.join(query))
        return parse.urlunparse(temp_url_parts)
__init__.py 文件源码 项目:chalktalk_docs 作者: loremIpsum1771 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def encode_uri(uri):
    """Return ``uri`` as an ASCII-only URI.

    The host name is IDNA-encoded, the path is percent-encoded as UTF-8
    (keeping '/' and turning spaces into '+'), and the query string is
    re-encoded from its parsed (name, value) pairs. Query parameters with
    blank values are dropped by the parsing step.

    :param uri: the URI to encode, as text
    :returns: the encoded URI
    """
    def _as_text(value):
        # quote_plus/urlencode return byte strings on Python 2 and text on
        # Python 3; only the former needs decoding.
        return value.decode('ascii') if isinstance(value, bytes) else value

    split = list(urlsplit(uri))
    split[1] = split[1].encode('idna').decode('ascii')
    split[2] = _as_text(quote_plus(split[2].encode('utf-8'), '/'))
    # urlencode does the percent-encoding itself; quoting the values
    # beforehand would encode them twice (e.g. ' ' -> '+' -> '%2B').
    query = [(q, v.encode('utf-8')) for (q, v) in parse_qsl(split[3])]
    split[3] = _as_text(urlencode(query))
    return urlunsplit(split)
fake_client.py 文件源码 项目:python-karborclient 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def client_request(self, client, method, url, **kwargs):
        """Fake a client request by dispatching to a canned handler.

        The call is appended to ``self.callstack``. When ``self.fixtures``
        has an entry for ``url`` and ``method``, a response built from that
        fixture is returned. Otherwise the URL, minus its query string, is
        turned into a method name of the form ``<verb>_<path>`` with '/',
        '.' and '-' replaced by '_', and that method is called with the
        keyword arguments plus the parsed query parameters. The handler
        returns ``(status, headers, body)`` or ``(status, body)``.

        :param client: not used by this method
        :param method: HTTP verb, e.g. "GET"
        :param url: request URL, optionally carrying a query string
        :param kwargs: request options; "headers", "json" and "data" are
            recorded in the call stack
        :returns: a TestResponse describing the faked reply
        :raises AssertionError: if a GET/DELETE carries "json", or if no
            handler method exists for the request
        """
        # Check that certain things are called correctly
        if method in ("GET", "DELETE"):
            assert "json" not in kwargs

        # Note the call
        sent_headers = kwargs.get("headers") or {}
        sent_payload = kwargs.get("json") or kwargs.get("data")
        self.callstack.append((method, url, sent_headers, sent_payload))

        # An exact (url, method) fixture short-circuits the dispatch.
        try:
            fixture = self.fixtures[url][method]
        except KeyError:
            pass
        else:
            return TestResponse({"headers": fixture[0],
                                 "text": fixture[1]})

        # Query parameters are handed to the handler as keyword arguments.
        kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

        # Derive the handler name from the path part of the URL.
        munged_url = url.rsplit('?', 1)[0].strip('/')
        for separator in '/.-':
            munged_url = munged_url.replace(separator, '_')
        callback = "%s_%s" % (method.lower(), munged_url)

        if not hasattr(self, callback):
            raise AssertionError('Called unknown API method: %s %s, '
                                 'expected fakes method name: %s' %
                                 (method, url, callback))

        resp = getattr(self, callback)(**kwargs)
        if len(resp) != 3:
            status, body = resp
            headers = {}
        else:
            status, headers, body = resp
        return TestResponse({
            "status_code": status,
            "text": body,
            "headers": headers,
        })
fake_client.py 文件源码 项目:python-distilclient 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def client_request(self, client, method, url, **kwargs):
        """Fake a client request by dispatching to a canned handler.

        The call is appended to ``self.callstack``. When ``self.fixtures``
        has an entry for ``url`` and ``method``, a response built from that
        fixture is returned. Otherwise the URL, minus its query string, is
        turned into a method name of the form ``<verb>_<path>`` with '/',
        '.' and '-' replaced by '_', and that method is called with the
        keyword arguments plus the parsed query parameters. The handler
        returns ``(status, headers, body)`` or ``(status, body)``.

        :param client: not used by this method
        :param method: HTTP verb, e.g. "GET"
        :param url: request URL, optionally carrying a query string
        :param kwargs: request options; "headers", "json" and "data" are
            recorded in the call stack
        :returns: a TestResponse describing the faked reply
        :raises AssertionError: if a GET/DELETE carries "json", or if no
            handler method exists for the request
        """
        # Check that certain things are called correctly
        if method in ("GET", "DELETE"):
            assert "json" not in kwargs

        # Note the call
        sent_headers = kwargs.get("headers") or {}
        sent_payload = kwargs.get("json") or kwargs.get("data")
        self.callstack.append((method, url, sent_headers, sent_payload))

        # An exact (url, method) fixture short-circuits the dispatch.
        try:
            fixture = self.fixtures[url][method]
        except KeyError:
            pass
        else:
            return TestResponse({"headers": fixture[0],
                                 "text": fixture[1]})

        # Query parameters are handed to the handler as keyword arguments.
        kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

        # Derive the handler name from the path part of the URL.
        munged_url = url.rsplit('?', 1)[0].strip('/')
        for separator in '/.-':
            munged_url = munged_url.replace(separator, '_')
        callback = "%s_%s" % (method.lower(), munged_url)

        if not hasattr(self, callback):
            raise AssertionError('Called unknown API method: %s %s, '
                                 'expected fakes method name: %s' %
                                 (method, url, callback))

        resp = getattr(self, callback)(**kwargs)
        if len(resp) != 3:
            status, body = resp
            headers = {}
        else:
            status, headers, body = resp
        return TestResponse({
            "status_code": status,
            "text": body,
            "headers": headers,
        })
fake_client.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def client_request(self, client, method, url, **kwargs):
        """Fake a client request by dispatching to a canned handler.

        The call is appended to ``self.callstack``. When ``self.fixtures``
        has an entry for ``url`` and ``method``, a response built from that
        fixture is returned. Otherwise the URL, minus its query string, is
        turned into a method name of the form ``<verb>_<path>`` with '/',
        '.' and '-' replaced by '_', and that method is called with the
        keyword arguments plus the parsed query parameters. The handler
        returns ``(status, headers, body)`` or ``(status, body)``.

        :param client: not used by this method
        :param method: HTTP verb, e.g. "GET"
        :param url: request URL, optionally carrying a query string
        :param kwargs: request options; "headers", "json" and "data" are
            recorded in the call stack
        :returns: a TestResponse describing the faked reply
        :raises AssertionError: if a GET/DELETE carries "json", or if no
            handler method exists for the request
        """
        # Check that certain things are called correctly
        if method in ("GET", "DELETE"):
            assert "json" not in kwargs

        # Note the call
        sent_headers = kwargs.get("headers") or {}
        sent_payload = kwargs.get("json") or kwargs.get("data")
        self.callstack.append((method, url, sent_headers, sent_payload))

        # An exact (url, method) fixture short-circuits the dispatch.
        try:
            fixture = self.fixtures[url][method]
        except KeyError:
            pass
        else:
            return TestResponse({"headers": fixture[0],
                                 "text": fixture[1]})

        # Query parameters are handed to the handler as keyword arguments.
        kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

        # Derive the handler name from the path part of the URL.
        munged_url = url.rsplit('?', 1)[0].strip('/')
        for separator in '/.-':
            munged_url = munged_url.replace(separator, '_')
        callback = "%s_%s" % (method.lower(), munged_url)

        if not hasattr(self, callback):
            raise AssertionError('Called unknown API method: %s %s, '
                                 'expected fakes method name: %s' %
                                 (method, url, callback))

        resp = getattr(self, callback)(**kwargs)
        if len(resp) != 3:
            status, body = resp
            headers = {}
        else:
            status, headers, body = resp
        return TestResponse({
            "status_code": status,
            "text": body,
            "headers": headers,
        })
fake_client.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def client_request(self, client, method, url, **kwargs):
        """Fake a client request by dispatching to a canned handler.

        The call is appended to ``self.callstack``. When ``self.fixtures``
        has an entry for ``url`` and ``method``, a response built from that
        fixture is returned. Otherwise the URL, minus its query string, is
        turned into a method name of the form ``<verb>_<path>`` with '/',
        '.' and '-' replaced by '_', and that method is called with the
        keyword arguments plus the parsed query parameters. The handler
        returns ``(status, headers, body)`` or ``(status, body)``.

        The 'x-openstack-request-id' header of a handler's reply (or
        'req-test' when absent) is stored as ``self.last_request_id``.

        :param client: not used by this method
        :param method: HTTP verb, e.g. "GET"
        :param url: request URL, optionally carrying a query string
        :param kwargs: request options; "headers", "json" and "data" are
            recorded in the call stack
        :returns: a TestResponse describing the faked reply
        :raises AssertionError: if a GET/DELETE carries "json", or if no
            handler method exists for the request
        """
        # Check that certain things are called correctly
        if method in ("GET", "DELETE"):
            assert "json" not in kwargs

        # Note the call
        sent_headers = kwargs.get("headers") or {}
        sent_payload = kwargs.get("json") or kwargs.get("data")
        self.callstack.append((method, url, sent_headers, sent_payload))

        # An exact (url, method) fixture short-circuits the dispatch.
        try:
            fixture = self.fixtures[url][method]
        except KeyError:
            pass
        else:
            return TestResponse({"headers": fixture[0],
                                 "text": fixture[1]})

        # Query parameters are handed to the handler as keyword arguments.
        kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

        # Derive the handler name from the path part of the URL.
        munged_url = url.rsplit('?', 1)[0].strip('/')
        for separator in '/.-':
            munged_url = munged_url.replace(separator, '_')
        callback = "%s_%s" % (method.lower(), munged_url)

        if not hasattr(self, callback):
            raise AssertionError('Called unknown API method: %s %s, '
                                 'expected fakes method name: %s' %
                                 (method, url, callback))

        resp = getattr(self, callback)(**kwargs)
        if len(resp) != 3:
            status, body = resp
            headers = {}
        else:
            status, headers, body = resp
        self.last_request_id = headers.get('x-openstack-request-id',
                                           'req-test')
        return TestResponse({
            "status_code": status,
            "text": body,
            "headers": headers,
        })
fake_client.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def client_request(self, client, method, url, **kwargs):
        """Fake a client request by dispatching to a canned handler.

        The call is appended to ``self.callstack``. When ``self.fixtures``
        has an entry for ``url`` and ``method``, a response built from that
        fixture is returned. Otherwise the URL, minus its query string, is
        turned into a method name of the form ``<verb>_<path>`` with '/',
        '.' and '-' replaced by '_', and that method is called with the
        keyword arguments plus the parsed query parameters. The handler
        returns ``(status, headers, body)`` or ``(status, body)``.

        :param client: not used by this method
        :param method: HTTP verb, e.g. "GET"
        :param url: request URL, optionally carrying a query string
        :param kwargs: request options; "headers", "json" and "data" are
            recorded in the call stack
        :returns: a TestResponse describing the faked reply
        :raises AssertionError: if a GET/DELETE carries "json", or if no
            handler method exists for the request
        """
        # Check that certain things are called correctly
        if method in ("GET", "DELETE"):
            assert "json" not in kwargs

        # Note the call
        sent_headers = kwargs.get("headers") or {}
        sent_payload = kwargs.get("json") or kwargs.get("data")
        self.callstack.append((method, url, sent_headers, sent_payload))

        # An exact (url, method) fixture short-circuits the dispatch.
        try:
            fixture = self.fixtures[url][method]
        except KeyError:
            pass
        else:
            return TestResponse({"headers": fixture[0],
                                 "text": fixture[1]})

        # Query parameters are handed to the handler as keyword arguments.
        kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

        # Derive the handler name from the path part of the URL.
        munged_url = url.rsplit('?', 1)[0].strip('/')
        for separator in '/.-':
            munged_url = munged_url.replace(separator, '_')
        callback = "%s_%s" % (method.lower(), munged_url)

        if not hasattr(self, callback):
            raise AssertionError('Called unknown API method: %s %s, '
                                 'expected fakes method name: %s' %
                                 (method, url, callback))

        resp = getattr(self, callback)(**kwargs)
        if len(resp) != 3:
            status, body = resp
            headers = {}
        else:
            status, headers, body = resp
        return TestResponse({
            "status_code": status,
            "text": body,
            "headers": headers,
        })
fake_client.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def client_request(self, client, method, url, **kwargs):
        """Fake a client request by dispatching to a canned handler.

        The call is appended to ``self.callstack``. When ``self.fixtures``
        has an entry for ``url`` and ``method``, a response built from that
        fixture is returned. Otherwise the URL, minus its query string, is
        turned into a method name of the form ``<verb>_<path>`` with '/',
        '.' and '-' replaced by '_', and that method is called with the
        keyword arguments plus the parsed query parameters. The handler
        returns ``(status, headers, body)`` or ``(status, body)``.

        :param client: not used by this method
        :param method: HTTP verb, e.g. "GET"
        :param url: request URL, optionally carrying a query string
        :param kwargs: request options; "headers", "json" and "data" are
            recorded in the call stack
        :returns: a TestResponse describing the faked reply
        :raises AssertionError: if a GET/DELETE carries "json", or if no
            handler method exists for the request
        """
        # Check that certain things are called correctly
        if method in ("GET", "DELETE"):
            assert "json" not in kwargs

        # Note the call
        sent_headers = kwargs.get("headers") or {}
        sent_payload = kwargs.get("json") or kwargs.get("data")
        self.callstack.append((method, url, sent_headers, sent_payload))

        # An exact (url, method) fixture short-circuits the dispatch.
        try:
            fixture = self.fixtures[url][method]
        except KeyError:
            pass
        else:
            return TestResponse({"headers": fixture[0],
                                 "text": fixture[1]})

        # Query parameters are handed to the handler as keyword arguments.
        kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

        # Derive the handler name from the path part of the URL.
        munged_url = url.rsplit('?', 1)[0].strip('/')
        for separator in '/.-':
            munged_url = munged_url.replace(separator, '_')
        callback = "%s_%s" % (method.lower(), munged_url)

        if not hasattr(self, callback):
            raise AssertionError('Called unknown API method: %s %s, '
                                 'expected fakes method name: %s' %
                                 (method, url, callback))

        resp = getattr(self, callback)(**kwargs)
        if len(resp) != 3:
            status, body = resp
            headers = {}
        else:
            status, headers, body = resp
        return TestResponse({
            "status_code": status,
            "text": body,
            "headers": headers,
        })
fake_client.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def client_request(self, client, method, url, **kwargs):
        """Fake a client request by dispatching to a canned handler.

        The call is appended to ``self.callstack``. When ``self.fixtures``
        has an entry for ``url`` and ``method``, a response built from that
        fixture is returned. Otherwise the URL, minus its query string, is
        turned into a method name of the form ``<verb>_<path>`` with '/',
        '.' and '-' replaced by '_', and that method is called with the
        keyword arguments plus the parsed query parameters. The handler
        returns ``(status, headers, body)`` or ``(status, body)``.

        :param client: not used by this method
        :param method: HTTP verb, e.g. "GET"
        :param url: request URL, optionally carrying a query string
        :param kwargs: request options; "headers", "json" and "data" are
            recorded in the call stack
        :returns: a TestResponse describing the faked reply
        :raises AssertionError: if a GET/DELETE carries "json", or if no
            handler method exists for the request
        """
        # Check that certain things are called correctly
        if method in ("GET", "DELETE"):
            assert "json" not in kwargs

        # Note the call
        sent_headers = kwargs.get("headers") or {}
        sent_payload = kwargs.get("json") or kwargs.get("data")
        self.callstack.append((method, url, sent_headers, sent_payload))

        # An exact (url, method) fixture short-circuits the dispatch.
        try:
            fixture = self.fixtures[url][method]
        except KeyError:
            pass
        else:
            return TestResponse({"headers": fixture[0],
                                 "text": fixture[1]})

        # Query parameters are handed to the handler as keyword arguments.
        kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

        # Derive the handler name from the path part of the URL.
        munged_url = url.rsplit('?', 1)[0].strip('/')
        for separator in '/.-':
            munged_url = munged_url.replace(separator, '_')
        callback = "%s_%s" % (method.lower(), munged_url)

        if not hasattr(self, callback):
            raise AssertionError('Called unknown API method: %s %s, '
                                 'expected fakes method name: %s' %
                                 (method, url, callback))

        resp = getattr(self, callback)(**kwargs)
        if len(resp) != 3:
            status, body = resp
            headers = {}
        else:
            status, headers, body = resp
        return TestResponse({
            "status_code": status,
            "text": body,
            "headers": headers,
        })


问题


面经


文章

微信
公众号

扫码关注公众号