def test_data_error_raised():
class Person(Model):
name = StringType(required=True, min_length=4)
class PersonAPI(HTTPEater):
request_cls = Person
response_cls = Person
url = 'http://example.com/person'
api = PersonAPI(name='John')
with pytest.raises(DataError):
with requests_mock.Mocker() as mock:
mock.get(
api.url,
json={'name': 'Joh'},
headers=CaseInsensitiveDict({
'Content-Type': 'application/json'
})
)
api()
python类CaseInsensitiveDict()的实例源码
def test_non_json_content_response():
class GetPersonAPI(HTTPEater):
request_cls = Model
response_cls = Model
url = 'http://example.com/'
api = GetPersonAPI()
with requests_mock.Mocker() as mock:
mock.get(
'http://example.com/',
text='Hello world',
headers=CaseInsensitiveDict({
'Content-Type': 'text/plain'
})
)
with pytest.raises(NotImplementedError):
api()
def __init__(self, url, method='get', data=None, params=None, headers=None,
content_type='application/json', app_name=''):
self.url = url
self.method = method
self.params = params or {}
self.result = None
if not isinstance(headers, dict):
headers = {}
self.headers = CaseInsensitiveDict(headers)
self.headers['Content-Type'] = content_type
if data is None:
data = {}
self.data = json.dumps(data)
if 'User-Agent' not in self.headers:
if app_name:
self.headers['User-Agent'] = _USER_AGENT + '/' + app_name
else:
self.headers['User-Agent'] = _USER_AGENT
def test_guess_mime_type():
# Can guess from URL extension
assert 'image/jpeg' == _guess_mime_type(CaseInsensitiveDict({'Location': 'http://cdn.rets.com/1.jpg'}))
assert 'image/png' == _guess_mime_type(CaseInsensitiveDict({'Location': 'http://cdn.rets.com/1.png'}))
assert 'application/pdf' == _guess_mime_type(CaseInsensitiveDict({'Location': 'http://cdn.rets.com/1.pdf'}))
# Can guess from content type if extension is missing
assert 'image/jpeg' == _guess_mime_type(CaseInsensitiveDict({
'Location': 'http://cdn.rets.com/1',
'Content-Type': 'image/jpeg',
}))
# Can guess from content type
assert 'image/jpeg' == _guess_mime_type(CaseInsensitiveDict({'Content-Type': 'image/jpeg'}))
assert 'image/jpeg' == _guess_mime_type(CaseInsensitiveDict({'Content-Type': 'image/jpeg;charset=US-ASCII'}))
assert None == _guess_mime_type(CaseInsensitiveDict({'Content-Type': ''}))
assert None == _guess_mime_type(CaseInsensitiveDict())
def handle_response(cls, response_code, response_data):
"""
Handles service response.
Returns the message, status or workflow id, if applicable
"""
client_error = response_code >= 400 and response_code < 500
success = response_code >= 200 and response_code < 300
workflow = CaseInsensitiveDict({"Message":"", "Id": 0, "Status": 0})
if client_error or success:
try:
response_content = json.loads(response_data)
workflow.update(response_content)
except ValueError:
workflow["Message"] = "Invalid response content"
else:
workflow["Message"] = "{0}: Internal server error".format(response_code)
return workflow
def __init__(self, response):
super(SanitizedResponse, self).__init__()
self.status_code = response.status_code
self.encoding = response.encoding
self.raw = response.raw
self.reason = response.reason
self.url = response.url
self.request = response.request
self.connection = response.connection
self._content_consumed = True
self._content = ""
self.cookies = cookiejar_from_dict({})
self.headers = CaseInsensitiveDict()
self.headers['content-length'] = '0'
for header in ('date', 'server'):
if header in response.headers:
self.headers[header] = response.headers[header]
def __init__(self, flow_expiration=60 * 15, flow_cache_size=1000):
"""
Create a new access manager for managing authorizations.
Parameters
----------
flow_expiration: int
The number of seconds to keep pending authorization flows in memory
flow_cache_size: int
The maximum number of pending authorization flows to keep in memory at a time
"""
self.store = None
self.endpoints = CaseInsensitiveDict()
self.state_cache = TTLCache(maxsize=flow_cache_size,
ttl=flow_expiration) # type: Dict[Tuple[str, str], Tuple[Endpoint, str, str]]
self.random = random.SystemRandom()
self.redirect_uri = "http://localhost/oauth2/callback/"
def __init__(self, method, url,
data=None,
params=None,
headers=None,
app_name=''):
self.method = method
self.url = url
self.data = _convert_request_body(data)
self.params = params or {}
if not isinstance(headers, CaseInsensitiveDict):
self.headers = CaseInsensitiveDict(headers)
else:
self.headers = headers
# tell requests not to add 'Accept-Encoding: gzip, deflate' by default
if 'Accept-Encoding' not in self.headers:
self.headers['Accept-Encoding'] = None
if 'User-Agent' not in self.headers:
if app_name:
self.headers['User-Agent'] = _USER_AGENT + '/' + app_name
else:
self.headers['User-Agent'] = _USER_AGENT
def __deserialize_requests_response(self, to_deserialize):
"""
Create and return a Response based on the contents of the given dictionary.
:param to_deserialize: The dictionary to create the Response from.
:return: A Response created by the contents of to_deserialize.
"""
to_return = Response()
to_return.status_code = to_deserialize["status_code"]
to_return.headers = CaseInsensitiveDict(to_deserialize["headers"])
to_return.encoding = to_deserialize["encoding"]
to_return._content = to_deserialize["content"]
to_return._content_consumed = True
to_return.reason = to_deserialize["reason"]
to_return.url = to_deserialize["url"]
to_return.request = self.decode(to_deserialize["request"])
return to_return
def prepare_response(self, cached):
"""Verify our vary headers match and construct a real urllib3
HTTPResponse object.
"""
# Special case the '*' Vary value as it means we cannot actually
# determine if the cached response is suitable for this request.
if "*" in cached.get("vary", {}):
return
body_raw = cached["response"].pop("body")
headers = CaseInsensitiveDict(data=cached['response']['headers'])
if headers.get('transfer-encoding', '') == 'chunked':
headers.pop('transfer-encoding')
cached['response']['headers'] = headers
try:
body = io.BytesIO(body_raw)
except TypeError:
# This can happen if cachecontrol serialized to v1 format (pickle)
# using Python 2. A Python 2 str(byte string) will be unpickled as
# a Python 3 str (unicode string), which will cause the above to
# fail with:
#
# TypeError: 'str' does not support the buffer interface
body = io.BytesIO(body_raw.encode('utf8'))
return HTTPResponse(
body=body,
preload_content=False,
**cached["response"]
)
def prepare_request(self, request):
cookies = request.cookies or {}
# Bootstrap CookieJar.
if not isinstance(cookies, cookielib.CookieJar):
cookies = cookiejar_from_dict(cookies)
# Merge with session cookies
merged_cookies = merge_cookies(
merge_cookies(RequestsCookieJar(), self.cookies), cookies)
# Set environment's basic authentication if not explicitly set.
# auth = request.auth
# if self.trust_env and not auth and not self.auth:
# auth = get_netrc_auth(request.url)
p = PreparedRequest()
p.prepare(
method=request.method.upper(),
url=request.url,
files=request.files,
data=request.data,
json=request.json,
headers=merge_setting(request.headers,
self.headers, dict_class=CaseInsensitiveDict),
params=merge_setting(request.params, self.params),
auth=merge_setting(request.auth, self.auth),
cookies=merged_cookies,
hooks=merge_hooks(request.hooks, self.hooks),
)
return p
def default_headers():
"""
:rtype: requests.structures.CaseInsensitiveDict
"""
return CaseInsensitiveDict({
'User-Agent': default_user_agent(),
'Accept-Encoding': ', '.join(('gzip', 'deflate')),
'Accept': '*/*',
# 'Connection': 'keep-alive',
'Connection': 'close',
})
def _parse_items(self):
"""Parse `args.items` into `args.headers`, `args.data`, `args.params`,
and `args.files`.
"""
self.args.headers = CaseInsensitiveDict()
self.args.data = ParamDict() if self.args.form else OrderedDict()
self.args.files = OrderedDict()
self.args.params = ParamDict()
try:
parse_items(items=self.args.items,
headers=self.args.headers,
data=self.args.data,
files=self.args.files,
params=self.args.params)
except ParseError as e:
if self.args.traceback:
raise
self.error(e.message)
if self.args.files and not self.args.form:
# `http url @/path/to/file`
file_fields = list(self.args.files.keys())
if file_fields != ['']:
self.error(
'Invalid file fields (perhaps you meant --form?): %s'
% ','.join(file_fields))
fn, fd = self.args.files['']
self.args.files = {}
self._body_from_file(fd)
if 'Content-Type' not in self.args.headers:
mime, encoding = mimetypes.guess_type(fn, strict=False)
if mime:
content_type = mime
if encoding:
content_type = '%s; charset=%s' % (mime, encoding)
self.args.headers['Content-Type'] = content_type
def __init__(self, data=None, tfs=None):
super().__init__(data, tfs)
self._fields = CaseInsensitiveDict(self._data['fields'])
self._system_prefix = 'System.'
self.id = self._data['id']
def __init__(self, content, encoding):
self.encoding = encoding
headers = {}
# Split into header section (if any) and the content
if b'\r\n\r\n' in content:
first, self.content = _split_on_find(content, b'\r\n\r\n')
if first != b'':
headers = _header_parser(first.lstrip(), encoding)
else:
raise ImproperBodyPartContentException(
'content does not contain CR-LF-CR-LF'
)
self.headers = CaseInsensitiveDict(headers)
def __init__(self, status_code=None, url=None, orig_url=None, headers=CaseInsensitiveDict(),
content='', cookies=None, error=None, traceback=None, save=None, js_script_result=None, time=0):
if cookies is None:
cookies = {}
self.status_code = status_code
self.url = url
self.orig_url = orig_url
self.headers = headers
self.content = content
self.cookies = cookies
self.error = error
self.traceback = traceback
self.save = save
self.js_script_result = js_script_result
self.time = time
def rebuild_response(r):
response = Response(
status_code=r.get('status_code', 599),
url=r.get('url', ''),
headers=CaseInsensitiveDict(r.get('headers', {})),
content=r.get('content', ''),
cookies=r.get('cookies', {}),
error=r.get('error'),
traceback=r.get('traceback'),
time=r.get('time', 0),
orig_url=r.get('orig_url', r.get('url', '')),
js_script_result=r.get('js_script_result'),
save=r.get('save'),
)
return response
def __init__(self, content, encoding):
self.encoding = encoding
headers = {}
# Split into header section (if any) and the content
if b'\r\n\r\n' in content:
first, self.content = _split_on_find(content, b'\r\n\r\n')
if first != b'':
headers = _header_parser(first.lstrip(), encoding)
else:
raise ImproperBodyPartContentException(
'content does not contain CR-LF-CR-LF'
)
self.headers = CaseInsensitiveDict(headers)
def test_docstring_example(self):
cid = CaseInsensitiveDict()
cid['Accept'] = 'application/json'
assert cid['aCCEPT'] == 'application/json'
assert list(cid) == ['Accept']
def test_len(self):
cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'})
cid['A'] = 'a'
assert len(cid) == 2
def test_getitem(self):
cid = CaseInsensitiveDict({'Spam': 'blueval'})
assert cid['spam'] == 'blueval'
assert cid['SPAM'] == 'blueval'
def test_fixes_649(self):
"""__setitem__ should behave case-insensitively."""
cid = CaseInsensitiveDict()
cid['spam'] = 'oneval'
cid['Spam'] = 'twoval'
cid['sPAM'] = 'redval'
cid['SPAM'] = 'blueval'
assert cid['spam'] == 'blueval'
assert cid['SPAM'] == 'blueval'
assert list(cid.keys()) == ['SPAM']
def test_delitem(self):
cid = CaseInsensitiveDict()
cid['Spam'] = 'someval'
del cid['sPam']
assert 'spam' not in cid
assert len(cid) == 0
def test_get(self):
cid = CaseInsensitiveDict()
cid['spam'] = 'oneval'
cid['SPAM'] = 'blueval'
assert cid.get('spam') == 'blueval'
assert cid.get('SPAM') == 'blueval'
assert cid.get('sPam') == 'blueval'
assert cid.get('notspam', 'default') == 'default'
def test_update(self):
cid = CaseInsensitiveDict()
cid['spam'] = 'blueval'
cid.update({'sPam': 'notblueval'})
assert cid['spam'] == 'notblueval'
cid = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'})
cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'})
assert len(cid) == 2
assert cid['foo'] == 'anotherfoo'
assert cid['bar'] == 'anotherbar'
def test_update_retains_unchanged(self):
cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'})
cid.update({'foo': 'newfoo'})
assert cid['bar'] == 'bar'
def test_iter(self):
cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'})
keys = frozenset(['Spam', 'Eggs'])
assert frozenset(iter(cid)) == keys
def test_equality(self):
cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'})
othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'})
assert cid == othercid
del othercid['spam']
assert cid != othercid
assert cid == {'spam': 'blueval', 'eggs': 'redval'}
assert cid != object()
def test_lower_items(self):
cid = CaseInsensitiveDict({
'Accept': 'application/json',
'user-Agent': 'requests',
})
keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items())
lowerkeyset = frozenset(['accept', 'user-agent'])
assert keyset == lowerkeyset
def test_preserve_key_case(self):
cid = CaseInsensitiveDict({
'Accept': 'application/json',
'user-Agent': 'requests',
})
keyset = frozenset(['Accept', 'user-Agent'])
assert frozenset(i[0] for i in cid.items()) == keyset
assert frozenset(cid.keys()) == keyset
assert frozenset(cid) == keyset