python类CaseInsensitiveDict()的实例源码

test_http.py 文件源码 项目:eater 作者: alexhayes 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_data_error_raised():
    class Person(Model):
        name = StringType(required=True, min_length=4)

    class PersonAPI(HTTPEater):
        request_cls = Person
        response_cls = Person
        url = 'http://example.com/person'

    api = PersonAPI(name='John')

    with pytest.raises(DataError):
        with requests_mock.Mocker() as mock:
            mock.get(
                api.url,
                json={'name': 'Joh'},
                headers=CaseInsensitiveDict({
                    'Content-Type': 'application/json'
                })
            )
            api()
test_http.py 文件源码 项目:eater 作者: alexhayes 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_non_json_content_response():
    class GetPersonAPI(HTTPEater):
        request_cls = Model
        response_cls = Model
        url = 'http://example.com/'

    api = GetPersonAPI()

    with requests_mock.Mocker() as mock:
        mock.get(
            'http://example.com/',
            text='Hello world',
            headers=CaseInsensitiveDict({
                'Content-Type': 'text/plain'
            })
        )
        with pytest.raises(NotImplementedError):
            api()
service.py 文件源码 项目:jumpserver-python-sdk 作者: jumpserver 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, url, method='get', data=None, params=None, headers=None,
                 content_type='application/json', app_name=''):
        self.url = url
        self.method = method
        self.params = params or {}
        self.result = None

        if not isinstance(headers, dict):
            headers = {}
        self.headers = CaseInsensitiveDict(headers)

        self.headers['Content-Type'] = content_type
        if data is None:
            data = {}
        self.data = json.dumps(data)

        if 'User-Agent' not in self.headers:
            if app_name:
                self.headers['User-Agent'] = _USER_AGENT + '/' + app_name
            else:
                self.headers['User-Agent'] = _USER_AGENT
parse_object_test.py 文件源码 项目:rets 作者: opendoor-labs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_guess_mime_type():
    # Can guess from URL extension
    assert 'image/jpeg' == _guess_mime_type(CaseInsensitiveDict({'Location': 'http://cdn.rets.com/1.jpg'}))
    assert 'image/png' == _guess_mime_type(CaseInsensitiveDict({'Location': 'http://cdn.rets.com/1.png'}))
    assert 'application/pdf' == _guess_mime_type(CaseInsensitiveDict({'Location': 'http://cdn.rets.com/1.pdf'}))

    # Can guess from content type if extension is missing
    assert 'image/jpeg' == _guess_mime_type(CaseInsensitiveDict({
        'Location': 'http://cdn.rets.com/1',
        'Content-Type': 'image/jpeg',
    }))

    # Can guess from content type
    assert 'image/jpeg' == _guess_mime_type(CaseInsensitiveDict({'Content-Type': 'image/jpeg'}))
    assert 'image/jpeg' == _guess_mime_type(CaseInsensitiveDict({'Content-Type': 'image/jpeg;charset=US-ASCII'}))

    assert None == _guess_mime_type(CaseInsensitiveDict({'Content-Type': ''}))
    assert None == _guess_mime_type(CaseInsensitiveDict())
malibuservice.py 文件源码 项目:msgen 作者: MicrosoftGenomics 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def handle_response(cls, response_code, response_data):
        """
        Handles service response.
        Returns the message, status or workflow id, if applicable
        """

        client_error = response_code >= 400 and response_code < 500
        success = response_code >= 200 and response_code < 300

        workflow = CaseInsensitiveDict({"Message":"", "Id": 0, "Status": 0})

        if client_error or success:
            try:
                response_content = json.loads(response_data)
                workflow.update(response_content)
            except ValueError:
                workflow["Message"] = "Invalid response content"
        else:
            workflow["Message"] = "{0}: Internal server error".format(response_code)

        return workflow
kerberos_.py 文件源码 项目:deb-python-requests-kerberos 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, response):
        super(SanitizedResponse, self).__init__()
        self.status_code = response.status_code
        self.encoding = response.encoding
        self.raw = response.raw
        self.reason = response.reason
        self.url = response.url
        self.request = response.request
        self.connection = response.connection
        self._content_consumed = True

        self._content = ""
        self.cookies = cookiejar_from_dict({})
        self.headers = CaseInsensitiveDict()
        self.headers['content-length'] = '0'
        for header in ('date', 'server'):
            if header in response.headers:
                self.headers[header] = response.headers[header]
manager.py 文件源码 项目:plumeria 作者: sk89q 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, flow_expiration=60 * 15, flow_cache_size=1000):
        """
        Create a new access manager for managing authorizations.

        Parameters
        ----------
        flow_expiration: int
            The number of seconds to keep pending authorization flows in memory
        flow_cache_size: int
            The maximum number of pending authorization flows to keep in memory at a time

        """
        self.store = None
        self.endpoints = CaseInsensitiveDict()
        self.state_cache = TTLCache(maxsize=flow_cache_size,
                                    ttl=flow_expiration)  # type: Dict[Tuple[str, str], Tuple[Endpoint, str, str]]
        self.random = random.SystemRandom()
        self.redirect_uri = "http://localhost/oauth2/callback/"
http.py 文件源码 项目:TornadoWeb 作者: VxCoder 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, method, url,
                 data=None,
                 params=None,
                 headers=None,
                 app_name=''):
        self.method = method
        self.url = url
        self.data = _convert_request_body(data)
        self.params = params or {}

        if not isinstance(headers, CaseInsensitiveDict):
            self.headers = CaseInsensitiveDict(headers)
        else:
            self.headers = headers

        # tell requests not to add 'Accept-Encoding: gzip, deflate' by default
        if 'Accept-Encoding' not in self.headers:
            self.headers['Accept-Encoding'] = None

        if 'User-Agent' not in self.headers:
            if app_name:
                self.headers['User-Agent'] = _USER_AGENT + '/' + app_name
            else:
                self.headers['User-Agent'] = _USER_AGENT
serializer.py 文件源码 项目:ws-backend-community 作者: lavalamp- 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __deserialize_requests_response(self, to_deserialize):
        """
        Create and return a Response based on the contents of the given dictionary.
        :param to_deserialize: The dictionary to create the Response from.
        :return: A Response created by the contents of to_deserialize.
        """
        to_return = Response()
        to_return.status_code = to_deserialize["status_code"]
        to_return.headers = CaseInsensitiveDict(to_deserialize["headers"])
        to_return.encoding = to_deserialize["encoding"]
        to_return._content = to_deserialize["content"]
        to_return._content_consumed = True
        to_return.reason = to_deserialize["reason"]
        to_return.url = to_deserialize["url"]
        to_return.request = self.decode(to_deserialize["request"])
        return to_return
serialize.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def prepare_response(self, cached):
        """Verify our vary headers match and construct a real urllib3
        HTTPResponse object.
        """
        # Special case the '*' Vary value as it means we cannot actually
        # determine if the cached response is suitable for this request.
        if "*" in cached.get("vary", {}):
            return

        body_raw = cached["response"].pop("body")

        headers = CaseInsensitiveDict(data=cached['response']['headers'])
        if headers.get('transfer-encoding', '') == 'chunked':
            headers.pop('transfer-encoding')

        cached['response']['headers'] = headers

        try:
            body = io.BytesIO(body_raw)
        except TypeError:
            # This can happen if cachecontrol serialized to v1 format (pickle)
            # using Python 2. A Python 2 str(byte string) will be unpickled as
            # a Python 3 str (unicode string), which will cause the above to
            # fail with:
            #
            #     TypeError: 'str' does not support the buffer interface
            body = io.BytesIO(body_raw.encode('utf8'))

        return HTTPResponse(
            body=body,
            preload_content=False,
            **cached["response"]
        )
sessions.py 文件源码 项目:trip 作者: littlecodersh 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def prepare_request(self, request):
        cookies = request.cookies or {}

        # Bootstrap CookieJar.
        if not isinstance(cookies, cookielib.CookieJar):
            cookies = cookiejar_from_dict(cookies)

        # Merge with session cookies
        merged_cookies = merge_cookies(
            merge_cookies(RequestsCookieJar(), self.cookies), cookies)

        # Set environment's basic authentication if not explicitly set.
        # auth = request.auth
        # if self.trust_env and not auth and not self.auth:
        #     auth = get_netrc_auth(request.url)

        p = PreparedRequest()
        p.prepare(
            method=request.method.upper(),
            url=request.url,
            files=request.files,
            data=request.data,
            json=request.json,
            headers=merge_setting(request.headers,
                self.headers, dict_class=CaseInsensitiveDict),
            params=merge_setting(request.params, self.params),
            auth=merge_setting(request.auth, self.auth),
            cookies=merged_cookies,
            hooks=merge_hooks(request.hooks, self.hooks),
        )
        return p
utils.py 文件源码 项目:trip 作者: littlecodersh 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def default_headers():
    """
    :rtype: requests.structures.CaseInsensitiveDict
    """
    return CaseInsensitiveDict({
        'User-Agent': default_user_agent(),
        'Accept-Encoding': ', '.join(('gzip', 'deflate')),
        'Accept': '*/*',
        # 'Connection': 'keep-alive',
        'Connection': 'close',
    })
input.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _parse_items(self):
        """Parse `args.items` into `args.headers`, `args.data`, `args.params`,
         and `args.files`.

        """
        self.args.headers = CaseInsensitiveDict()
        self.args.data = ParamDict() if self.args.form else OrderedDict()
        self.args.files = OrderedDict()
        self.args.params = ParamDict()

        try:
            parse_items(items=self.args.items,
                        headers=self.args.headers,
                        data=self.args.data,
                        files=self.args.files,
                        params=self.args.params)
        except ParseError as e:
            if self.args.traceback:
                raise
            self.error(e.message)

        if self.args.files and not self.args.form:
            # `http url @/path/to/file`
            file_fields = list(self.args.files.keys())
            if file_fields != ['']:
                self.error(
                    'Invalid file fields (perhaps you meant --form?): %s'
                    % ','.join(file_fields))

            fn, fd = self.args.files['']
            self.args.files = {}

            self._body_from_file(fd)

            if 'Content-Type' not in self.args.headers:
                mime, encoding = mimetypes.guess_type(fn, strict=False)
                if mime:
                    content_type = mime
                    if encoding:
                        content_type = '%s; charset=%s' % (mime, encoding)
                    self.args.headers['Content-Type'] = content_type
resources.py 文件源码 项目:tfs 作者: devopshq 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, data=None, tfs=None):
        super().__init__(data, tfs)
        self._fields = CaseInsensitiveDict(self._data['fields'])
        self._system_prefix = 'System.'
        self.id = self._data['id']
decoder.py 文件源码 项目:flickr_downloader 作者: Denisolt 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, content, encoding):
        self.encoding = encoding
        headers = {}
        # Split into header section (if any) and the content
        if b'\r\n\r\n' in content:
            first, self.content = _split_on_find(content, b'\r\n\r\n')
            if first != b'':
                headers = _header_parser(first.lstrip(), encoding)
        else:
            raise ImproperBodyPartContentException(
                'content does not contain CR-LF-CR-LF'
            )
        self.headers = CaseInsensitiveDict(headers)
response.py 文件源码 项目:Url 作者: beiruan 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, status_code=None, url=None, orig_url=None, headers=CaseInsensitiveDict(),
                 content='', cookies=None, error=None, traceback=None, save=None, js_script_result=None, time=0):
        if cookies is None:
            cookies = {}
        self.status_code = status_code
        self.url = url
        self.orig_url = orig_url
        self.headers = headers
        self.content = content
        self.cookies = cookies
        self.error = error
        self.traceback = traceback
        self.save = save
        self.js_script_result = js_script_result
        self.time = time
response.py 文件源码 项目:Url 作者: beiruan 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def rebuild_response(r):
    response = Response(
        status_code=r.get('status_code', 599),
        url=r.get('url', ''),
        headers=CaseInsensitiveDict(r.get('headers', {})),
        content=r.get('content', ''),
        cookies=r.get('cookies', {}),
        error=r.get('error'),
        traceback=r.get('traceback'),
        time=r.get('time', 0),
        orig_url=r.get('orig_url', r.get('url', '')),
        js_script_result=r.get('js_script_result'),
        save=r.get('save'),
    )
    return response
decoder.py 文件源码 项目:Liljimbo-Chatbot 作者: chrisjim316 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, content, encoding):
        self.encoding = encoding
        headers = {}
        # Split into header section (if any) and the content
        if b'\r\n\r\n' in content:
            first, self.content = _split_on_find(content, b'\r\n\r\n')
            if first != b'':
                headers = _header_parser(first.lstrip(), encoding)
        else:
            raise ImproperBodyPartContentException(
                'content does not contain CR-LF-CR-LF'
            )
        self.headers = CaseInsensitiveDict(headers)
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_docstring_example(self):
        cid = CaseInsensitiveDict()
        cid['Accept'] = 'application/json'
        assert cid['aCCEPT'] == 'application/json'
        assert list(cid) == ['Accept']
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_len(self):
        cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'})
        cid['A'] = 'a'
        assert len(cid) == 2
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_getitem(self):
        cid = CaseInsensitiveDict({'Spam': 'blueval'})
        assert cid['spam'] == 'blueval'
        assert cid['SPAM'] == 'blueval'
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_fixes_649(self):
        """__setitem__ should behave case-insensitively."""
        cid = CaseInsensitiveDict()
        cid['spam'] = 'oneval'
        cid['Spam'] = 'twoval'
        cid['sPAM'] = 'redval'
        cid['SPAM'] = 'blueval'
        assert cid['spam'] == 'blueval'
        assert cid['SPAM'] == 'blueval'
        assert list(cid.keys()) == ['SPAM']
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 52 收藏 0 点赞 0 评论 0
def test_delitem(self):
        cid = CaseInsensitiveDict()
        cid['Spam'] = 'someval'
        del cid['sPam']
        assert 'spam' not in cid
        assert len(cid) == 0
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_get(self):
        cid = CaseInsensitiveDict()
        cid['spam'] = 'oneval'
        cid['SPAM'] = 'blueval'
        assert cid.get('spam') == 'blueval'
        assert cid.get('SPAM') == 'blueval'
        assert cid.get('sPam') == 'blueval'
        assert cid.get('notspam', 'default') == 'default'
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_update(self):
        cid = CaseInsensitiveDict()
        cid['spam'] = 'blueval'
        cid.update({'sPam': 'notblueval'})
        assert cid['spam'] == 'notblueval'
        cid = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'})
        cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'})
        assert len(cid) == 2
        assert cid['foo'] == 'anotherfoo'
        assert cid['bar'] == 'anotherbar'
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_update_retains_unchanged(self):
        cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'})
        cid.update({'foo': 'newfoo'})
        assert cid['bar'] == 'bar'
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_iter(self):
        cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'})
        keys = frozenset(['Spam', 'Eggs'])
        assert frozenset(iter(cid)) == keys
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_equality(self):
        cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'})
        othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'})
        assert cid == othercid
        del othercid['spam']
        assert cid != othercid
        assert cid == {'spam': 'blueval', 'eggs': 'redval'}
        assert cid != object()
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_lower_items(self):
        cid = CaseInsensitiveDict({
            'Accept': 'application/json',
            'user-Agent': 'requests',
        })
        keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items())
        lowerkeyset = frozenset(['accept', 'user-agent'])
        assert keyset == lowerkeyset
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_preserve_key_case(self):
        cid = CaseInsensitiveDict({
            'Accept': 'application/json',
            'user-Agent': 'requests',
        })
        keyset = frozenset(['Accept', 'user-Agent'])
        assert frozenset(i[0] for i in cid.items()) == keyset
        assert frozenset(cid.keys()) == keyset
        assert frozenset(cid) == keyset


问题


面经


文章

微信
公众号

扫码关注公众号