def init_audit(log_group):
    """Build a decorator that emits an audit record before each handler call.

    The returned decorator wraps a handler whose first positional argument is
    ``account_id``. On every call it sends one envelope (millisecond timestamp
    plus a JSON message describing the current ``request``) through
    ``transport.send_group`` and then invokes the handler unchanged.

    Args:
        log_group: Prefix used to build the group name ``"<log_group>=<account_id>"``.

    Returns:
        A decorator suitable for wrapping ``f(account_id, *args, **kw)`` callables.
    """
    def audit(f):
        @functools.wraps(f)
        def handle(account_id, *args, **kw):
            # Timestamp is taken first, before the request attributes are read.
            now_ms = int(time.time() * 1000)
            details = {
                'user': request.environ.get('REMOTE_USER', ''),
                'url': request.url,
                'path': request.path,
                'method': request.method,
                'pid': os.getpid(),
                'account_id': account_id,
                'ip': request.remote_addr,
            }
            record = {'timestamp': now_ms, 'message': json.dumps(details)}
            group_name = "%s=%s" % (log_group, account_id)
            transport.send_group(group_name, [record])
            # The audit record is sent before the wrapped handler runs.
            return f(account_id, *args, **kw)
        return handle
    return audit
python类path()的实例源码
def test_pathshift(self):
    """ Request.path_shift() """
    def shifted(script_name, path_info, count):
        # Build a request, shift it, and report the resulting pair.
        req = BaseRequest({'SCRIPT_NAME': script_name, 'PATH_INFO': path_info})
        req.path_shift(count)
        return [req['SCRIPT_NAME'], req.path]

    # (expected [SCRIPT_NAME, path], (script_name, path_info, shift))
    expectations = [
        (['/a/b', '/c/d'], ('/a/b', '/c/d', 0)),
        (['/a/b', '/c/d/'], ('/a/b', '/c/d/', 0)),
        (['/a/b/c', '/d'], ('/a/b', '/c/d', 1)),
        (['/a', '/b/c/d'], ('/a/b', '/c/d', -1)),
        (['/a/b/c', '/d/'], ('/a/b', '/c/d/', 1)),
        (['/a', '/b/c/d/'], ('/a/b', '/c/d/', -1)),
        (['/a/b/c', '/d/'], ('/a/b/', '/c/d/', 1)),
        (['/a', '/b/c/d/'], ('/a/b/', '/c/d/', -1)),
        (['/a/b/c/d', '/'], ('/', '/a/b/c/d', 4)),
        (['/', '/a/b/c/d/'], ('/a/b/c/d', '/', -4)),
    ]
    for expected, call_args in expectations:
        self.assertEqual(expected, shifted(*call_args))

    # Shifting by more segments than are available must fail.
    for too_far in (3, -3):
        self.assertRaises(AssertionError, shifted, '/a/b', '/c/d', too_far)
def test_url(self):
    """ Environ: URL building """
    # (WSGI environ, URL the request is expected to report)
    cases = [
        ({'HTTP_HOST': 'example.com'},
         'http://example.com/'),
        ({'SERVER_NAME': 'example.com'},
         'http://example.com/'),
        ({'SERVER_NAME': 'example.com', 'SERVER_PORT': '81'},
         'http://example.com:81/'),
        ({'wsgi.url_scheme': 'https', 'SERVER_NAME': 'example.com'},
         'https://example.com/'),
        ({'HTTP_HOST': 'example.com', 'PATH_INFO': '/path',
          'QUERY_STRING': '1=b&c=d', 'SCRIPT_NAME': '/sp'},
         'http://example.com/sp/path?1=b&c=d'),
        # Spaces in SCRIPT_NAME / PATH_INFO are expected to be percent-encoded.
        ({'HTTP_HOST': 'example.com', 'PATH_INFO': '/pa th',
          'SCRIPT_NAME': '/s p'},
         'http://example.com/s%20p/pa%20th'),
    ]
    for environ, expected_url in cases:
        req = BaseRequest(environ)
        self.assertEqual(expected_url, req.url)
def apply(self, callback, route):
    """Wrap a route callback so each invocation runs inside a trace span.

    When the tracer is missing or disabled the callback is called directly.
    Otherwise a ``bottle.request`` span is opened and tagged with the status
    code, request path and method once the callback finishes or raises.

    Args:
        callback: The route callable to wrap.
        route: The route being decorated (not used by this wrapper).

    Returns:
        The wrapping callable.
    """
    def wrapped(*args, **kwargs):
        tracer = self.tracer
        if not (tracer and tracer.enabled):
            return callback(*args, **kwargs)

        resource = "%s %s" % (request.method, request.route.rule)

        # Propagate headers such as x-datadog-trace-id.
        if self.distributed_tracing:
            context = HTTPPropagator().extract(request.headers)
            if context.trace_id:
                self.tracer.context_provider.activate(context)

        span_cm = self.tracer.trace(
            "bottle.request", service=self.service, resource=resource)
        with span_cm as span:
            failed = False
            try:
                return callback(*args, **kwargs)
            except Exception:
                # bottle doesn't always translate unhandled exceptions, so
                # we mark it here.
                failed = True
                raise
            finally:
                # On failure report 500 without consulting the response object.
                status = 500 if failed else response.status_code
                span.set_tag(http.STATUS_CODE, status)
                span.set_tag(http.URL, request.path)
                span.set_tag(http.METHOD, request.method)
    return wrapped
def test_path(self):
    """ PATH_INFO normalization. """
    # Legal paths
    legal = [('', '/'), ('x', '/x'), ('x/', '/x/'), ('/x', '/x'), ('/x/', '/x/')]
    # Strange paths
    strange = [('///', '/'), ('//x', '/x')]
    for raw_path, expected in legal + strange:
        req = BaseRequest({'PATH_INFO': raw_path})
        self.assertEqual(expected, req.path)
    # No path at all
    empty_req = BaseRequest({})
    self.assertEqual('/', empty_req.path)
def test_script_name(self):
    """ SCRIPT_NAME normalization. """
    # Legal paths
    legal = [('', '/'), ('x', '/x/'), ('x/', '/x/'), ('/x', '/x/'), ('/x/', '/x/')]
    # Strange paths
    strange = [('///', '/'), ('///x///', '/x/')]
    for raw_name, expected in legal + strange:
        req = BaseRequest({'SCRIPT_NAME': raw_name})
        self.assertEqual(expected, req.script_name)
    # No path at all
    empty_req = BaseRequest({})
    self.assertEqual('/', empty_req.script_name)
def test_set_cookie(self):
    """Cookies set on a response show up as Set-Cookie headers."""
    resp = BaseResponse()
    resp.set_cookie('name1', 'value', max_age=5)
    resp.set_cookie('name2', 'value 2', path='/foo')
    # Collect the Set-Cookie header values in a deterministic (sorted) order.
    set_cookie_values = sorted(
        header_value
        for header_name, header_value in resp.headerlist
        if header_name.title() == 'Set-Cookie'
    )
    self.assertEqual(set_cookie_values[0], 'name1=value; Max-Age=5')
    self.assertEqual(set_cookie_values[1], 'name2="value 2"; Path=/foo')
def index():
    """Proxy the current request to the configured AWS service endpoint.

    The upstream URL is built from ``config`` plus the incoming path and query
    string. Only request headers named in ``PROXY_REQ_HEADERS_WHITELIST`` are
    forwarded, and the call is authenticated with an ``AWS4Auth`` object built
    from the configured credentials. The upstream body and every header not
    listed in ``PROXY_RESP_HEADERS_BLACKLIST`` are copied onto ``response``.

    Returns:
        The ``response`` object. For methods other than HEAD, GET, POST and
        PUT nothing is forwarded and the response status is set to 405.
    """
    auth = AWS4Auth(
        config.aws_service_credentials.access_key,
        config.aws_service_credentials.secret_key,
        config.aws_service_region,
        config.aws_service_name)
    endpoint = (config.aws_service_protocol + "://"
                + config.aws_service_endpoint
                + request.path + "?" + request.query_string)

    proxy_req_header = {
        header: request.headers[header]
        for header in request.headers
        if header.lower() in PROXY_REQ_HEADERS_WHITELIST
    }

    method = request.method
    if method == "HEAD":
        requests_response = requests.head(
            endpoint, auth=auth, headers=proxy_req_header)
    elif method == "GET":
        requests_response = requests.get(
            endpoint, auth=auth, headers=proxy_req_header)
    elif method in ("POST", "PUT"):
        # read() works whether the body is held in memory or in a temporary
        # file; getvalue() exists only on in-memory buffers.
        proxy_req_body = request.body.read()
        send = requests.post if method == "POST" else requests.put
        requests_response = send(
            endpoint,
            auth=auth,
            data=proxy_req_body,
            headers=proxy_req_header)
    else:
        # Previously this branch fell through and crashed on the unbound
        # upstream response; answer with an explicit 405 instead.
        logging.error("Method not allowed")
        response.status = 405
        response.set_header("Allow", "GET, HEAD, POST, PUT")
        return response

    response.body = requests_response.content
    for header in requests_response.headers:
        if header.lower() not in PROXY_RESP_HEADERS_BLACKLIST:
            response.add_header(header, requests_response.headers[header])
    return response
def dispatch(url):
    """Entry point for all Ray API requests: hand the current path to ``process``.

    Args:
        url: Ignored. The path is always re-read from ``bottle_req.path``; the
            parameter is kept so the existing signature keeps working.

    Returns:
        If ``process`` returns something that can be read as
        ``(body, http_status)``, the status is applied to ``bottle_resp`` and
        ``body`` is returned. Otherwise the raw result of ``process`` is
        returned. After a ``RayException`` the response status is set to the
        exception's ``http_code`` and ``None`` is returned.

    Raises:
        Any non-Ray exception raised by ``process``, after it has been logged.
    """
    url = bottle_req.path
    log.info('request: %s', bottle_req.url)

    # Strip a single trailing slash; endswith() is also safe on an empty path.
    if url.endswith('/'):
        url = url[:-1]

    try:
        processed = process(url, bottle_req, bottle_resp)
    except exceptions.RayException as e:
        log.exception('ray exception: ')
        bottle_resp.status = e.http_code
        return None
    except Exception:
        log.exception('exception:')
        raise

    try:
        from_func, http_status = processed[0], processed[1]
        bottle_resp.status = http_status
        return from_func
    except Exception:
        # Any failure here (result not indexable, too short, or a status value
        # that bottle_resp refuses) is treated as "not a (body, status) pair",
        # so the result is returned untouched.
        return processed
def generic_handler():
    """
    Catch-all handler for requests that no other route matched.

    Looks up the requested path among the registered job webhook URLs. When a
    job definition matches, the request is turned into a job instance and put
    on the build queue; otherwise the request is aborted with a 404.

    Returns immediately (async) with a JSON structure containing the job id.
    """
    deps = request.deps
    jobdef_manager = deps['jobdef_manager']
    build_queue = deps['build_queue']
    # Not used below, but the lookup still fails fast if the key is missing.
    config = deps['config']
    providers = deps['providers']

    jobdef = jobdef_manager.get_jobdef_from_url(request.path)
    if not jobdef:
        abort(404, "Not found")

    logging.info("Received event for job '{}'".format(jobdef.name))

    # Log debug info about the received request
    logging.debug("request environ: {}".format(request.environ))
    logging.debug("request path: {}".format(request.path))
    logging.debug("request method: {}".format(request.method))
    for header_name, header_value in request.headers.items():
        logging.debug("request header: {}={}".format(header_name, header_value))
    for param_name, param_value in request.query.items():
        logging.debug("request query: {}={}".format(param_name, param_value))
    logging.debug("request body: {}".format(request.body.read()))
    # NOTE(review): this writes request.auth to the debug log; confirm that
    # credentials are acceptable in log output.
    logging.debug("request auth: {}".format(request.auth))

    env = job.make_env(request, jobdef, providers)
    payload = request.body.read().decode('utf8')
    job_inst = jobdef.make_job(payload, env)
    build_queue.put(job_inst)
    return {'id': job_inst.id}