def create_handler(transmute_func, context):
    """Build a request handler coroutine around ``transmute_func``.

    The returned handler extracts call arguments from the request with
    ``extract_params``, awaits ``transmute_func.raw_func`` and passes the
    outcome (result or exception) to ``transmute_func.process_result``,
    whose dictionary is turned into a ``web.Response``.

    The handler exposes the function it was built from as
    ``handler.transmute_func``.
    """

    @wraps(transmute_func.raw_func)
    async def handler(request):
        outcome = failure = None
        try:
            call_args, call_kwargs = await extract_params(
                request, context, transmute_func
            )
            outcome = await transmute_func.raw_func(*call_args, **call_kwargs)
        except HTTPException as http_error:
            # HTTP errors are reported as API errors; use 400 when the
            # exception carries no truthy status code.
            status = http_error.status_code or 400
            failure = APIException(code=status, message=str(http_error))
        except Exception as error:
            # Every other failure is handed to process_result unchanged.
            failure = error

        processed = transmute_func.process_result(
            context, outcome, failure, request.content_type
        )
        return web.Response(
            body=processed["body"],
            status=processed["code"],
            content_type=processed["content-type"],
            headers=processed["headers"],
        )

    handler.transmute_func = transmute_func
    return handler
python类Response()的实例源码
def add_swagger_api_route(app, target_route, swagger_json_route):
    """
    Mount a swagger statics page.

    app: the aiohttp app object
    target_route: the path to mount the statics page.
    swagger_json_route: the path where the swagger json definitions is
                        expected to be.
    """
    assets_dir = get_swagger_static_root()
    # The HTML page is generated and encoded once, at registration time;
    # the handler below sends the same bytes for every request.
    page_bytes = generate_swagger_html(STATIC_ROOT, swagger_json_route)
    page_bytes = page_bytes.encode("utf-8")

    async def swagger_ui(request):
        return web.Response(body=page_bytes, content_type="text/html")

    app.router.add_route("GET", target_route, swagger_ui)
    # Static assets are served under the STATIC_ROOT url prefix.
    app.router.add_static(STATIC_ROOT, assets_dir)
def create_swagger_json_handler(app, **kwargs):
    """
    Build a handler that serves the swagger definition of an application.

    The definition is generated and JSON-encoded once, when this function
    is called; the returned handler sends those bytes on every request.
    ``kwargs`` are forwarded to ``swagger_definition``.

    This method assumes the application is using the
    TransmuteUrlDispatcher as the router.
    """
    definition = get_swagger_spec(app).swagger_definition(**kwargs)
    payload = json.dumps(definition).encode("UTF-8")

    async def swagger(request):
        # we allow CORS, so this can be requested at swagger.io
        cors_headers = {"Access-Control-Allow-Origin": "*"}
        return web.Response(
            headers=cors_headers,
            body=payload,
            content_type="application/json",
        )

    return swagger
async def _http_add_org_webhook(self, request):
    """Create a catch-all webhook for the organization named in the form.

    Reads the ``org`` field of the posted form, creates a hook subscribed
    to all events that points at this service's ``webhook`` URL, then
    stores the client token in ``self.orgs`` keyed by the ASCII-encoded
    organization id.

    Returns a ``web.Response`` whose text is ``str()`` of the hook
    creation result.

    Raises ``KeyError`` if the form has no ``org`` field.
    """
    # Use the mapping returned by post() rather than the legacy
    # ``request.POST`` attribute.
    form = await request.post()
    org = form["org"]
    hook = {
        "name": "web",
        "active": True,
        "events": ["*"],
        "config": {
            "url": self.root.config.core["url"] + self.url + "webhook",
            "content_type": "json",
        },
    }
    client = self._get_client(request)
    resp = await client.post("/orgs/:org/hooks", org, **hook)
    org_data = await client.get("/orgs/:org", org)
    self.orgs[str(org_data["id"]).encode("ascii")] = client.token
    return web.Response(text=str(resp))
async def http_handler(self, request):
    """Dispatch a request on the suffix of its path.

    * ``api.sock``       -> delegated to ``self.ws_handler``
    * ``/monitor/``      -> serves the packaged ``monitor.html``
    * ``/login/github``  -> on POST, redirects to the OAuth request URL
    * ``/oauth2/github`` -> delegated to ``self._oauth2_handler``
    * ``logout``         -> on POST, drops the session and redirects to
      ``/monitor/``

    Anything else (including non-POST login/logout requests that match no
    later rule) yields ``web.HTTPNotFound``.
    """
    path = request.path
    if path.endswith("api.sock"):
        return await self.ws_handler(request)
    if path.endswith("/monitor/"):
        page = pkgutil.get_data("rci.services.monitor",
                                "monitor.html").decode("utf8")
        return web.Response(text=page, content_type="text/html")
    if path.endswith("/login/github"):
        if request.method == "POST":
            url = self.oauth.generate_request_url(("read:org", ))
            return web.HTTPFound(url)
    if path.endswith("/oauth2/github"):
        return await self._oauth2_handler(request)
    if path.endswith("logout"):
        if request.method == "POST":
            sid = request.cookies.get(self.config["cookie_name"])
            # The cookie may be missing or refer to an unknown session;
            # logging out must not fail with KeyError in that case.
            self.sessions.pop(sid, None)
            return web.HTTPFound("/monitor/")
    return web.HTTPNotFound()
async def get(self) -> web.Response:
    """Return contacts, with their metadata applied, as a JSON response.

    The selection depends on the query string:

    * ``id``: the single contact with that id (an empty list when the
      lookup returns nothing falsy-checked as missing).
    * ``meta_key`` (with ``meta_value``): contacts matching that metadata.
    * neither: all contacts.
    """
    dbcon = self.request.app['dbcon']
    query = self.request.rel_url.query
    if 'id' in query:
        contact_id = require_int(get_request_param(self.request, 'id'))
        c = await contact.get_contact(dbcon, contact_id)
        contact_list = []  # type: Iterable[object_models.Contact]
        if c:
            contact_list = [c]
        # Metadata is fetched even when the contact was not found, so that
        # metadata_list is always bound.
        metadata_list = await metadata.get_metadata_for_object(dbcon, 'contact', contact_id)
    elif 'meta_key' in query:
        meta_key = require_str(get_request_param(self.request, 'meta_key'))
        meta_value = require_str(get_request_param(self.request, 'meta_value'))
        contact_list = await contact.get_contacts_for_metadata(dbcon, meta_key, meta_value)
        metadata_list = await metadata.get_metadata_for_object_metadata(
            dbcon, meta_key, meta_value, 'contact', 'contacts')
    else:
        contact_list = await contact.get_all_contacts(dbcon)
        metadata_list = await metadata.get_metadata_for_object_type(dbcon, 'contact')
    return web.json_response(apply_metadata_to_model_list(contact_list, metadata_list))
async def get(self) -> web.Response:
    """Return contact groups, with their metadata applied, as JSON.

    The selection depends on the query string:

    * ``id``: the single contact group with that id (empty list when the
      lookup returns a falsy value).
    * ``meta_key`` (with ``meta_value``): groups matching that metadata.
    * neither: all contact groups.
    """
    dbcon = self.request.app['dbcon']
    query = self.request.rel_url.query
    if 'id' in query:
        contact_group_id = require_int(get_request_param(self.request, 'id'))
        contact_group_item = await contact.get_contact_group(dbcon, contact_group_id)
        contact_group_list = []  # type: Iterable[object_models.ContactGroup]
        if contact_group_item:
            contact_group_list = [contact_group_item]
        metadata_list = await metadata.get_metadata_for_object(dbcon, 'contact_group', contact_group_id)
    elif 'meta_key' in query:
        meta_key = require_str(get_request_param(self.request, 'meta_key'))
        meta_value = require_str(get_request_param(self.request, 'meta_value'))
        contact_group_list = await contact.get_contact_groups_for_metadata(dbcon, meta_key, meta_value)
        metadata_list = await metadata.get_metadata_for_object_metadata(
            dbcon, meta_key, meta_value, 'contact_group', 'contact_groups')
    else:
        contact_group_list = await contact.get_all_contact_groups(dbcon)
        # The object type must match the other branches ('contact_group');
        # asking for 'monitor_group' metadata here attached the wrong
        # metadata to the listed contact groups.
        metadata_list = await metadata.get_metadata_for_object_type(dbcon, 'contact_group')
    return web.json_response(apply_metadata_to_model_list(contact_group_list, metadata_list))
async def get(self) -> web.Response:
    """Return monitor groups, with their metadata applied, as JSON.

    The selection depends on the query string:

    * ``id``: the single monitor group with that id (empty list when the
      lookup returns a falsy value).
    * ``meta_key`` (with ``meta_value``): groups matching that metadata.
    * neither: all monitor groups.
    """
    dbcon = self.request.app['dbcon']
    query = self.request.rel_url.query
    if 'id' in query:
        monitor_group_id = require_int(get_request_param(self.request, 'id'))
        monitor_group_item = await monitor_group.get_monitor_group(dbcon, monitor_group_id)
        monitor_group_list = []  # type: Iterable[object_models.MonitorGroup]
        if monitor_group_item:
            monitor_group_list = [monitor_group_item]
        metadata_list = await metadata.get_metadata_for_object(dbcon, 'monitor_group', monitor_group_id)
    elif 'meta_key' in query:
        meta_key = require_str(get_request_param(self.request, 'meta_key'))
        meta_value = require_str(get_request_param(self.request, 'meta_value'))
        monitor_group_list = await monitor_group.get_monitor_groups_for_metadata(dbcon, meta_key, meta_value)
        metadata_list = await metadata.get_metadata_for_object_metadata(
            dbcon, meta_key, meta_value, 'monitor_group', 'monitor_groups')
    else:
        monitor_group_list = await monitor_group.get_all_monitor_groups(dbcon)
        metadata_list = await metadata.get_metadata_for_object_type(dbcon, 'monitor_group')
    return web.json_response(apply_metadata_to_model_list(monitor_group_list, metadata_list))
async def authenticate(*, email, passwd):
    """Check a user's credentials and return a response with a session cookie.

    The stored password is compared against the SHA-1 hex digest of
    ``"<user id>:<passwd>"``.

    Returns a ``web.Response`` carrying the session cookie (valid for
    86400 seconds) and a JSON body of the user with ``passwd`` masked.

    Raises ``APIValueError`` when ``email`` or ``passwd`` is empty, when no
    user has that email, or when the password does not match.
    """
    if not email:
        raise APIValueError('email', 'Invalid email')
    if not passwd:
        raise APIValueError('passwd', 'Invalid passwd')
    users = await User.findAll('email=?', [email])
    if not users:
        raise APIValueError('email', 'Email not exist.')
    user = users[0]
    # check passwd
    sha1 = hashlib.sha1()
    sha1.update(user.id.encode('utf-8'))
    sha1.update(b':')
    sha1.update(passwd.encode('utf-8'))
    if user.passwd != sha1.hexdigest():
        raise APIValueError('passwd', 'passwd error')
    # authenticate ok set cookie
    r = web.Response()
    r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True)
    # Mask the hash only after the cookie has been built from the user
    # record, so it never reaches the response body.
    user.passwd = '******'
    r.content_type = 'application/json'
    r.body = json.dumps(user, ensure_ascii=False).encode('utf-8')
    return r
# ??
async def api_register_user(*, email, name, passwd):
    """Register a new user and return a response with a session cookie.

    ``name`` must be non-blank, ``email`` must match ``_RE_EMAIL`` and
    ``passwd`` must match ``_RE_SHA1``. The stored password is the SHA-1
    hex digest of ``"<uid>:<passwd>"``.

    Returns a ``web.Response`` carrying the session cookie (valid for
    86400 seconds) and a JSON body of the new user with ``passwd`` masked.

    Raises ``APIValueError`` for an invalid field and ``APIError`` when the
    email is already registered.
    """
    if not name or not name.strip():
        raise APIValueError('name')
    if not email or not _RE_EMAIL.match(email):
        raise APIValueError('email')
    if not passwd or not _RE_SHA1.match(passwd):
        raise APIValueError('passwd')
    users = await User.findAll('email=?', [email])
    if users:
        raise APIError('register:failed', 'email', 'Email is already in use')
    uid = next_id()
    sha1_passwd = '%s:%s' % (uid, passwd)
    encrypt_passwd = hashlib.sha1(sha1_passwd.encode('utf-8')).hexdigest()
    # The avatar URL is keyed by the MD5 hex digest of the email address.
    email_digest = hashlib.md5(email.encode('utf-8')).hexdigest()
    image = 'https://1.gravatar.com/avatar/%s?s=200&r=pg&d=mm' % email_digest
    user = User(id=uid, name=name.strip(), email=email, passwd=encrypt_passwd, image=image)
    await user.save()
    # make session cookie
    r = web.Response()
    r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True)
    # Mask the hash only after the cookie has been built from the user
    # record, so it never reaches the response body.
    user.passwd = '******'
    r.content_type = 'application/json'
    r.body = json.dumps(user, ensure_ascii=False).encode('utf-8')
    return r
async def handle(self, request):
    """Serve a static file, mapping directories to their ``index.html``.

    When the requested filename resolves to a directory below
    ``self._directory``, the request's ``filename`` match is rewritten to
    that directory's ``index.html`` before delegating to the parent
    ``handle``. A ``HTTPNotFound`` from the parent is replaced by a 404
    response whose body also contains the asset content. Every request is
    logged with its status and size: at info level for 200/304, at warning
    level otherwise.

    ``HTTPNotModified`` and any other exception raised by the parent
    handler propagate to the caller.
    """
    filename = request.match_info['filename']
    try:
        filepath = self._directory.joinpath(filename).resolve()
    except (ValueError, FileNotFoundError, OSError):
        # Best effort only: an unresolvable path is left as-is for the
        # parent handler to deal with.
        pass
    else:
        if filepath.is_dir():
            request.match_info['filename'] = str(filepath.joinpath('index.html').relative_to(self._directory))
    # Placeholders logged when the parent raises something unexpected.
    status, length = 'unknown', ''
    try:
        response = await super().handle(request)
    except HTTPNotModified:
        status, length = 304, 0
        raise
    except HTTPNotFound:
        _404_msg = '404: Not Found\n\n' + _get_asset_content(self._asset_path)
        response = web.Response(body=_404_msg.encode('utf8'), status=404)
        status, length = response.status, response.content_length
    else:
        status, length = response.status, response.content_length
    finally:
        log = logger.info if status in {200, 304} else logger.warning
        log(' > %s %s %s %s', request.method, request.path, status, _fmt_size(length))
    return response
def async_handle_subscribe(request, subscribed_clients, state_variables):
    """Register a subscriber and schedule an initial push of every variable.

    The ``CALLBACK`` header value, minus its first and last character, is
    stored in ``subscribed_clients`` under a freshly generated
    ``uuid:<uuid4>`` subscription id. For each value of ``state_variables``
    a task is scheduled on the event loop that waits half a second and
    then awaits ``async_notify_listeners()``.

    Returns a 200 ``web.Response`` carrying the subscription id in the
    ``SID`` header.

    NOTE(review): a request without a ``CALLBACK`` header fails with
    ``TypeError`` (slicing ``None``), as in the original code.
    """
    callback_url = request.headers.get('CALLBACK')[1:-1]
    sid = 'uuid:' + str(uuid.uuid4())
    subscribed_clients[sid] = callback_url
    headers = {
        'SID': sid
    }

    # Native coroutine: the ``@asyncio.coroutine`` decorator was removed in
    # Python 3.11, so the generator-based form no longer works there.
    async def async_push_later(state_variable):
        await asyncio.sleep(0.5)
        await state_variable.async_notify_listeners()

    for state_variable in state_variables.values():
        LOGGER.debug('Pushing state_variable on SUBSCRIBE: %s', state_variable.name)
        asyncio.get_event_loop().create_task(async_push_later(state_variable))
    return web.Response(status=200, headers=headers)
#endregion
#region Unsubscribe
def api_register_user(*, email, name, password):
    """Register a new user (generator-based coroutine).

    ``name`` must be non-blank, ``email`` must match ``_RE_EMAIL`` and
    ``password`` must match ``_RE_SHA1``. The stored password is the SHA-1
    hex digest of ``"<uid>:<password>"``.

    Returns a ``web.Response`` carrying the session cookie (valid for
    86400 seconds) and a JSON body of the new user with ``password``
    masked.

    Raises ``APIValueError`` for an invalid field and ``APIError`` when the
    email is already registered.
    """
    if not name or not name.strip():
        raise APIValueError('name')
    if not email or not _RE_EMAIL.match(email):
        raise APIValueError('email')
    if not password or not _RE_SHA1.match(password):
        raise APIValueError('password')
    users = yield from User.find_all('email=?', [email])
    if users:
        raise APIError('Register failed', 'email', 'Email is already in use.')
    uid = next_id()
    sha1_password = '{}:{}'.format(uid, password)
    # Never write the submitted password or the pre-hash material to the
    # log; the new user id is enough to trace a registration.
    logging.info('register user id: %s', uid)
    user = User(
        id=uid,
        name=name.strip(),
        email=email,
        password=hashlib.sha1(sha1_password.encode('utf-8')).hexdigest(),
        image='http://www.gravatar.com/avatar/{}?d=mm&s=120'.format(
            hashlib.md5(email.encode('utf-8')).hexdigest()),
    )
    yield from user.save()
    r = web.Response()
    r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True)
    # Mask the hash only after the cookie has been built from the user
    # record, so it never reaches the response body.
    user.password = '*' * 8
    r.content_type = 'application/json'
    r.body = json.dumps(user, ensure_ascii=False).encode('utf-8')
    return r
def authenticate(*, email, password):
    """Check a user's credentials (generator-based coroutine).

    The stored password is compared against the SHA-1 hex digest of
    ``"<user id>:<password>"``.

    Returns a ``web.Response`` carrying the session cookie (valid for
    86400 seconds) and a JSON body of the user with ``password`` masked.

    Raises ``APIValueError`` when ``email`` or ``password`` is empty, when
    no user has that email, or when the password does not match.
    """
    if not email:
        raise APIValueError('email', 'Invalid Email')
    if not password:
        raise APIValueError('password', 'Invalid Password')
    users = yield from User.find_all('email=?', [email])
    if not users:
        raise APIValueError('email', 'Email not exist')
    user = users[0]
    # check password
    sha1_password = '{}:{}'.format(user.id, password)
    # Never write the submitted password or the pre-hash material to the
    # log; the user id is enough to trace a login attempt.
    logging.info('login attempt for user id: %s', user.id)
    if user.password != hashlib.sha1(sha1_password.encode('utf-8')).hexdigest():
        raise APIValueError('password', 'Invalid Password.')
    # authenticate ok, set cookie
    r = web.Response()
    r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True)
    # Mask the hash only after the cookie has been built from the user
    # record, so it never reaches the response body.
    user.password = '*' * 8
    r.content_type = 'application/json'
    r.body = json.dumps(user, ensure_ascii=False).encode('utf-8')
    return r
def datetime_filter(t):
    """Render a POSIX timestamp as an HTML ``<span>`` showing its age.

    The span's ``title`` is the local date and time of ``t``; its text is
    a relative age in Chinese for anything younger than a week and the
    calendar date otherwise:

    * under a minute  -> "1分钟前"
    * under an hour   -> "<n>分钟前"
    * under a day     -> "<n>小时前"
    * under a week    -> "<n>天前"
    * otherwise       -> "YYYY年MM月DD日"

    The CJK suffixes had been replaced by ``?`` characters by an encoding
    mishap; they are restored here.
    """
    moment = datetime.fromtimestamp(t)
    title = moment.strftime("%Y-%m-%d %X")
    delta = int(time.time() - t)
    if delta < 60:
        label = '1分钟前'
    elif delta < 3600:
        label = '{}分钟前'.format(delta // 60)
    elif delta < 86400:
        label = '{}小时前'.format(delta // 3600)
    elif delta < 604800:
        label = '{}天前'.format(delta // 86400)
    else:
        # Only ASCII directives go through strftime; the CJK separators are
        # added by str.format, which avoids platform-specific strftime
        # problems with non-ASCII format strings.
        label = '{0:%Y}年{0:%m}月{0:%d}日'.format(moment)
    return '<span title="{}">{}</span>'.format(title, label)
#def index(request):
#return web.Response(body=b'<h1>Awesome Python3 Web</h1>', content_type='text/html')
def aiohttp_server(loop, addr):
    """Create (but do not start) an aiohttp server returning fixed payloads.

    ``GET /{size}`` answers with ``size`` bytes of ``X``; ``GET /`` uses
    1024 bytes. Payloads are memoised per size in ``_RESP_CACHE``.

    Returns whatever ``loop.create_server`` returns for the application's
    handler bound to ``addr`` (unpacked as positional arguments).
    """

    async def handle(request):
        size = int(request.match_info.get('size', 1024))
        payload = _RESP_CACHE.get(size)
        if payload is None:
            # First request for this size: build the body and remember it.
            payload = _RESP_CACHE[size] = b'X' * size
        return web.Response(body=payload)

    app = web.Application(loop=loop)
    for route in ('/{size}', '/'):
        app.router.add_route('GET', route, handle)
    return loop.create_server(app.make_handler(), *addr)
def __call__(self, value):
    """Turn ``value`` into a response that has a content type.

    A value recognised by ``_is_pserver_response`` is converted into an
    ``aioResponse`` whose body is the UTF-8 bytes of its ``response``
    (bytes are used as-is, strings are encoded, anything else is dumped to
    JSON first). If the resulting headers have no ``content-type``,
    ``text/html`` is set.
    """
    # Safe html transformation
    if _is_pserver_response(value):
        payload = value.response
        if isinstance(payload, str):
            payload = payload.encode('utf8')
        elif not isinstance(payload, bytes):
            payload = json.dumps(value.response).encode('utf8')
        value = aioResponse(
            body=payload,
            status=value.status,
            headers=value.headers,
        )
    if 'content-type' not in value.headers:
        value.headers.update({'content-type': 'text/html'})
    return value
def guess_response(self, value):
    """Build a response object from ``value.response`` based on its type.

    * dict or list -> JSON body, ``application/json``
    * str          -> UTF-8 body, ``text/html``
    * None         -> ``{}`` body, ``application/json``
    * anything else is used as the response object itself

    The headers of ``value`` are then merged in and, unless the response
    is already prepared, its status is set from ``value.status``.
    """
    payload = value.response
    if isinstance(payload, (dict, list)):
        encoded = json.dumps(payload, cls=PServerJSONEncoder)
        resp = aioResponse(body=bytes(encoded, 'utf-8'))
        resp.headers['Content-Type'] = 'application/json'
    elif isinstance(payload, str):
        resp = aioResponse(body=bytes(payload, 'utf-8'))
        resp.headers['Content-Type'] = 'text/html'
    elif payload is None:
        # missing result...
        resp = aioResponse(body=b'{}')
        resp.headers['Content-Type'] = 'application/json'
    else:
        resp = payload

    resp.headers.update(value.headers)
    if not resp.prepared:
        resp.set_status(value.status)
    return resp
def home(request):  # <1>
    """Render the search page for the ``query`` request parameter.

    With a non-blank query the page lists every description found by
    ``index.find_descriptions`` together with ``index.status``; without
    one it shows a prompt and no rows.
    """
    query = request.GET.get('query', '').strip()  # <2>
    print('Query: {!r}'.format(query))  # <3>
    # Defaults for the "no query" page; overwritten below when searching.
    descriptions = []
    res = ''
    msg = 'Enter words describing characters.'
    if query:  # <4>
        descriptions = list(index.find_descriptions(query))
        rows = (ROW_TPL.format(**vars(descr)) for descr in descriptions)
        res = '\n'.join(rows)
        msg = index.status(query, len(descriptions))
    html = template.format(query=query, result=res, message=msg)  # <5>
    print('Sending {} results'.format(len(descriptions)))  # <6>
    return web.Response(content_type=CONTENT_TYPE, text=html)  # <7>
# END HTTP_CHARFINDER_HOME
# BEGIN HTTP_CHARFINDER_SETUP
def get_chars(request):
    """Return a JSON page of characters matching the ``query`` parameter.

    ``start`` and ``stop`` select the slice of results; ``stop`` is capped
    at ``start + RESULTS_PER_REQUEST``. The characters are concatenated
    with a newline after every 64th one.

    Raises ``web.HTTPBadRequest`` when ``query`` is empty or when ``start``
    or ``stop`` is not an integer.
    """
    peername = request.transport.get_extra_info('peername')
    print('Request from: {}, GET data: {!r}'.format(peername, dict(request.GET)))
    query = request.GET.get('query', '')
    if not query:
        raise web.HTTPBadRequest()
    try:
        start = int(request.GET.get('start', 0))
        stop = int(request.GET.get('stop', sys.maxsize))
    except ValueError:
        raise web.HTTPBadRequest()
    stop = min(stop, start + RESULTS_PER_REQUEST)
    num_results, chars = index.find_chars(query, start, stop)

    # Break the output into lines of 64 characters.
    text = ''.join(char + '\n' if n % 64 == 0 else char
                   for n, char in enumerate(chars, 1))
    response_data = {'total': num_results, 'start': start, 'stop': stop}
    print('Response to query: {query!r}, start: {start}, stop: {stop}'.format(
        query=query, **response_data))
    response_data['chars'] = text
    json_obj = json.dumps(response_data)
    print('Sending {} characters'.format(len(text)))
    return web.Response(
        content_type=TEXT_TYPE,
        headers={'Access-Control-Allow-Origin': '*'},
        text=json_obj,
    )
def handle(request):
    """Render the search page for the ``query`` request parameter.

    With a query the page lists every description found by
    ``index.find_descriptions`` together with ``index.status``; without
    one it shows a prompt and no rows. ``LINKS_HTML`` is always included.
    """
    query = request.GET.get('query', '')
    print('Query: {!r}'.format(query))
    # Defaults for the "no query" page; overwritten below when searching.
    descriptions = []
    res = ''
    msg = 'Type words describing characters.'
    if query:
        descriptions = list(index.find_descriptions(query))
        rows = (ROW_TPL.format(**vars(descr)) for descr in descriptions)
        res = '\n'.join(rows)
        msg = index.status(query, len(descriptions))
    text = PAGE_TPL.format(
        query=query, result=res, message=msg, links=LINKS_HTML
    )
    print('Sending {} results'.format(len(descriptions)))
    return web.Response(content_type=CONTENT_TYPE, text=text)
def home(request):  # <1>
    """Render the search page for the ``query`` request parameter.

    With a non-blank query the page lists every description found by
    ``index.find_descriptions`` together with ``index.status``; without
    one it shows a prompt and no rows.
    """
    query = request.GET.get('query', '').strip()  # <2>
    print('Query: {!r}'.format(query))  # <3>
    # Defaults for the "no query" page; overwritten below when searching.
    descriptions = []
    res = ''
    msg = 'Enter words describing characters.'
    if query:  # <4>
        descriptions = list(index.find_descriptions(query))
        rows = (ROW_TPL.format(**vars(descr)) for descr in descriptions)
        res = '\n'.join(rows)
        msg = index.status(query, len(descriptions))
    html = template.format(query=query, result=res, message=msg)  # <5>
    print('Sending {} results'.format(len(descriptions)))  # <6>
    return web.Response(content_type=CONTENT_TYPE, text=html)  # <7>
# END HTTP_CHARFINDER_HOME
# BEGIN HTTP_CHARFINDER_SETUP
async def homepage(request):
    """Render the home page: server info, banner, donation link, search form.

    Queries the connection stored in ``request.app['conn']`` for the
    donation address and the banner, and returns an HTML ``Response``.

    NOTE(review): ``top_blk`` is not defined in this block; presumably a
    module-level value maintained elsewhere — confirm.
    """
    conn = request.app['conn']
    t = HTML_HDR
    t += "<h2>%s</h2>" % conn.server_info
    # NOTE: only a demo program would do all these remote server
    # queries just to display the homepage...
    donate = await conn.RPC('server.donation_address')
    motd = await conn.RPC('server.banner')
    t += '<pre style="font-size: 50%%">\n%s\n</pre><hr/>' % motd
    t += '<p>Donations: %s</p>' % linkage(donate)
    # The previous paragraph is already closed; no stray "</p>" here.
    t += '<p>Top block: %s</p>' % linkage(top_blk)
    t += (
        '\n'
        '<form method=POST action="/">\n'
        '<input name=\'q\' style="width: 50%" placeholder="Txn hash / address / etc"></input>\n'
    )
    return Response(content_type='text/html', text=t)
async def search(request):
    """Redirect a posted search query ``q`` to the matching detail page.

    * empty, missing or longer than 200 characters -> ``/``
    * up to 7 characters                           -> ``/blk/<query>``
    * exactly 64 characters                        -> ``/txn/<query>``
    * starting with ``1``, ``3``, ``m`` or ``n``   -> ``/addr/<query>``

    Redirects are raised as ``HTTPFound``; any other query gets a plain
    text response saying it cannot be searched.
    """
    form = await request.post()
    # A form without a "q" field is treated like an empty query instead of
    # failing with KeyError.
    query = form.get('q', '').strip()
    if not (1 <= len(query) <= 200):
        raise HTTPFound('/')
    if len(query) <= 7:
        raise HTTPFound('/blk/' + query.lower())
    if len(query) == 64:
        # assume it's a hash of block or txn
        raise HTTPFound('/txn/' + query.lower())
    if query[0] in '13mn':
        # assume it's a payment address
        raise HTTPFound('/addr/' + query)
    return Response(text="Can't search for that")
async def address_page(request):
    """Render the summary page for the address in the URL.

    Calls ``call_and_format`` once per address RPC method, in order, and
    concatenates the results after a heading showing the address. Returns
    an HTML ``Response``.
    """
    # Local import keeps this block self-contained.
    from html import escape

    # address summary by bitcoin payment addr
    addr = request.match_info['addr']
    conn = request.app['conn']
    # The address comes straight from the URL, so it is escaped before
    # being placed in the page; the RPC calls still get the raw value.
    parts = [HTML_HDR, '<h1><code>%s</code></h1>' % escape(addr)]
    for method in ('blockchain.address.get_balance',
                   'blockchain.address.get_status',
                   'blockchain.address.get_mempool',
                   'blockchain.address.get_proof',
                   'blockchain.address.listunspent'):
        # get a balance, etc.
        parts.append(await call_and_format(conn, method, addr))
    return Response(content_type='text/html', text=''.join(parts))
def register_in_memory_block_store_api(app, prefix='/blockstore'):
    """Register GET/POST routes for a dict-backed block store on ``app``.

    ``GET  <prefix>/{id}`` returns the stored bytes, or 404 when unknown.
    ``POST <prefix>/{id}`` stores the request body, or answers 409 when
    the id is already taken. The store lives in a dictionary private to
    this call.
    """
    # Really, really simple stuff ;-)
    store = {}
    route = prefix + '/{id}'

    async def api_block_get(request):
        block_id = request.match_info['id']
        try:
            payload = store[block_id]
        except KeyError:
            raise web.HTTPNotFound()
        return web.Response(body=payload, content_type='application/octet-stream')

    async def api_block_post(request):
        block_id = request.match_info['id']
        if block_id in store:
            raise web.HTTPConflict()
        store[block_id] = await request.read()
        return web.Response()

    app.router.add_get(route, api_block_get)
    app.router.add_post(route, api_block_post)
aiohttponeconnect.py 文件源码
项目:python-tarantool-benchmark-and-bootstrap
作者: valentinmk
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
async def async_good(self, request):
    """Render ``good.html`` with the top entries from the database.

    Runs ``self.session_handler`` for the request, fetches the top list
    with ``self.db.get_top(9, 'ITERATOR_LE')`` and returns the rendered
    template as an HTML ``web.Response`` with the "good" tab marked
    active.
    """
    await self.session_handler(request=request)
    top = await self.db.get_top(9, 'ITERATOR_LE')
    page = self.jinja.get_template('good.html').render(
        title="Top of the best stikers for Telegram",
        active_good="class=\"active\"",
        active_bad="",
        top=top,
    )
    return web.Response(text=page, content_type='text/html')
# @aiohttp_jinja2.template('bad.html')
aiohttponeconnect.py 文件源码
项目:python-tarantool-benchmark-and-bootstrap
作者: valentinmk
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
async def action_bad(self, request):
    """Render the page with the top entries, "bad" tab marked active.

    Runs ``self.session_handler`` for the request, fetches the top list
    with ``self.db.get_top(9, 'ITERATOR_GE')`` and returns the rendered
    template as an HTML ``web.Response``.

    NOTE(review): this renders ``good.html``, not a ``bad.html`` template;
    presumably the template is shared and driven by the ``active_*``
    values — confirm.
    """
    await self.session_handler(request=request)
    top = await self.db.get_top(9, 'ITERATOR_GE')
    page = self.jinja.get_template('good.html').render(
        title="Top of bad stikers for Telegram",
        active_good="",
        active_bad="class=\"active\"",
        top=top,
    )
    return web.Response(text=page, content_type='text/html')
def __call__(self, value):
    """Turn ``value`` into a response that has a content type.

    A value recognised by ``_is_guillotina_response`` is converted into an
    ``aioResponse`` whose body is the UTF-8 bytes of its ``response``
    (bytes are used as-is, strings are encoded, anything else is dumped
    with ``ujson`` first). If the resulting headers have no
    ``content-type``, ``self.content_type`` is set.
    """
    if _is_guillotina_response(value):
        payload = value.response
        if isinstance(payload, str):
            payload = payload.encode('utf8')
        elif not isinstance(payload, bytes):
            payload = ujson.dumps(value.response).encode('utf8')
        value = aioResponse(
            body=payload,
            status=value.status,
            headers=value.headers,
        )
    if 'content-type' not in value.headers:
        value.headers.update({'content-type': self.content_type})
    return value
def guess_response(self, value):
    """Build a response object from ``value.response`` based on its type.

    * exactly dict, list, int, float or bool -> JSON body,
      ``application/json``
    * str  -> UTF-8 body; ``text/html`` when it contains ``<html``,
      ``text/plain`` otherwise
    * None -> ``{}`` body, ``application/json``
    * anything else is used as the response object itself

    The headers of ``value`` are then merged in and, unless the response
    is already prepared, its status is set from ``value.status``.
    """
    payload = value.response
    # Exact type match on purpose: subclasses are not JSON-encoded here.
    if type(payload) in (dict, list, int, float, bool):
        encoded = json.dumps(payload, cls=GuillotinaJSONEncoder)
        resp = aioResponse(body=bytes(encoded, 'utf-8'))
        resp.headers['Content-Type'] = 'application/json'
    elif isinstance(payload, str):
        resp = aioResponse(body=bytes(payload, 'utf-8'))
        looks_like_html = '<html' in payload
        resp.headers['Content-Type'] = (
            'text/html' if looks_like_html else 'text/plain'
        )
    elif payload is None:
        # missing result...
        resp = aioResponse(body=b'{}')
        resp.headers['Content-Type'] = 'application/json'
    else:
        resp = payload

    resp.headers.update(value.headers)
    if not resp.prepared:
        resp.set_status(value.status)
    return resp