def adapter_do_OPTIONS(self, request):
    """Answer an OPTIONS request with CORS headers for an allowed origin.

    The set of allowed origins is read from the bot config option
    ``api_origins``; the value ``"*"`` (or a collection containing ``"*"``)
    allows every origin.

    Args:
        request: incoming request; its ``Origin`` header is inspected.

    Returns:
        A ``web.Response`` that echoes the request origin in
        ``Access-Control-Allow-Origin``. ``Vary: Origin`` is added when the
        origin was matched against an explicit allow-list.

    Raises:
        web.HTTPForbidden: if ``api_origins`` is not configured, the request
            has no ``Origin`` header, or the origin is not allowed.
    """
    allowed_origins = self._bot.config.get_option("api_origins")
    if allowed_origins is None:
        raise web.HTTPForbidden()

    # A request without an Origin header used to fail with KeyError (an
    # internal server error); refuse it explicitly instead.
    origin = request.headers.get("Origin")
    if origin is None:
        raise web.HTTPForbidden()

    cors_headers = {
        "Access-Control-Allow-Origin": origin,
        "Access-Control-Allow-Headers": "content-type",
    }
    if allowed_origins == "*" or "*" in allowed_origins:
        return web.Response(headers=cors_headers)

    if origin not in allowed_origins:
        raise web.HTTPForbidden()

    # The answer depends on the caller's origin, so mark it as varying.
    cors_headers["Vary"] = "Origin"
    return web.Response(headers=cors_headers)
# Example source code using the Python class Response()
async def adapter_do_GET(self, request):
    """Handle a GET request whose URL match info carries the whole message.

    The route parameters ``id``, ``api_key`` and ``message`` are packed into
    a payload and passed to ``self.process_request``.

    Args:
        request: incoming request; ``request.match_info`` must provide
            ``id``, ``api_key`` and ``message`` (URL-quoted).

    Returns:
        A ``web.Response``: the processing result as ``text/html`` when the
        result is truthy, otherwise the plain text ``OK``.
    """
    route = request.match_info
    payload = {
        "sendto": route["id"],
        "key": route["api_key"],
        "content": unquote(route["message"]),
    }

    # The first two positional arguments were marked "IGNORED" at this call
    # site in the original code, so empty placeholders are passed.
    results = await self.process_request('', '', payload)

    if results:
        content_type = "text/html"
        # Characters outside ASCII become numeric character references.
        body = results.encode("ascii", "xmlcharrefreplace")
    else:
        content_type = "text/plain"
        body = "OK".encode('utf-8')

    return web.Response(body=body, content_type=content_type)
async def test_middleware_doesnt_reissue_on_bad_response(self):
    """A 400 response must not carry a (re)issued auth cookie.

    The body awaits coroutines, so this must be a coroutine function; it
    needs a test runner or base class that awaits async test methods.
    """
    secret = b'01234567890abcdef'
    auth_ = auth.CookieTktAuthentication(secret, 15, 0, cookie_name='auth')
    middlewares = [auth_middleware(auth_)]

    # Build a ticket for 'some_user' that is valid for the next 15 seconds
    # and send it as the auth cookie.
    valid_until = time.time() + 15
    session_data = TicketFactory(secret).new('some_user',
                                             valid_until=valid_until)
    request = await make_request(
        'GET', '/', middlewares, [(auth_.cookie_name, session_data)])

    user_id = await auth.get_auth(request)
    self.assertEqual(user_id, 'some_user')

    response = await make_response(request, middlewares,
                                   web.Response(status=400))
    self.assertNotIn(auth_.cookie_name, response.cookies)
def app_hello(self, request):
    """Return a fixed greeting as a text response; ``request`` is not read."""
    greeting = 'Djaio!'
    return web.Response(text=greeting)
def get(self):
    """Return a ``text/plain`` response containing a fixed refusal message."""
    return web.Response(
        body='You shall not pass!'.encode(),
        content_type='text/plain',
    )
def response_ok():
    """Build an empty response with HTTP status 200."""
    return web.Response(status=200)
def response_not_found():
    """Build an empty response with HTTP status 404."""
    return web.Response(status=404)
def logger_factory(app, handler):
    """Middleware factory that logs the method and path of every request.

    Args:
        app: the application object (not used by this middleware).
        handler: the next coroutine handler in the chain.

    Returns:
        A coroutine function ``logger(request)`` that writes one INFO log
        line and then returns whatever ``handler(request)`` returns.
    """
    async def logger(request):
        # Hand the values to logging as arguments so the message is only
        # formatted if the record is actually emitted.
        logging.info('Response: %s %s', request.method, request.path)
        return await handler(request)
    return logger
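# NOTE: the original comment was mis-encoded; it mentioned a cookie and request.__user__
# (presumably: parse the cookie and bind the signed-in user to request.__user__ -- verify).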
def data_factory(app, handler):
    """Middleware factory that parses request input into ``request.__data__``.

    What is stored depends on the request method:

    * POST / PUT with a JSON content type: the decoded body, which must be
      a JSON object (``dict``).
    * POST / PUT with a form content type: the posted fields as a ``dict``.
    * GET: the query-string parameters, keeping blank values and only the
      first value of each key.
    * any other method: an empty ``dict``.

    Args:
        app: the application object (not used by this middleware).
        handler: the next coroutine handler in the chain.

    Returns:
        A coroutine function ``parse_data(request)``. It returns a
        ``web.HTTPBadRequest`` when the content type is missing or
        unsupported, or when the JSON body is malformed or not an object;
        otherwise it returns the result of ``handler(request)``.
    """
    async def parse_data(request):
        logging.info('data_factory...')
        if request.method in ('POST', 'PUT'):
            if not request.content_type:
                # Errors are returned (not raised), as in the rest of this
                # middleware, so the existing chain behaviour is unchanged.
                return web.HTTPBadRequest(text='Missing Content-Type.')
            content_type = request.content_type.lower()
            if content_type.startswith('application/json'):
                try:
                    request.__data__ = await request.json()
                except ValueError:
                    # json.JSONDecodeError is a ValueError subclass: a body
                    # that cannot be decoded is a client error, not a crash.
                    return web.HTTPBadRequest(text='Invalid JSON body.')
                if not isinstance(request.__data__, dict):
                    return web.HTTPBadRequest(text='JSON body must be object.')
                logging.info('request json: %s', request.__data__)
            elif content_type.startswith(('application/x-www-form-urlencoded', 'multipart/form-data')):
                params = await request.post()
                request.__data__ = dict(**params)
                logging.info('request form: %s', request.__data__)
            else:
                return web.HTTPBadRequest(text='Unsupported Content-Type: %s' % content_type)
        elif request.method == 'GET':
            qs = request.query_string
            # parse_qs yields a list per key; keep the first value only.
            request.__data__ = {k: v[0] for k, v in parse.parse_qs(qs, True).items()}
            logging.info('request query: %s', request.__data__)
        else:
            request.__data__ = dict()
        return await handler(request)
    return parse_data
# NOTE: the original comment was mis-encoded; presumably: convert the handler's return value into a Response object.
def response_factory(app, handler):
    """Middleware factory that turns a handler's return value into a response.

    The checks run in this order on the value returned by ``handler``:

    * ``web.StreamResponse``: returned unchanged.
    * ``bytes``: sent as ``application/octet-stream``.
    * ``str``: ``'redirect:<url>'`` becomes ``web.HTTPFound``; any other
      string is sent as UTF-8 HTML.
    * ``dict``: rendered with the template named by ``'__template__'``
      (after adding ``request.__user__`` under ``'__user__'``), or dumped
      as JSON when no template is named.
    * ``int`` in ``[100, 600)``: an empty response with that status.
    * 2-tuple ``(status, message)`` with such a status: status plus text.
    * anything else: ``str(value)`` as UTF-8 plain text.

    Args:
        app: the application; ``app['__templating__']`` is used to look up
            templates.
        handler: the next coroutine handler in the chain.

    Returns:
        A coroutine function ``response(request)``.
    """
    def _encoded(payload, mime):
        # UTF-8 encode the text, then stamp the content type afterwards.
        reply = web.Response(body=payload.encode('utf-8'))
        reply.content_type = mime
        return reply

    async def response(request):
        logging.info('Response handler...')
        result = await handler(request)

        if isinstance(result, web.StreamResponse):
            return result

        if isinstance(result, bytes):
            reply = web.Response(body=result)
            reply.content_type = 'application/octet-stream'
            return reply

        if isinstance(result, str):
            if result.startswith('redirect:'):
                # Everything after the 9-character prefix is the target.
                return web.HTTPFound(result[9:])
            return _encoded(result, 'text/html;charset=utf-8')

        if isinstance(result, dict):
            template_name = result.get('__template__')
            if template_name is not None:
                # Expose the current user to the template context.
                result['__user__'] = request.__user__
                page = app['__templating__'].get_template(template_name).render(**result)
                return _encoded(page, 'text/html;charset=utf-8')
            dumped = json.dumps(result, ensure_ascii=False, default=lambda o: o.__dict__)
            return _encoded(dumped, 'application/json;charset=utf-8')

        if isinstance(result, int) and 100 <= result < 600:
            return web.Response(status=result)

        if isinstance(result, tuple) and len(result) == 2:
            code, message = result
            if isinstance(code, int) and 100 <= code < 600:
                return web.Response(status=code, text=str(message))

        # default
        return _encoded(str(result), 'text/plain;charset=utf-8')
    return response
async def make_webserver(self):
    """Register the index page and start an HTTP server on the bot's loop.

    Reads ``self.settings['content']`` (page body, looked up on every
    request) and ``self.settings['server_port']``; stores the created
    handler and server on ``self.handler`` and ``self.server``.
    """
    async def page(request):
        body = self.settings['content']
        return web.Response(text=body, content_type='text/html')

    self.app.router.add_get('/', page)
    self.handler = self.app.make_handler()
    port = self.settings['server_port']
    self.server = await self.bot.loop.create_server(self.handler, '0.0.0.0', port)
    # The socket is bound to 0.0.0.0; self.ip is only used in this message.
    print('Serving webserver on {}:{}'.format(self.ip, port))
async def webhook_handle(self, request):
    """
    aiohttp.web handle for processing web hooks

    Decodes the request body as JSON, passes it to ``self._process_update``
    and replies with an empty response.

    :Example:

    >>> from aiohttp import web
    >>> app = web.Application()
    >>> # ``bot`` stands for an instance of the class defining this method
    >>> app.router.add_route('POST', '/webhook', bot.webhook_handle)
    """
    update = await request.json()
    self._process_update(update)
    return web.Response()
async def sparql_endpoint(request):
    """Echo the posted form fields and request path back as JSON.

    Args:
        request: incoming request; its posted form data and path are read.

    Returns:
        A JSON ``web.Response`` with the keys ``post`` and ``path``.

    Raises:
        web.HTTPBadRequest: if the posted ``query`` or ``update`` field
            contains the substring ``failure``.
    """
    form = await request.post()
    result = {
        "post": dict(form.items()),
        "path": request.path,
    }

    posted = result["post"]
    # Either field containing "failure" triggers the error reply.
    for field in ("query", "update"):
        if "failure" in posted.get(field, ""):
            raise web.HTTPBadRequest()

    return web.Response(text=json.dumps(result),
                        content_type="application/json")
def crud_endpoint(request):
    """Record the request on the app and answer according to its method.

    The request is stored under ``request.app['last_request']``. PATCH gets
    an empty JSON object; every other method raises ``web.HTTPNoContent``.
    """
    request.app['last_request'] = request
    if request.method != "PATCH":
        raise web.HTTPNoContent()
    return web.Response(text="{}", content_type="application/json")
def handle(request):
    """Return a fixed UTF-8 encoded greeting; ``request`` is not read."""
    payload = "Hello, can you hear me?".encode('utf-8')
    return web.Response(body=payload)
def header_response(request):
    """Return a ``Response`` with body ``"foo"`` and a ``location`` header."""
    extra_headers = {"location": "boo"}
    return Response("foo", headers=extra_headers)
def registry_dump_handle(request):
    """Dump the registry's repository state as a JSON response (read-only).

    The registry is taken from the ``registry`` attribute set on this
    function object; ``request`` itself is not read.

    :param request: the incoming request (unused)
    :return: a JSON ``web.Response`` with the keys ``registered_services``,
        ``uptimes`` and ``service_dependencies``
    """
    repo = registry_dump_handle.registry._repository
    response_dict = {
        'registered_services': repo._registered_services,
        'uptimes': repo._uptimes,
        'service_dependencies': repo._service_dependencies,
    }
    # A successful dump is not a client error, so report 200 rather than
    # the 400 that was sent before.
    return web.Response(status=200, content_type='application/json',
                        body=json.dumps(response_dict).encode())
def _middleware_error_response(middleware, exc):
    """Build the 400 JSON response sent when a middleware hook raises."""
    payload = {
        'error': str(exc),
        # Default to None so a middleware without ``middleware_info`` cannot
        # raise a second error while the first one is being reported.
        'sector': getattr(middleware, 'middleware_info', None),
    }
    return Response(status=400, content_type='application/json',
                    body=json.dumps(payload).encode())


def _enable_http_middleware(func):  # pre and post http, processing
    """Decorate an HTTP handler so the service's middlewares run around it.

    For every object in ``self.middlewares`` (if the attribute exists):

    * a callable ``pre_request`` is awaited with ``(self, *args, **kwargs)``
      before the handler runs;
    * a callable ``post_request`` is awaited with
      ``(self, result, *args, **kwargs)`` after the handler ran.

    A truthy value returned by any hook is returned immediately in place of
    the handler result. An exception raised by a hook is turned into a 400
    JSON response carrying the error text and the middleware's
    ``middleware_info``.

    Args:
        func: the handler to wrap; it is passed through ``coroutine()``
            before being awaited.

    Returns:
        The wrapping coroutine function.
    """
    @wraps(func)
    async def f(self, *args, **kwargs):
        for middleware in getattr(self, 'middlewares', ()):
            pre_request = getattr(middleware, 'pre_request', None)
            if callable(pre_request):
                try:
                    # The service is passed as the first argument.
                    res = await pre_request(self, *args, **kwargs)
                    if res:
                        return res
                except Exception as e:
                    return _middleware_error_response(middleware, e)

        _func = coroutine(func)  # func is a generator object
        result = await _func(self, *args, **kwargs)

        # Looked up again because the attribute is checked after the
        # handler ran, exactly as before.
        for middleware in getattr(self, 'middlewares', ()):
            post_request = getattr(middleware, 'post_request', None)
            if callable(post_request):
                try:
                    res = await post_request(self, result, *args, **kwargs)
                    if res:
                        return res
                except Exception as e:
                    return _middleware_error_response(middleware, e)

        return result

    return f
def pong(self, _):
    """Reply with a default ``Response``; the second argument is ignored."""
    reply = Response()
    return reply
def stats(self, _):
    """Return ``Aggregator.dump_stats()`` as a JSON response with status 200.

    The second argument is ignored.
    """
    snapshot = Aggregator.dump_stats()
    encoded = json.dumps(snapshot).encode()
    return Response(status=200, content_type='application/json', body=encoded)