def check_start_response(self, status, headers, exc_info):
    """Lint the arguments an application handed to ``start_response``.

    Each problem found is reported as a :class:`WSGIWarning` through
    ``warn``.

    :param status: status string; three digits, a space and an
                   explanation are expected.
    :param headers: response headers; a ``list`` of 2-item tuples of
                    strings is expected.
    :param exc_info: ``None`` or a tuple.
    :return: ``(status_code, headers)`` with the status code converted to
             ``int`` and the headers wrapped in a ``Headers`` object.
    :raises ValueError: if the first word of `status` cannot be converted
                        by ``int()`` (the warning is emitted first).
    """
    check_string('status', status)
    status_code = status.split(None, 1)[0]
    if len(status_code) != 3 or not status_code.isdigit():
        warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
    if len(status) < 4 or status[3] != ' ':
        # Interpolate the offending value; the message used to be emitted
        # with a literal, unformatted "%r" in it.
        warn(WSGIWarning('Invalid value for status %r. Valid '
                         'status strings are three digits, a space '
                         'and a status explanation' % (status,)),
             stacklevel=3)
    status_code = int(status_code)
    if status_code < 100:
        warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

    # Exact type checks (not isinstance): subclasses are reported as well.
    if type(headers) is not list:
        warn(WSGIWarning('header list is not a list'), stacklevel=3)
    for item in headers:
        if type(item) is not tuple or len(item) != 2:
            warn(WSGIWarning('Headers must be 2-item tuples'),
                 stacklevel=3)
        # NOTE: an item that is not a 2-item sequence still fails here, as
        # before; the warning above is emitted first.
        name, value = item
        if type(name) is not str or type(value) is not str:
            warn(WSGIWarning('header items must be strings'),
                 stacklevel=3)
        if name.lower() == 'status':
            warn(WSGIWarning('The status header is not supported due to '
                             'conflicts with the CGI spec.'),
                 stacklevel=3)

    if exc_info is not None and not isinstance(exc_info, tuple):
        warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

    headers = Headers(headers)
    self.check_headers(headers)
    return status_code, headers
# Example source code using the Python class Headers()
def parse_multipart_headers(iterable):
    """Read the header block of a multipart part from `iterable`.

    Lines are consumed one at a time (each must carry its trailing
    newline) until the first empty line, which ends the headers.  The
    iterable is left positioned after that line so the caller can keep
    consuming it.

    :param iterable: iterable of strings that are newline terminated
    :return: a ``Headers`` object built from the parsed ``(key, value)``
             pairs.
    :raises ValueError: if a line is not newline terminated.
    """
    collected = []
    for raw_line in iterable:
        text, terminated = _line_parse(to_native(raw_line))
        if not terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not text:
            # an empty line marks the end of the header block
            break
        if collected and text[0] in ' \t':
            # leading whitespace continues the previous header's value
            name, previous = collected.pop()
            collected.append((name, previous + '\n ' + text[1:]))
            continue
        name, colon, remainder = text.partition(':')
        if colon:
            # lines without a colon are skipped
            collected.append((name.strip(), remainder.strip()))
    # the list is handed to Headers directly; it was never shared, so no
    # copy is needed.
    return Headers(collected)
def run_fixed(self, environ, start_response):
    """Call the wrapped application with a header-fixing ``start_response``.

    The replacement callable wraps the application's headers in a
    ``Headers`` object, passes them through ``self.fix_headers`` and then
    forwards the result, converted with ``to_wsgi_list()``, to the real
    `start_response`.

    :param environ: the WSGI environment handed on to the application.
    :param start_response: the server's ``start_response`` callable.
    :return: whatever ``self.app`` returns.
    """
    def fixing_start_response(status, headers, exc_info=None):
        wrapped = Headers(headers)
        self.fix_headers(environ, wrapped, status)
        wsgi_headers = wrapped.to_wsgi_list()
        return start_response(status, wsgi_headers, exc_info)

    return self.app(environ, fixing_start_response)
def check_start_response(self, status, headers, exc_info):
    """Lint the arguments an application handed to ``start_response``.

    Each problem found is reported as a :class:`WSGIWarning` through
    ``warn``.

    :param status: status string; three digits, a space and an
                   explanation are expected.
    :param headers: response headers; a ``list`` of 2-item tuples of
                    strings is expected.
    :param exc_info: ``None`` or a tuple.
    :return: ``(status_code, headers)`` with the status code converted to
             ``int`` and the headers wrapped in a ``Headers`` object.
    :raises ValueError: if the first word of `status` cannot be converted
                        by ``int()`` (the warning is emitted first).
    """
    check_string('status', status)
    status_code = status.split(None, 1)[0]
    if len(status_code) != 3 or not status_code.isdigit():
        warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
    if len(status) < 4 or status[3] != ' ':
        # Interpolate the offending value; the message used to be emitted
        # with a literal, unformatted "%r" in it.
        warn(WSGIWarning('Invalid value for status %r. Valid '
                         'status strings are three digits, a space '
                         'and a status explanation' % (status,)),
             stacklevel=3)
    status_code = int(status_code)
    if status_code < 100:
        warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

    # Exact type checks (not isinstance): subclasses are reported as well.
    if type(headers) is not list:
        warn(WSGIWarning('header list is not a list'), stacklevel=3)
    for item in headers:
        if type(item) is not tuple or len(item) != 2:
            warn(WSGIWarning('Headers must be 2-item tuples'),
                 stacklevel=3)
        # NOTE: an item that is not a 2-item sequence still fails here, as
        # before; the warning above is emitted first.
        name, value = item
        if type(name) is not str or type(value) is not str:
            warn(WSGIWarning('header items must be strings'),
                 stacklevel=3)
        if name.lower() == 'status':
            warn(WSGIWarning('The status header is not supported due to '
                             'conflicts with the CGI spec.'),
                 stacklevel=3)

    if exc_info is not None and not isinstance(exc_info, tuple):
        warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

    headers = Headers(headers)
    self.check_headers(headers)
    return status_code, headers
def parse_multipart_headers(iterable):
    """Read the header block of a multipart part from `iterable`.

    Lines are consumed one at a time (each must carry its trailing
    newline) until the first empty line, which ends the headers.  The
    iterable is left positioned after that line so the caller can keep
    consuming it.

    :param iterable: iterable of strings that are newline terminated
    :return: a ``Headers`` object built from the parsed ``(key, value)``
             pairs.
    :raises ValueError: if a line is not newline terminated.
    """
    collected = []
    for raw_line in iterable:
        text, terminated = _line_parse(to_native(raw_line))
        if not terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not text:
            # an empty line marks the end of the header block
            break
        if collected and text[0] in ' \t':
            # leading whitespace continues the previous header's value
            name, previous = collected.pop()
            collected.append((name, previous + '\n ' + text[1:]))
            continue
        name, colon, remainder = text.partition(':')
        if colon:
            # lines without a colon are skipped
            collected.append((name.strip(), remainder.strip()))
    # the list is handed to Headers directly; it was never shared, so no
    # copy is needed.
    return Headers(collected)
def run_fixed(self, environ, start_response):
    """Call the wrapped application with a header-fixing ``start_response``.

    The replacement callable wraps the application's headers in a
    ``Headers`` object, passes them through ``self.fix_headers`` and then
    forwards the result, converted with ``to_wsgi_list()``, to the real
    `start_response`.

    :param environ: the WSGI environment handed on to the application.
    :param start_response: the server's ``start_response`` callable.
    :return: whatever ``self.app`` returns.
    """
    def fixing_start_response(status, headers, exc_info=None):
        wrapped = Headers(headers)
        self.fix_headers(environ, wrapped, status)
        wsgi_headers = wrapped.to_wsgi_list()
        return start_response(status, wsgi_headers, exc_info)

    return self.app(environ, fixing_start_response)
def check_start_response(self, status, headers, exc_info):
    """Lint the arguments an application handed to ``start_response``.

    Each problem found is reported as a :class:`WSGIWarning` through
    ``warn``.

    :param status: status string; three digits, a space and an
                   explanation are expected.
    :param headers: response headers; a ``list`` of 2-item tuples of
                    strings is expected.
    :param exc_info: ``None`` or a tuple.
    :return: ``(status_code, headers)`` with the status code converted to
             ``int`` and the headers wrapped in a ``Headers`` object.
    :raises ValueError: if the first word of `status` cannot be converted
                        by ``int()`` (the warning is emitted first).
    """
    check_string('status', status)
    status_code = status.split(None, 1)[0]
    if len(status_code) != 3 or not status_code.isdigit():
        warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
    if len(status) < 4 or status[3] != ' ':
        # Interpolate the offending value; the message used to be emitted
        # with a literal, unformatted "%r" in it.
        warn(WSGIWarning('Invalid value for status %r. Valid '
                         'status strings are three digits, a space '
                         'and a status explanation' % (status,)),
             stacklevel=3)
    status_code = int(status_code)
    if status_code < 100:
        warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

    # Exact type checks (not isinstance): subclasses are reported as well.
    if type(headers) is not list:
        warn(WSGIWarning('header list is not a list'), stacklevel=3)
    for item in headers:
        if type(item) is not tuple or len(item) != 2:
            warn(WSGIWarning('Headers must be 2-item tuples'),
                 stacklevel=3)
        # NOTE: an item that is not a 2-item sequence still fails here, as
        # before; the warning above is emitted first.
        name, value = item
        if type(name) is not str or type(value) is not str:
            warn(WSGIWarning('header items must be strings'),
                 stacklevel=3)
        if name.lower() == 'status':
            warn(WSGIWarning('The status header is not supported due to '
                             'conflicts with the CGI spec.'),
                 stacklevel=3)

    if exc_info is not None and not isinstance(exc_info, tuple):
        warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

    headers = Headers(headers)
    self.check_headers(headers)
    return status_code, headers
def parse_multipart_headers(iterable):
    """Read the header block of a multipart part from `iterable`.

    Lines are consumed one at a time (each must carry its trailing
    newline) until the first empty line, which ends the headers.  The
    iterable is left positioned after that line so the caller can keep
    consuming it.

    :param iterable: iterable of strings that are newline terminated
    :return: a ``Headers`` object built from the parsed ``(key, value)``
             pairs.
    :raises ValueError: if a line is not newline terminated.
    """
    collected = []
    for raw_line in iterable:
        text, terminated = _line_parse(to_native(raw_line))
        if not terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not text:
            # an empty line marks the end of the header block
            break
        if collected and text[0] in ' \t':
            # leading whitespace continues the previous header's value
            name, previous = collected.pop()
            collected.append((name, previous + '\n ' + text[1:]))
            continue
        name, colon, remainder = text.partition(':')
        if colon:
            # lines without a colon are skipped
            collected.append((name.strip(), remainder.strip()))
    # the list is handed to Headers directly; it was never shared, so no
    # copy is needed.
    return Headers(collected)
def run_fixed(self, environ, start_response):
    """Call the wrapped application with a header-fixing ``start_response``.

    The replacement callable wraps the application's headers in a
    ``Headers`` object, passes them through ``self.fix_headers`` and then
    forwards the result, converted with ``to_wsgi_list()``, to the real
    `start_response`.

    :param environ: the WSGI environment handed on to the application.
    :param start_response: the server's ``start_response`` callable.
    :return: whatever ``self.app`` returns.
    """
    def fixing_start_response(status, headers, exc_info=None):
        wrapped = Headers(headers)
        self.fix_headers(environ, wrapped, status)
        wsgi_headers = wrapped.to_wsgi_list()
        return start_response(status, wsgi_headers, exc_info)

    return self.app(environ, fixing_start_response)
def check_start_response(self, status, headers, exc_info):
    """Lint the arguments an application handed to ``start_response``.

    Each problem found is reported as a :class:`WSGIWarning` through
    ``warn``.

    :param status: status string; three digits, a space and an
                   explanation are expected.
    :param headers: response headers; a ``list`` of 2-item tuples of
                    strings is expected.
    :param exc_info: ``None`` or a tuple.
    :return: ``(status_code, headers)`` with the status code converted to
             ``int`` and the headers wrapped in a ``Headers`` object.
    :raises ValueError: if the first word of `status` cannot be converted
                        by ``int()`` (the warning is emitted first).
    """
    check_string('status', status)
    status_code = status.split(None, 1)[0]
    if len(status_code) != 3 or not status_code.isdigit():
        warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
    if len(status) < 4 or status[3] != ' ':
        # Interpolate the offending value; the message used to be emitted
        # with a literal, unformatted "%r" in it.
        warn(WSGIWarning('Invalid value for status %r. Valid '
                         'status strings are three digits, a space '
                         'and a status explanation' % (status,)),
             stacklevel=3)
    status_code = int(status_code)
    if status_code < 100:
        warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

    # Exact type checks (not isinstance): subclasses are reported as well.
    if type(headers) is not list:
        warn(WSGIWarning('header list is not a list'), stacklevel=3)
    for item in headers:
        if type(item) is not tuple or len(item) != 2:
            warn(WSGIWarning('Headers must be 2-item tuples'),
                 stacklevel=3)
        # NOTE: an item that is not a 2-item sequence still fails here, as
        # before; the warning above is emitted first.
        name, value = item
        if type(name) is not str or type(value) is not str:
            warn(WSGIWarning('header items must be strings'),
                 stacklevel=3)
        if name.lower() == 'status':
            warn(WSGIWarning('The status header is not supported due to '
                             'conflicts with the CGI spec.'),
                 stacklevel=3)

    if exc_info is not None and not isinstance(exc_info, tuple):
        warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

    headers = Headers(headers)
    self.check_headers(headers)
    return status_code, headers
def test_slicing(self):
    # Slice assignment replaces the stored headers with the filtered
    # pairs.  Native strings are fine here: Headers doesn't care about
    # the data types.
    headers = self.storage_class()
    initial = (
        ('X-Foo-Poo', 'bleh'),
        ('Content-Type', 'application/whocares'),
        ('X-Forwarded-For', '192.168.0.123'),
    )
    for key, value in initial:
        headers.set(key, value)

    # keep only the "X-" prefixed entries
    kept = [(k, v) for k, v in headers if k.startswith(u'X-')]
    headers[:] = kept

    expected = [
        ('X-Foo-Poo', 'bleh'),
        ('X-Forwarded-For', '192.168.0.123'),
    ]
    self.assert_equal(list(headers), expected)
def test_remove_entity_headers(self):
    # remove_entity_headers is exercised on a plain list and on a Headers
    # object; only the Date header is expected to remain in each.
    now = http.http_date()
    date_header = ('Date', now)
    plain = [
        date_header,
        ('Content-Type', 'text/html'),
        ('Content-Length', '0'),
    ]
    # build the Headers object before the list is modified in place
    wrapped = datastructures.Headers(plain)

    http.remove_entity_headers(plain)
    assert plain == [date_header]

    http.remove_entity_headers(wrapped)
    expected = datastructures.Headers([(u'Date', now)])
    self.assert_equal(wrapped, expected)
def test_remove_hop_by_hop_headers(self):
    # remove_hop_by_hop_headers is exercised on a plain list and on a
    # Headers object; only the Foo header is expected to remain in each.
    survivor = ('Foo', 'bar')
    plain = [
        ('Connection', 'closed'),
        survivor,
        ('Keep-Alive', 'wtf'),
    ]
    # build the Headers object before the list is modified in place
    wrapped = datastructures.Headers(plain)

    http.remove_hop_by_hop_headers(plain)
    assert plain == [survivor]

    http.remove_hop_by_hop_headers(wrapped)
    expected = datastructures.Headers([('Foo', 'bar')])
    assert wrapped == expected
def test_cookie_unicode_dumping(self):
    # A unicode cookie value is dumped in octal-escaped form and parses
    # back to the original character.
    snowman = u'\N{SNOWMAN}'
    dumped = http.dump_cookie('foo', snowman)

    headers = datastructures.Headers()
    headers.add('Set-Cookie', dumped)
    self.assert_equal(
        headers['Set-Cookie'],
        'foo="\\342\\230\\203"; Path=/',
    )

    parsed = http.parse_cookie(headers['Set-Cookie'])
    self.assert_equal(parsed['foo'], snowman)