def test_preserve_last_key_case(self):
cid = CaseInsensitiveDict({
'Accept': 'application/json',
'user-Agent': 'requests',
})
cid.update({'ACCEPT': 'application/json'})
cid['USER-AGENT'] = 'requests'
keyset = frozenset(['ACCEPT', 'USER-AGENT'])
assert frozenset(i[0] for i in cid.items()) == keyset
assert frozenset(cid.keys()) == keyset
assert frozenset(cid) == keyset
python类CaseInsensitiveDict()的实例源码
def test_copy(self):
cid = CaseInsensitiveDict({
'Accept': 'application/json',
'user-Agent': 'requests',
})
cid_copy = cid.copy()
assert cid == cid_copy
cid['changed'] = True
assert cid != cid_copy
def build_response(self):
request = self.calls[-1].args[0]
r = requests.Response()
try:
r.status_code = int(self.redirects.pop(0))
except IndexError:
r.status_code = 200
r.headers = CaseInsensitiveDict({'Location': '/'})
r.raw = self._build_raw()
r.request = request
return r
def parse_http_equiv_headers(html: str) -> CaseInsensitiveDict:
http_equiv_headers = CaseInsensitiveDict()
# Try to parse the HTML
try:
soup = bs(html, 'html.parser')
except:
return http_equiv_headers
# Find all the meta tags
metas = soup.find_all('meta')
for meta in metas:
if meta.has_attr('http-equiv') and meta.has_attr('content'):
# Add support for multiple CSP policies specified via http-equiv
# See issue: https://github.com/mozilla/http-observatory/issues/266
# Note that this is so far only done for CSP and not for other types
# of http-equiv
if (meta.get('http-equiv', '').lower().strip() == 'content-security-policy' and
'Content-Security-Policy' in http_equiv_headers):
http_equiv_headers['Content-Security-Policy'] += '; ' + meta.get('content')
else:
http_equiv_headers[meta.get('http-equiv')] = meta.get('content')
# Technically not HTTP Equiv, but I'm treating it that way
elif meta.get('name', '').lower().strip() == 'referrer' and meta.has_attr('content'):
http_equiv_headers['Referrer-Policy'] = meta.get('content')
return http_equiv_headers
def __init__(self, consumer_key=None, consumer_secret=None, params=None,
launch_url=None, launch_headers=None):
super(ToolProvider, self).__init__(consumer_key, consumer_secret,
params=params)
self.outcome_requests = []
self._last_outcome_request = None
self.launch_url = launch_url
self.launch_headers = launch_headers or CaseInsensitiveDict()
if 'Content-Type' not in self.launch_headers:
self.launch_headers['Content-Type'] = CONTENT_TYPE_FORM_URLENCODED
def __init__(
self,
headers,
data=None,
status_code=None,
oauth_token=None,
response=None):
self.headers = CaseInsensitiveDict(headers)
self.data = data
self.status_code = status_code
self.oauth_token = oauth_token
self.response = response
self.cookies = OrderedDict(response.cookies.items())
self.logger = get_logger(__name__)
def make_headers(self, headers=None, cookies=None):
headers = CaseInsensitiveDict(
isinstance(headers, dict) and headers or {})
if 'Content-Type' not in headers:
headers['Content-Type'] = 'application/json'
if self.oauth_token:
headers['Authorization'] = 'oauth2: {}'.format(self.oauth_token)
return dict(headers)
def format_event(event, context):
output_context = {
'aws_request_id': context.aws_request_id,
'client_context': context.client_context,
'function_name': context.function_name,
'function_version': context.function_version,
'get_remaining_time_in_millis': context.get_remaining_time_in_millis(),
'invoked_function_arn': context.invoked_function_arn,
'log_group_name': context.log_group_name,
'log_stream_name': context.log_stream_name,
'memory_limit_in_mb': context.memory_limit_in_mb,
}
hashed_event = Hasher(event)
request = {
'requested-at': datetime.datetime.utcnow().isoformat(),
'context': output_context,
'operation': hashed_event['operation'],
'requestor': hashed_event['parameters']['requestor'],
'body': hashed_event['parameters']['request']['body'],
'path': hashed_event['parameters']['request']['path'],
'querystring': hashed_event['parameters']['request']['querystring'],
'header': CaseInsensitiveDict(
hashed_event['parameters']['request']['header']),
'gateway': hashed_event['parameters']['gateway'],
}
return request
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
proxies=None):
parsed_url = urlparse.urlparse(request.url)
# We only work for requests with a host of localhost
if parsed_url.netloc.lower() != "localhost":
raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
request.url)
real_url = urlparse.urlunparse(parsed_url[:1] + ("",) + parsed_url[2:])
pathname = url_to_path(real_url)
resp = Response()
resp.status_code = 200
resp.url = real_url
stats = os.stat(pathname)
modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
resp.headers = CaseInsensitiveDict({
"Content-Type": mimetypes.guess_type(pathname)[0] or "text/plain",
"Content-Length": stats.st_size,
"Last-Modified": modified,
})
resp.raw = LocalFSResponse(open(pathname, "rb"))
resp.close = resp.raw.close
return resp
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
proxies=None):
parsed_url = urlparse.urlparse(request.url)
# We only work for requests with a host of localhost
if parsed_url.netloc.lower() != "localhost":
raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
request.url)
real_url = urlparse.urlunparse(parsed_url[:1] + ("",) + parsed_url[2:])
pathname = url_to_path(real_url)
resp = Response()
resp.status_code = 200
resp.url = real_url
stats = os.stat(pathname)
modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
resp.headers = CaseInsensitiveDict({
"Content-Type": mimetypes.guess_type(pathname)[0] or "text/plain",
"Content-Length": stats.st_size,
"Last-Modified": modified,
})
resp.raw = LocalFSResponse(open(pathname, "rb"))
resp.close = resp.raw.close
return resp
def serialize_patch(obj):
"""Convert objects into JSON structures."""
# Record class and module information for deserialization
result = {'__class__': obj.__class__.__name__}
try:
result['__module__'] = obj.__module__
except AttributeError:
pass
# Convert objects to dictionary representation based on type
if isinstance(obj, datetime.datetime):
result['year'] = obj.year
result['month'] = obj.month
result['day'] = obj.day
result['hour'] = obj.hour
result['minute'] = obj.minute
result['second'] = obj.second
result['microsecond'] = obj.microsecond
return result
if isinstance(obj, StreamingBody):
original_text = obj.read()
# We remove a BOM here if it exists so that it doesn't get reencoded
# later on into a UTF-16 string, presumably by the json library
result['body'] = original_text.decode('utf-8-sig')
obj._raw_stream = BytesIO(original_text)
obj._amount_read = 0
return result
if isinstance(obj, CaseInsensitiveDict):
result['as_dict'] = dict(obj)
return result
raise TypeError('Type not serializable')
def make_response(status_code: int = 200,
content: bytes = b'',
headers: dict = None,
reason: str = None,
encoding: str = None,
) -> Response:
response = Response()
response.status_code = status_code
response._content = content
response._content_consumed = True
response.headers = CaseInsensitiveDict(headers or {})
response.encoding = encoding or get_encoding_from_headers(headers or {})
response.reason = reason
return response
def _guess_mime_type(headers: CaseInsensitiveDict) -> Optional[str]:
location = headers.get('location')
if location:
mime_type, _ = mimetypes.guess_type(location)
if mime_type:
return mime_type
# Parse mime type from content-type header, e.g. 'image/jpeg;charset=US-ASCII' -> 'image/jpeg'
mime_type, _ = cgi.parse_header(headers.get('content-type', ''))
return mime_type or None
def _decode_headers(headers: CaseInsensitiveDict, encoding: str) -> CaseInsensitiveDict:
return CaseInsensitiveDict({
k.decode(encoding): v.decode(encoding)
for k, v in headers.items()
})
def _pod_response_fixture(headers=None):
mock_response = mock.create_autospec(requests.Response)
headers = CaseInsensitiveDict({} if headers is None else headers)
mock_response.headers = headers
return mock_response
def __init__(self):
self.status_code = None
self.url = None
self.orig_url = None
self.headers = CaseInsensitiveDict()
self.content = ''
self.cookies = {}
self.error = None
self.save = None
self.js_script_result = None
self.time = 0
def rebuild_response(r):
response = Response()
response.status_code = r.get('status_code', 599)
response.url = r.get('url', '')
response.orig_url = r.get('orig_url', response.url)
response.headers = CaseInsensitiveDict(r.get('headers', {}))
response.content = r.get('content', '')
response.cookies = r.get('cookies', {})
response.error = r.get('error')
response.time_cost = r.get('time_cost', 0)
response.js_script_result = r.get('js_script_result')
return response
def test_docstring_example(self):
cid = CaseInsensitiveDict()
cid['Accept'] = 'application/json'
assert cid['aCCEPT'] == 'application/json'
assert list(cid) == ['Accept']
def test_len(self):
cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'})
cid['A'] = 'a'
assert len(cid) == 2
def test_getitem(self):
cid = CaseInsensitiveDict({'Spam': 'blueval'})
assert cid['spam'] == 'blueval'
assert cid['SPAM'] == 'blueval'