def __call__(self, environ, start_response):
    """Dispatch requests under ``self.prefix`` to mapped handler methods.

    Requests whose path does not start with ``self.prefix`` are handed to
    the wrapped ``self.app`` unchanged.  A request for the bare prefix is
    redirected to the prefix plus a trailing slash.  Otherwise the rest of
    the path is looked up in ``self.urlmap`` to obtain the name of a method
    on this object; that method is called with the request and its return
    value is invoked as a WSGI application.  Paths missing from the map,
    or mapped to a missing attribute, produce a 404 response.

    :param environ: the WSGI environment.
    :param start_response: the WSGI ``start_response`` callable.
    :return: the WSGI response iterable.
    """
    request = Request(environ)
    path = request.path
    if not path.startswith(self.prefix):
        return self.app(environ, start_response)

    method = path[len(self.prefix):]
    if not method:
        # no trailing / -- redirect to the slash-terminated prefix.
        # PEP 3333 requires a status of the form "<code> <reason>", and
        # the body must be an iterable of byte strings, so send an empty
        # list rather than a bare ``''``.
        start_response('302 Found', [('location', self.prefix + '/')])
        return []

    try:
        func = getattr(self, self.urlmap[method])
    except (KeyError, AttributeError):
        response = Response(status=404)
    else:
        response = func(request)
    return response(environ, start_response)
# Example source code for the Python class Request()
def wsgi_app(self, environ, start_response):
    """Handle one WSGI request inside a pushed request context.

    A ``_RequestContext`` built from this application and ``environ`` is
    pushed onto ``_request_ctx_stack`` for the duration of the call and is
    popped again in all cases, including when dispatching raises.

    :param environ: the WSGI environment.
    :param start_response: the WSGI ``start_response`` callable.
    :return: whatever the dispatched response returns when invoked as a
        WSGI application.
    """
    # self is app
    # active: _request_ctx_stack.top
    context = _RequestContext(self, environ)
    _request_ctx_stack.push(context)
    try:
        req = Request(environ)
        resp = self.dispatch_request(req)
        return resp(environ, start_response)
    finally:
        _request_ctx_stack.pop()
def __call__(self, environ, start_response):
    """Record the incoming request and answer with a canned response.

    The request is appended to ``self.requests``.  Its body is inflated
    when the content encoding is ``deflate``, decoded with the request
    charset, parsed as JSON when the content type is ``application/json``
    and appended to ``self.payloads``.  If a validator is registered in
    ``VALIDATORS`` for the request path and ``self.skip_validate`` is
    falsy, the payload is validated: success sets the status to 202, a
    ``jsonschema.ValidationError`` sets it to 400 with a JSON error body.
    Otherwise ``self.code`` and ``self.content`` are used as-is.

    :param environ: the WSGI environment.
    :param start_response: the WSGI ``start_response`` callable.
    :return: the WSGI response iterable.
    """
    status = self.code
    body = self.content

    req = Request(environ)
    self.requests.append(req)

    payload = req.data
    if req.content_encoding == 'deflate':
        payload = zlib.decompress(payload)
    payload = payload.decode(req.charset)
    if req.content_type == 'application/json':
        payload = json.loads(payload)
    self.payloads.append(payload)

    schema = VALIDATORS.get(req.path, None)
    if schema and not self.skip_validate:
        try:
            schema.validate(payload)
        except jsonschema.ValidationError as exc:
            status = 400
            body = json.dumps({'status': 'error', 'message': str(exc)})
        else:
            status = 202

    # Replace the default headers entirely with the configured ones.
    resp = Response(status=status)
    resp.headers.clear()
    resp.headers.extend(self.headers)
    resp.data = body
    return resp(environ, start_response)
def test_accept_mixin(self):
    """Accept-* environ headers are exposed as typed accept containers."""
    environ = {
        'HTTP_ACCEPT': 'text/xml,application/xml,application/xhtml+xml,'
                       'text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5',
        'HTTP_ACCEPT_CHARSET': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
        'HTTP_ACCEPT_ENCODING': 'gzip,deflate',
        'HTTP_ACCEPT_LANGUAGE': 'en-us,en;q=0.5',
    }
    req = wrappers.Request(environ)

    mimetypes = req.accept_mimetypes
    self.assert_equal(mimetypes, MIMEAccept([
        ('text/xml', 1), ('image/png', 1), ('application/xml', 1),
        ('application/xhtml+xml', 1), ('text/html', 0.9),
        ('text/plain', 0.8), ('*/*', 0.5),
    ]))

    charsets = req.accept_charsets
    self.assert_strict_equal(charsets, CharsetAccept([
        ('ISO-8859-1', 1), ('utf-8', 0.7), ('*', 0.7),
    ]))

    encodings = req.accept_encodings
    self.assert_strict_equal(encodings, Accept([('gzip', 1), ('deflate', 1)]))

    languages = req.accept_languages
    self.assert_strict_equal(
        languages, LanguageAccept([('en-us', 1), ('en', 0.5)]))

    # An empty Accept header must compare equal to an empty MIMEAccept.
    empty_req = wrappers.Request({'HTTP_ACCEPT': ''})
    self.assert_strict_equal(empty_req.accept_mimetypes, MIMEAccept())
def test_etag_request_mixin(self):
    """Cache-Control, If-(None-)Match and If-(Un)Modified-Since parsing."""
    etag_header = 'w/"foo", bar, "baz"'
    date_header = 'Tue, 22 Jan 2008 11:18:44 GMT'
    req = wrappers.Request({
        'HTTP_CACHE_CONTROL': 'no-store, no-cache',
        'HTTP_IF_MATCH': etag_header,
        'HTTP_IF_NONE_MATCH': etag_header,
        'HTTP_IF_MODIFIED_SINCE': date_header,
        'HTTP_IF_UNMODIFIED_SINCE': date_header,
    })

    cache_control = req.cache_control
    assert cache_control.no_store
    assert req.cache_control.no_cache

    # Both headers carry the same value, so both sets behave alike.
    for tag_set in (req.if_match, req.if_none_match):
        assert tag_set('bar')
        assert tag_set.contains_raw('w/"foo"')
        assert tag_set.contains_weak('foo')
        assert not tag_set.contains('foo')

    self.assert_equal(req.if_modified_since,
                      datetime(2008, 1, 22, 11, 18, 44))
    self.assert_equal(req.if_unmodified_since,
                      datetime(2008, 1, 22, 11, 18, 44))
def test_user_agent_mixin(self):
    """User-Agent strings are split into browser/platform/version/language."""
    # (header value, browser, platform, version, language)
    samples = [
        ('Mozilla/5.0 (Macintosh; U; Intel Mac OS X; en-US; rv:1.8.1.11) '
         'Gecko/20071127 Firefox/2.0.0.11',
         'firefox', 'macos', '2.0.0.11', 'en-US'),
        ('Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; de-DE) Opera 8.54',
         'opera', 'windows', '8.54', 'de-DE'),
        ('Mozilla/5.0 (iPhone; U; CPU like Mac OS X; en) AppleWebKit/420 '
         '(KHTML, like Gecko) Version/3.0 Mobile/1A543a Safari/419.3',
         'safari', 'iphone', '419.3', 'en'),
        ('Bot Googlebot/2.1 ( http://www.googlebot.com/bot.html)',
         'google', None, '2.1', None),
    ]
    for header, want_browser, want_platform, want_version, want_lang in samples:
        req = wrappers.Request({'HTTP_USER_AGENT': header})
        self.assert_strict_equal(req.user_agent.browser, want_browser)
        self.assert_strict_equal(req.user_agent.platform, want_platform)
        self.assert_strict_equal(req.user_agent.version, want_version)
        self.assert_strict_equal(req.user_agent.language, want_lang)
        assert bool(req.user_agent)
        # The original header must round-trip unchanged.
        self.assert_strict_equal(req.user_agent.to_header(), header)
        self.assert_strict_equal(str(req.user_agent), header)

    # An unrecognised agent string is falsy.
    unknown = wrappers.Request({'HTTP_USER_AGENT': 'foo'})
    assert not unknown.user_agent
def test_common_request_descriptors_mixin(self):
    """Common request headers are exposed as parsed descriptor attributes."""
    extra_headers = {
        'Referer': 'http://www.example.com/',
        'Date': 'Sat, 28 Feb 2009 19:04:35 GMT',
        'Max-Forwards': '10',
        'Pragma': 'no-cache',
        'Content-Encoding': 'gzip',
        'Content-MD5': '9a3bc6dbc47a70db25b84c6e5867a072',
    }
    req = wrappers.Request.from_values(
        content_type='text/html; charset=utf-8',
        content_length='23',
        headers=extra_headers,
    )

    # Content-Type and its derived attributes.
    self.assert_equal(req.content_type, 'text/html; charset=utf-8')
    self.assert_equal(req.mimetype, 'text/html')
    self.assert_equal(req.mimetype_params, {'charset': 'utf-8'})

    # Numeric and date headers are converted to Python values.
    self.assert_equal(req.content_length, 23)
    self.assert_equal(req.referrer, 'http://www.example.com/')
    self.assert_equal(req.date, datetime(2009, 2, 28, 19, 4, 35))
    self.assert_equal(req.max_forwards, 10)

    self.assert_true('no-cache' in req.pragma)
    self.assert_equal(req.content_encoding, 'gzip')
    self.assert_equal(req.content_md5, '9a3bc6dbc47a70db25b84c6e5867a072')
def test_file_closing(self):
    """Closing the request also closes files parsed from a multipart body."""
    body = (b'--foo\r\n'
            b'Content-Disposition: form-data; name="foo"; filename="foo.txt"\r\n'
            b'Content-Type: text/plain; charset=utf-8\r\n\r\n'
            b'file contents, just the contents\r\n'
            b'--foo--')
    request = wrappers.Request.from_values(
        input_stream=BytesIO(body),
        content_length=len(body),
        content_type='multipart/form-data; boundary=foo',
        method='POST',
    )

    upload = request.files['foo']
    self.assert_equal(upload.mimetype, 'text/plain')
    self.assert_equal(upload.filename, 'foo.txt')

    # The upload stays open until the request itself is closed.
    self.assert_equal(upload.closed, False)
    request.close()
    self.assert_equal(upload.closed, True)
def test_ranges(self):
    """Range request parsing and the Content-Range response descriptor."""
    def check_first_500_of_1000(response):
        # Re-reads ``content_range`` for every attribute, as the header
        # is what backs the descriptor.
        self.assert_equal(response.content_range.units, 'bytes')
        self.assert_equal(response.content_range.start, 0)
        self.assert_equal(response.content_range.stop, 500)
        self.assert_equal(response.content_range.length, 1000)

    # basic range stuff
    plain_req = wrappers.Request.from_values()
    assert plain_req.range is None

    ranged_req = wrappers.Request.from_values(headers={'Range': 'bytes=0-499'})
    self.assert_equal(ranged_req.range.ranges, [(0, 500)])

    # Setting the descriptor writes the header.
    resp = wrappers.Response()
    resp.content_range = ranged_req.range.make_content_range(1000)
    check_first_500_of_1000(resp)
    self.assert_equal(resp.headers['Content-Range'], 'bytes 0-499/1000')

    # Unsetting the descriptor removes the header.
    resp.content_range.unset()
    assert 'Content-Range' not in resp.headers

    # Setting the header is reflected by the descriptor.
    resp.headers['Content-Range'] = 'bytes 0-499/1000'
    check_first_500_of_1000(resp)
def __call__(self, environ, start_response):
    """Reject requests that lack the expected Host or Metadata-Flavor header.

    A request whose ``Host`` header is not one of
    ``metadata.google.internal``, ``169.254.169.254`` or ``metadata`` gets
    a 403.  A request for any path other than ``/`` that carries no
    ``Metadata-Flavor`` header also gets a 403 and is logged.  Everything
    else is forwarded to the wrapped ``self.app``.

    :param environ: the WSGI environment.
    :param start_response: the WSGI ``start_response`` callable.
    :return: the WSGI response iterable.
    """
    # NOTE(review): the 403 bodies are ``str`` objects; WSGI servers on
    # Python 3 expect bytes -- confirm the target interpreter.
    forbidden_headers = [('Content-Type', 'text/html; charset=UTF-8')]

    req = Request(environ, shallow=True)
    host = req.headers.get('Host')
    if host not in ('metadata.google.internal', '169.254.169.254', 'metadata'):
        start_response('403 Forbidden', forbidden_headers)
        return ['Host Header must be one of (metadata.google.internal, metadata, 169.254.169.254)']

    req_path = environ.get('PATH_INFO')
    metadata_flavor = req.headers.get('Metadata-Flavor')
    if metadata_flavor is None and req_path != '/':
        start_response('403 Forbidden', forbidden_headers)
        # Lazy %-formatting: PATH_INFO may be absent (None), which would
        # make string concatenation raise TypeError.
        logging.error("Metadata-Flavor: Google header not sent for: %s", req_path)
        # NOTE(review): err_path is rendered into HTML as given; whether
        # it is escaped depends on how Template is configured -- confirm.
        t = Template('<p>Your client does not have permission to get URL\n'
                     '<code>{{ err_path }}</code> from this server. '
                     'Missing Metadata-Flavor:Google header. ')
        return [str(t.render(err_path=req_path))]

    return self.app(environ, start_response)
# Make sure every response has these headers (which is what gce does)
def zabbix_fake_app(environ, start_response):
    """WSGI application that imitates a few Zabbix JSON-RPC calls.

    If a truthy ``status`` attribute has been set on this function, a
    response with that status and the function's ``content`` attribute as
    body is returned.  Otherwise the request body is searched for a
    ``"method": ...`` fragment and a fixed JSON string (inline for
    ``user.login``, read from a fixture file for ``host.get`` and
    ``item.get``) is returned with status 200; any other body yields the
    text ``Unrecognized test request``.

    :param environ: the WSGI environment.
    :param start_response: the WSGI ``start_response`` callable.
    :return: the WSGI response iterable.
    """
    request = Request(environ)
    request_body = request.get_data(as_text=True)

    # Override hook: attributes set on the function force a canned reply.
    if getattr(zabbix_fake_app, 'status', False):
        response = Response(status=zabbix_fake_app.status)
        response.data = zabbix_fake_app.content
        return response(environ, start_response)

    response = Response(status=200, headers=[('Content-type', 'application/json')])
    if '"method": "user.login"' in request_body:
        json_string = '{"jsonrpc":"2.0","result":"9287f336ffb611e586aa5e5517507c66","id":0}'
    elif '"method": "host.get"' in request_body:
        # Use a context manager so the fixture file handle is closed.
        with open('tests/fixtures/host.get_success.json') as fixture:
            json_string = fixture.read()
    elif '"method": "item.get"' in request_body:
        with open('tests/fixtures/items.get_success.json') as fixture:
            json_string = fixture.read()
    else:
        json_string = 'Unrecognized test request'
    response.data = json_string
    return response(environ, start_response)
def get_response(ip, forwarded=None):
    """Call ``protected`` with a request built from the given addresses.

    :param ip: value stored as ``REMOTE_ADDR`` in the request environ.
    :param forwarded: optional value stored as ``HTTP_X_FORWARDED_FOR``;
        omitted from the environ when falsy.
    :return: whatever ``protected(None, request)`` returns.
    """
    environ = {'REMOTE_ADDR': ip}
    if forwarded:
        environ.update({'HTTP_X_FORWARDED_FOR': forwarded})
    fake_request = Request(environ)
    return protected(None, fake_request)
def wsgi_app(self, environ, start_response):
    """Serve one WSGI request, running the pre- and post-handle hooks.

    ``self.pre_handle`` is called with the request before dispatching and
    ``self.post_handle`` with the request and response afterwards.

    :param environ: the WSGI environment.
    :param start_response: the WSGI ``start_response`` callable.
    :return: the result of invoking the dispatched response as a WSGI app.
    """
    req = Request(environ)
    self.pre_handle(req)
    resp = self.dispatch_response(req)
    self.post_handle(req, resp)
    return resp(environ, start_response)
def wsgi_app(self, environ, start_response):
    """Wrap ``environ`` in a ``Request``, dispatch it, and run the response.

    :param environ: the WSGI environment.
    :param start_response: the WSGI ``start_response`` callable.
    :return: the result of invoking the dispatched response as a WSGI app.
    """
    req = Request(environ)
    resp = self.dispatch_request(req)
    return resp(environ, start_response)
def wsgi_app(self, environ, start_response):
    """WSGI entry point: build the request, dispatch, return the reply.

    :param environ: the WSGI environment.
    :param start_response: the WSGI ``start_response`` callable.
    :return: the result of invoking the dispatched response as a WSGI app.
    """
    # The object returned by dispatch_request is itself a WSGI callable.
    wsgi_response = self.dispatch_request(Request(environ))
    return wsgi_response(environ, start_response)
def wsgi_app(self, environ, start_response):
    """Dispatch the incoming WSGI request and emit the resulting response.

    :param environ: the WSGI environment.
    :param start_response: the WSGI ``start_response`` callable.
    :return: the result of invoking the dispatched response as a WSGI app.
    """
    incoming = Request(environ)
    outgoing = self.dispatch_request(incoming)
    app_iter = outgoing(environ, start_response)
    return app_iter
def wsgi_app(self, environ, start_response):
    """Route the request and register a contestant or worker machine.

    The endpoint is resolved through ``self.router``.  ``contestant``
    requires the query parameters ``mac``, ``row`` and ``col``; ``worker``
    requires ``mac`` and ``num``; missing parameters give a 400 plain-text
    reply.  ``reboot_timestamp`` replies with ``self.reboot_string``.

    NOTE(review): this returns response/exception objects without invoking
    them with ``(environ, start_response)``; presumably a responder-style
    decorator outside this view does that -- confirm.

    :param environ: the WSGI environment.
    :param start_response: the WSGI ``start_response`` callable (unused here).
    :return: a ``Response``, a ``NotFound`` or the caught ``RequestRedirect``.
    """
    adapter = self.router.bind_to_environ(environ)
    try:
        # Only the endpoint is used; parameters come from the query string.
        endpoint, _ = adapter.match()
    except RequestRedirect as redirect:
        return redirect
    except HTTPException:
        return NotFound()

    query = Request(environ).args
    reply = Response()
    reply.mimetype = 'text/plain'
    reply.status_code = 200

    if endpoint == 'contestant':
        if all(key in query for key in ('mac', 'row', 'col')):
            self.add_contestant(query['mac'], query['row'], query['col'], reply)
        else:
            reply.status_code = 400
            reply.data = 'Required query parameters: mac, row, col'
    elif endpoint == 'worker':
        if all(key in query for key in ('mac', 'num')):
            self.add_worker(query['mac'], query['num'], reply)
        else:
            reply.status_code = 400
            reply.data = 'Required query parameters: mac, num'
    elif endpoint == 'reboot_timestamp':
        reply.data = str(self.reboot_string)
    return reply
def wsgi_app(self, environ, start_response):
    """Route the request and reply with a formatted boot or config script.

    The query arguments are copied into a dict and a ``wipe`` entry is
    added (``pixie_wipe=force`` for the ``wipe`` endpoint, empty
    otherwise).  If an ``ip`` argument is present, the last entry of
    ``self.configs`` whose ``subnet`` contains that address is selected.
    With a selected config, its items are merged into the arguments and
    ``BOOTSCRIPT`` is formatted with them; without one, ``CONFIGSCRIPT``
    is formatted with ``self.default_config``.

    NOTE(review): this returns response/exception objects without invoking
    them with ``(environ, start_response)``; presumably a responder-style
    decorator outside this view does that -- confirm.

    :param environ: the WSGI environment.
    :param start_response: the WSGI ``start_response`` callable (unused here).
    :return: a ``Response``, a ``NotFound`` or the caught ``RequestRedirect``.
    """
    adapter = self.router.bind_to_environ(environ)
    try:
        endpoint, _ = adapter.match()
    except RequestRedirect as redirect:
        return redirect
    except HTTPException:
        return NotFound()

    params = dict(Request(environ).args)
    params['wipe'] = 'pixie_wipe=force' if endpoint == 'wipe' else ""

    reply = Response()
    reply.mimetype = 'text/plain'
    reply.status_code = 200

    matched = None
    if 'ip' in params:
        # presumably dict(request.args) maps each key to a list of values,
        # hence the [0] -- verify against the werkzeug version in use.
        client_ip = ipaddress.ip_address(params['ip'][0])
        for candidate in self.configs:
            # No break: a later matching subnet overrides an earlier one.
            if client_ip in candidate['subnet']:
                matched = candidate

    if matched is not None:
        params.update(matched.items())
        reply.data = BOOTSCRIPT.format(**params)
    else:
        reply.data = CONFIGSCRIPT.format(**self.default_config)
    return reply
def test_query_string_is_bytes(self):
    """``query_string`` is the raw, still percent-encoded bytes."""
    request = wrappers.Request.from_values(u'/?foo=%2f')
    raw_query = request.query_string
    self.assert_strict_equal(raw_query, b'foo=%2f')
def test_access_route(self):
    """``access_route`` uses X-Forwarded-For, else falls back to REMOTE_ADDR."""
    # With X-Forwarded-For the route lists the forwarded addresses while
    # remote_addr still reports REMOTE_ADDR.
    forwarded_headers = {'X-Forwarded-For': '192.168.1.2, 192.168.1.1'}
    proxied = wrappers.Request.from_values(headers=forwarded_headers)
    proxied.environ['REMOTE_ADDR'] = '192.168.1.3'
    self.assert_equal(proxied.access_route, ['192.168.1.2', '192.168.1.1'])
    self.assert_strict_equal(proxied.remote_addr, '192.168.1.3')

    # Without the header the route is just REMOTE_ADDR.
    direct = wrappers.Request.from_values()
    direct.environ['REMOTE_ADDR'] = '192.168.1.3'
    self.assert_strict_equal(list(direct.access_route), ['192.168.1.3'])
def test_url_request_descriptors(self):
    """URL-related request attributes derived from path and base URL."""
    target = '/bar?foo=baz'
    http_req = wrappers.Request.from_values(target, 'http://example.com/test')

    # Path components.
    self.assert_strict_equal(http_req.path, u'/bar')
    self.assert_strict_equal(http_req.full_path, u'/bar?foo=baz')
    self.assert_strict_equal(http_req.script_root, u'/test')

    # Absolute URL variants.
    self.assert_strict_equal(http_req.url, u'http://example.com/test/bar?foo=baz')
    self.assert_strict_equal(http_req.base_url, u'http://example.com/test/bar')
    self.assert_strict_equal(http_req.url_root, u'http://example.com/test/')
    self.assert_strict_equal(http_req.host_url, u'http://example.com/')

    self.assert_strict_equal(http_req.host, 'example.com')
    self.assert_strict_equal(http_req.scheme, 'http')

    # The scheme follows the base URL.
    https_req = wrappers.Request.from_values(target, 'https://example.com/test')
    self.assert_strict_equal(https_req.scheme, 'https')
def test_url_request_descriptors_query_quoting(self):
    """Percent-encoding in the query string survives in full_path and url."""
    quoted_target = 'http%3A%2F%2Fwww.example.com%2F%3Fnext%3D%2F'
    request = wrappers.Request.from_values('/bar?next=' + quoted_target,
                                           'http://example.com/')
    self.assert_equal(request.path, u'/bar')
    self.assert_strict_equal(request.full_path, u'/bar?next=' + quoted_target)
    self.assert_strict_equal(request.url,
                             u'http://example.com/bar?next=' + quoted_target)
def test_authorization_mixin(self):
    """A Basic Authorization header is decoded into type/username/password."""
    auth_headers = {'Authorization': 'Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=='}
    request = wrappers.Request.from_values(headers=auth_headers)
    credentials = request.authorization
    self.assert_strict_equal(credentials.type, 'basic')
    self.assert_strict_equal(credentials.username, 'Aladdin')
    self.assert_strict_equal(credentials.password, 'open sesame')
def test_stream_wrapping(self):
    """Form parsing reads through a replacement ``request.stream``."""
    class LowercasingStream(object):
        """Proxy that lower-cases whatever the wrapped stream returns."""

        def __init__(self, stream):
            self._stream = stream

        def read(self, size=-1):
            chunk = self._stream.read(size)
            return chunk.lower()

        def readline(self, size=-1):
            line = self._stream.readline(size)
            return line.lower()

    body = b'foo=Hello+World'
    request = wrappers.Request.from_values(
        '/', method='POST', data=body,
        content_type='application/x-www-form-urlencoded')
    request.stream = LowercasingStream(request.stream)
    # The parsed value is lower-cased, so the wrapper was actually used.
    self.assert_equal(request.form['foo'], 'hello world')
def test_get_data_method_parsing_caching_behavior(self):
    """Interaction between ``get_data()`` caching and form parsing."""
    body = b'foo=Hello+World'

    def fresh_request():
        # Each scenario needs an unread request with the same form body.
        return wrappers.Request.from_values(
            '/', method='POST', data=body,
            content_type='application/x-www-form-urlencoded')

    # get_data() caches, so form stays available
    request = fresh_request()
    self.assert_equal(request.get_data(), body)
    self.assert_equal(request.form['foo'], u'Hello World')
    self.assert_equal(request.get_data(), body)

    # here we access the form data first, caching is bypassed
    request = fresh_request()
    self.assert_equal(request.form['foo'], u'Hello World')
    self.assert_equal(request.get_data(), b'')

    # Another case is uncached get data which trashes everything
    request = fresh_request()
    self.assert_equal(request.get_data(cache=False), body)
    self.assert_equal(request.get_data(cache=False), b'')
    self.assert_equal(request.form, {})

    # Or we can implicitly start the form parser which is similar to
    # the old .data behavior
    request = fresh_request()
    self.assert_equal(request.get_data(parse_form_data=True), b'')
    self.assert_equal(request.form['foo'], u'Hello World')
def test_shallow_mode(self):
    """A shallow request exposes ``args`` but raises on ``form`` access."""
    shallow_req = wrappers.Request({'QUERY_STRING': 'foo=bar'}, shallow=True)
    self.assert_equal(shallow_req.args['foo'], 'bar')

    def read_form_value():
        return shallow_req.form['foo']

    self.assert_raises(RuntimeError, read_form_value)
def test_form_parsing_failed(self):
    """A multipart body not matching the declared boundary parses as empty."""
    # The body opens with "--blah" while the declared boundary is "foo".
    body = b'--blah\r\n'
    request = wrappers.Request.from_values(
        input_stream=BytesIO(body),
        content_length=len(body),
        content_type='multipart/form-data; boundary=foo',
        method='POST',
    )
    assert not request.files
    assert not request.form
def test_url_charset_reflection(self):
    """``url_charset`` follows a change made to ``charset``."""
    request = wrappers.Request.from_values()
    request.charset = 'utf-7'
    reflected = request.url_charset
    self.assert_equal(reflected, 'utf-7')
def test_response_iter_wrapping(self):
    """A response's encoded iterator can be wrapped by another generator."""
    def uppercasing(chunks):
        for chunk in chunks:
            yield chunk.upper()

    def generator():
        yield 'foo'
        yield 'bar'

    request = wrappers.Request.from_values()
    inner = wrappers.Response(generator())
    del inner.headers['Content-Length']
    # Replace the body with an upper-casing wrapper around the encoded
    # chunks, then run the response as a WSGI app and collect the output.
    inner.response = uppercasing(inner.iter_encoded())
    collected = wrappers.Response.from_app(inner, request.environ,
                                           buffered=True)
    self.assertEqual(collected.get_data(), b'FOOBAR')
def test_other_method_payload(self):
    """A request with a non-standard method still exposes its body."""
    body = b'Hello World'
    request = wrappers.Request.from_values(
        input_stream=BytesIO(body),
        content_length=len(body),
        content_type='text/plain',
        method='WHAT_THE_FUCK',
    )
    self.assert_equal(request.get_data(), body)
    self.assert_is_instance(request.stream, LimitedStream)