def _error(error, code):
"""Return an error as json"""
_error = {
"status": str(code),
"title": errors[code],
"detail": error.body
}
traceback.print_exception(type(error), error, error.traceback)
response.headers["Content-Type"] = "application/vnd.api+json"
return response_object(errors=[_error])
python类headers()的实例源码
def get_resources():
"""Return the resources of the authenticated user.
If authentication failed, this aborts the execution with
401 Unauthorized.
"""
#pprint(dict(request.headers))
header = request.environ.get('HTTP_AUTHORIZATION','')
if header:
print("Authorization:", header)
basic = request.auth
if basic:
username, password = basic
if passwords.get(username) != password:
abort(401, BASIC_ERROR)
else:
api_key = get_api_key()
if api_key is not None:
username = api_keys.get(api_key)
if username is None:
abort(401, API_KEY_ERROR)
else:
username = None
return _resources[username]
def assertRedirect(self, target, result, query=None, status=303, **args):
env = {'SERVER_PROTOCOL':'HTTP/1.1'}
for key in list(args):
if key.startswith('wsgi'):
args[key.replace('_', '.', 1)] = args[key]
del args[key]
env.update(args)
request.bind(env)
bottle.response.bind()
try:
bottle.redirect(target, **(query or {}))
except bottle.HTTPResponse:
r = _e()
self.assertEqual(status, r.status_code)
self.assertTrue(r.headers)
self.assertEqual(result, r.headers['Location'])
def echo():
try:
body = request.body.read().decode('utf-8')
except:
body = None
result = {
'method': request.method,
'headers': dict(request.headers),
'body': body,
'files': [
{'key': key, 'name': request.files[key].raw_filename}
for key in request.files
]
}
return result
def __call__(self, *args, **kwargs):
headers = {}
if self._access_token:
headers['Authorization'] = 'Token ' + self._access_token
resp = requests.post(
self._url, json=kwargs,
headers=headers
)
if resp.ok:
data = resp.json()
if data.get('status') == 'failed':
raise Error(resp.status_code, data.get('retcode'))
return data.get('data')
raise Error(resp.status_code)
def apply(self, callback, route):
def wrapped(*args, **kwargs):
if not self.tracer or not self.tracer.enabled:
return callback(*args, **kwargs)
resource = "%s %s" % (request.method, request.route.rule)
# Propagate headers such as x-datadog-trace-id.
if self.distributed_tracing:
propagator = HTTPPropagator()
context = propagator.extract(request.headers)
if context.trace_id:
self.tracer.context_provider.activate(context)
with self.tracer.trace("bottle.request", service=self.service, resource=resource) as s:
code = 0
try:
return callback(*args, **kwargs)
except Exception:
# bottle doesn't always translate unhandled exceptions, so
# we mark it here.
code = 500
raise
finally:
s.set_tag(http.STATUS_CODE, code or response.status_code)
s.set_tag(http.URL, request.path)
s.set_tag(http.METHOD, request.method)
return wrapped
def get_api_key():
"""Return the api key or None."""
header = request.headers.get('Authorization')
if not header: return
try:
method, data = header.split(None, 1)
if method.lower() != 'api-key': return
return touni(base64.b64decode(tob(data[4:])))
except (ValueError, TypeError):
abort(401, HEADER_ERROR)
def get_endpoint_url():
"""Return the url that this server is reachable at."""
return "http://" + request.headers["Host"] + BASE
def test_jsonapi_header():
"""Make sure that the content type is set accordingly.
http://jsonapi.org/format/#content-negotiation-clients
"""
content_type = request.content_type
content_type_expected = "application/vnd.api+json"
if content_type != content_type_expected and content_type.startswith(content_type_expected):
abort(415, "The Content-Type header must be \"{}\", not \"{}\".".format(
content_type_expected, content_type))
accepts = request.headers.get("Accept", "*/*").split(",")
expected_accept = ["*/*", "application/*", "application/vnd.api+json"]
if not any([accept in expected_accept for accept in accepts]):
abort(406, "The Accept header must one of \"{}\", not \"{}\".".format(
expected_accept, ",".join(accepts)))
def test_header_access(self):
""" Environ: Request objects decode headers """
e = {}
wsgiref.util.setup_testing_defaults(e)
e['HTTP_SOME_HEADER'] = 'some value'
request = BaseRequest(e)
request['HTTP_SOME_OTHER_HEADER'] = 'some other value'
self.assertTrue('Some-Header' in request.headers)
self.assertTrue(request.headers['Some-Header'] == 'some value')
self.assertTrue(request.headers['Some-Other-Header'] == 'some other value')
def test_header_access_special(self):
e = {}
wsgiref.util.setup_testing_defaults(e)
request = BaseRequest(e)
request['CONTENT_TYPE'] = 'test'
request['CONTENT_LENGTH'] = '123'
self.assertEqual(request.headers['Content-Type'], 'test')
self.assertEqual(request.headers['Content-Length'], '123')
def test_content_type(self):
rs = BaseResponse()
rs.content_type = 'test/some'
self.assertEquals('test/some', rs.headers.get('Content-Type'))
def test_set_header(self):
response = BaseResponse()
response['x-test'] = 'foo'
headers = [value for name, value in response.headerlist
if name.title() == 'X-Test']
self.assertEqual(['foo'], headers)
self.assertEqual('foo', response['x-test'])
response['X-Test'] = 'bar'
headers = [value for name, value in response.headerlist
if name.title() == 'X-Test']
self.assertEqual(['bar'], headers)
self.assertEqual('bar', response['x-test'])
def test_append_header(self):
response = BaseResponse()
response.set_header('x-test', 'foo')
headers = [value for name, value in response.headerlist
if name.title() == 'X-Test']
self.assertEqual(['foo'], headers)
self.assertEqual('foo', response['x-test'])
response.add_header('X-Test', 'bar')
headers = [value for name, value in response.headerlist
if name.title() == 'X-Test']
self.assertEqual(['foo', 'bar'], headers)
self.assertEqual('bar', response['x-test'])
def test_host_http_proxy(self):
# Trust proxy headers over original header.
self.assertRedirect('./test.html', 'http://example.com/test.html',
HTTP_X_FORWARDED_HOST='example.com',
HTTP_HOST='127.0.0.1')
def setUp(self):
self.env = {}
self.headers = bottle.WSGIHeaderDict(self.env)
def test_native(self):
self.env['HTTP_TEST_HEADER'] = 'foobar'
self.assertEqual(self.headers['Test-header'], 'foobar')
def test_bytes(self):
self.env['HTTP_TEST_HEADER'] = tob('foobar')
self.assertEqual(self.headers['Test-Header'], 'foobar')
def test_dict(self):
for key in 'foo-bar Foo-Bar foo-Bar FOO-BAR'.split():
self.assertTrue(key not in self.headers)
self.assertEqual(self.headers.get(key), None)
self.assertEqual(self.headers.get(key, 5), 5)
self.assertRaises(KeyError, lambda x: self.headers[x], key)
self.env['HTTP_FOO_BAR'] = 'test'
for key in 'foo-bar Foo-Bar foo-Bar FOO-BAR'.split():
self.assertTrue(key in self.headers)
self.assertEqual(self.headers.get(key), 'test')
self.assertEqual(self.headers.get(key, 5), 'test')
def index():
auth = AWS4Auth(config.aws_service_credentials.access_key, config.aws_service_credentials.secret_key, config.aws_service_region, config.aws_service_name)
endpoint = config.aws_service_protocol + "://" + config.aws_service_endpoint + request.path + "?" + request.query_string
proxy_req_header = {}
for header in request.headers:
if header.lower() in PROXY_REQ_HEADERS_WHITELIST:
proxy_req_header[header] = request.headers[header]
if request.method == "HEAD":
requests_response = requests.head(endpoint, auth=auth, headers=proxy_req_header)
elif request.method == "GET":
requests_response = requests.get(endpoint, auth=auth, headers=proxy_req_header)
elif request.method == "POST":
proxy_req_body = request.body.getvalue()
requests_response = requests.post(
endpoint,
auth=auth,
data=proxy_req_body,
headers=proxy_req_header)
elif request.method == "PUT":
proxy_req_body = request.body.getvalue()
requests_response = requests.put(
endpoint,
auth=auth,
data=proxy_req_body,
headers=proxy_req_header)
else:
logging.error("Method not allowed")
response.body = requests_response.content
for header in requests_response.headers:
if not header.lower() in PROXY_RESP_HEADERS_BLACKLIST:
response.add_header(header, requests_response.headers[header])
return response
def _handle(self):
if self._secret:
# check signature
if 'X-Signature' not in request.headers:
abort(401)
sec = self._secret
if isinstance(sec, str):
sec = sec.encode('utf-8')
sig = hmac.new(sec, request.body.read(), 'sha1').hexdigest()
if request.headers['X-Signature'] != 'sha1=' + sig:
abort(403)
post_type = request.json.get('post_type')
if post_type not in ('message', 'event', 'request'):
abort(400)
handler_key = None
for pk_pair in (('message', 'message_type'),
('event', 'event'),
('request', 'request_type')):
if post_type == pk_pair[0]:
handler_key = request.json.get(pk_pair[1])
if not handler_key:
abort(400)
else:
break
if not handler_key:
abort(400)
for group in self._groups:
handler = self._handlers[group][post_type].get(handler_key)
if not handler:
handler = self._handlers[group][post_type].get('*') # try wildcard
if handler:
assert callable(handler)
result = handler(request.json)
if 'pass' in result:
continue
else:
return result
return ''