def run_active_monitor_view(request: web.Request) -> web.Response:
    """Schedule an active monitor to run right away, then redirect to its page.

    The monitor id is taken from the ``id`` URL match parameter and looked up
    in the ``active_monitor_manager`` stored on the application. The returned
    redirect carries a ``notification_msg`` query parameter.
    """
    target_id = int(request.match_info['id'])
    manager = request.app['active_monitor_manager']
    manager.monitors[target_id].schedule_immediately()
    redirect_url = '/active_monitor/%s/?notification_msg=Monitor job scheduled' % target_id
    return web.HTTPFound(redirect_url)
# Example source code for the Python class HTTPFound()
async def send_active_monitor_test_notification(request: web.Request) -> web.Response:
    """GET view to send a test notification for an active monitor.

    The view is a coroutine because it awaits the monitor's
    ``notify_state_change``; a plain ``def`` containing ``await`` does not
    compile.

    The monitor id is read from the ``id`` URL match parameter. The monitor is
    scheduled immediately, a state change to ``'UNKNOWN'`` is notified, and
    the client is redirected back to the monitor page.
    """
    monitor_id = int(request.match_info['id'])
    am_manager = request.app['active_monitor_manager']
    monitor = am_manager.monitors[monitor_id]
    monitor.schedule_immediately()
    # NOTE(review): the second argument is derived from state_ts and the
    # current time exactly as in the original code; its intended meaning
    # (presumably a duration) should be confirmed against notify_state_change.
    await monitor.notify_state_change('UNKNOWN', abs(monitor.state_ts - (time.time() - monitor.state_ts)))
    return web.HTTPFound('/active_monitor/%s/?notification_msg=Notification sent' % monitor_id)
def signout(request):
    """Sign the user out and send them back where they came from.

    Redirects to the request's ``Referer`` header (or ``/`` when it is
    absent or empty) and overwrites the session cookie with a placeholder
    value and ``max_age=0``.
    """
    destination = request.headers.get('Referer') or '/'
    response = web.HTTPFound(destination)
    # max_age=0 together with a dummy value replaces the existing cookie.
    response.set_cookie(COOKIE_NAME, '-delete-', max_age=0, httponly=True)
    logging.info('user signed out')
    return response
# Authentication middleware factory.
def auth_factory(app, handler):
    """Build a middleware that attaches the current user to each request.

    The returned coroutine sets ``request.__user__`` to ``None``, then tries
    to resolve a user from the cookie named ``COOKIE_NAME`` via
    ``cookie2user``. Requests whose path starts with ``/manage/`` are
    redirected to ``/signin`` unless an admin user was resolved; every other
    request is passed on to ``handler``.
    """
    async def auth(request):
        request.__user__ = None
        token = request.cookies.get(COOKIE_NAME)
        if token:
            resolved = await cookie2user(token)
            if resolved:
                logging.info('set current user :%s' % resolved.email)
                request.__user__ = resolved
        if request.path.startswith('/manage/'):
            # Management pages need a signed-in admin.
            if request.__user__ is None or not request.__user__.admin:
                return web.HTTPFound('/signin')
        return await handler(request)
    return auth
# Response middleware factory.
def response_factory(app, handler):
    """Build a middleware that converts handler results into aiohttp responses.

    Conversion rules applied to the value returned by ``handler``:

    * ``web.StreamResponse`` -> returned unchanged.
    * ``bytes`` -> ``application/octet-stream`` body.
    * ``str`` starting with ``'redirect:'`` -> redirect to the remainder;
      any other ``str`` -> UTF-8 HTML body.
    * ``dict`` without ``'__template__'`` -> JSON body; with it -> the named
      template from ``app['__templating__']`` rendered with the dict.
    * ``int`` in ``[100, 600)`` -> empty response with that status.
    * 2-tuple ``(status, message)`` with a valid status -> response with that
      status and ``str(message)`` as text.
    * anything else -> ``str(value)`` as UTF-8 plain text.
    """
    redirect_prefix = 'redirect:'

    async def response(request):
        logging.info('Response handler...')
        r = await handler(request)
        if isinstance(r, web.StreamResponse):
            return r
        if isinstance(r, bytes):
            resp = web.Response(body=r)
            resp.content_type = 'application/octet-stream'
            return resp
        if isinstance(r, str):
            # The prefix must include the colon: the slice below drops
            # exactly len('redirect:') characters, and a bare 'redirect'
            # check would also hijack ordinary text such as 'redirecting...'.
            if r.startswith(redirect_prefix):
                return web.HTTPFound(r[len(redirect_prefix):])
            resp = web.Response(body=r.encode('utf-8'))
            resp.content_type = 'text/html;charset=utf-8'
            return resp
        if isinstance(r, dict):
            template = r.get('__template__')
            if template is None:
                # Objects json cannot serialise fall back to their __dict__.
                payload = json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__)
                resp = web.Response(body=payload.encode('utf-8'))
                resp.content_type = 'application/json;charset=utf-8'
                return resp
            rendered = app['__templating__'].get_template(template).render(**r)
            resp = web.Response(body=rendered.encode('utf-8'))
            resp.content_type = 'text/html;charset=utf-8'
            return resp
        if isinstance(r, int) and 100 <= r < 600:
            # web.Response takes keyword-only arguments; a positional status
            # raises TypeError.
            return web.Response(status=r)
        if isinstance(r, tuple) and len(r) == 2:
            t, m = r
            if isinstance(t, int) and 100 <= t < 600:
                return web.Response(status=t, text=str(m))
        # default
        resp = web.Response(body=str(r).encode('utf-8'))
        resp.content_type = 'text/plain;charset=utf-8'
        return resp
    return response
# Sign-out handler.
def signout(request):
    """Clear the session cookie and redirect to the referring page.

    Falls back to ``/`` when the request has no usable ``Referer`` header.
    """
    back_to = request.headers.get('Referer') or '/'
    redirect_response = web.HTTPFound(back_to)
    # Overwrite the cookie with a dummy value and max_age=0.
    redirect_response.set_cookie(
        COOKIE_NAME,
        '-deleted-',
        max_age=0,
        httponly=True,
    )
    logging.info('User signed out.')
    return redirect_response
def auth_factory(app, handler):
    """Build a middleware that attaches the current user to each request.

    Written with ``async``/``await`` because ``asyncio.coroutine`` (used by
    the previous generator-based version) was removed in Python 3.11.

    The returned coroutine logs the request, sets ``request.__user__`` to
    ``None``, then tries to resolve a user from the cookie named
    ``COOKIE_NAME`` via ``cookie2user``. Requests under ``/manage/`` are
    redirected to ``/signin`` unless an admin user was resolved; everything
    else is delegated to ``handler``.
    """
    async def auth(request):
        logging.info('check user: {} {}'.format(request.method, request.path))
        request.__user__ = None
        cookie_str = request.cookies.get(COOKIE_NAME)
        if cookie_str:
            user = await cookie2user(cookie_str)
            if user:
                logging.info('set current user: {}'.format(user.email))
                request.__user__ = user
        if request.path.startswith('/manage/') and (request.__user__ is None or not request.__user__.admin):
            return web.HTTPFound('/signin')
        return await handler(request)
    return auth
def response_factory(app, handler):
    """Build a middleware that converts handler results into aiohttp responses.

    Written with ``async``/``await`` because ``asyncio.coroutine`` was
    removed in Python 3.11.

    Conversion rules applied to the value returned by ``handler``:

    * ``web.StreamResponse`` -> returned unchanged.
    * ``bytes`` -> ``application/octet-stream`` body.
    * ``str`` starting with ``'redirect:'`` -> redirect to the remainder;
      any other ``str`` -> UTF-8 HTML body.
    * ``dict`` without ``'__template__'`` -> JSON body; with it -> the named
      template from ``app['__templating__']`` rendered with the dict plus
      ``__user__`` taken from the request.
    * ``int`` in ``[100, 600)`` -> empty response with that status.
    * 2-tuple ``(status, message)`` with a valid status -> response with that
      status and ``str(message)`` as text.
    * anything else -> ``str(value)`` as UTF-8 plain text.
    """
    async def response(request):
        logging.info('Response handler...')
        r = await handler(request)
        if isinstance(r, web.StreamResponse):
            return r
        if isinstance(r, bytes):
            res = web.Response(body=r)
            res.content_type = 'application/octet-stream'
            return res
        if isinstance(r, str):
            if r.startswith('redirect:'):
                return web.HTTPFound(r[9:])
            res = web.Response(body=r.encode('utf-8'))
            res.content_type = 'text/html; charset=utf-8'
            return res
        if isinstance(r, dict):
            template = r.get('__template__')
            if template is None:
                # Objects json cannot serialise fall back to their __dict__.
                payload = json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__)
                res = web.Response(body=payload.encode('utf-8'))
                res.content_type = 'application/json;charset=utf-8'
                return res
            # Expose the user set on the request to the template.
            r['__user__'] = request.__user__
            rendered = app['__templating__'].get_template(template).render(**r)
            res = web.Response(body=rendered.encode('utf-8'))
            res.content_type = 'text/html;charset=utf-8'
            return res
        if isinstance(r, int) and 100 <= r < 600:
            # web.Response takes keyword-only arguments; a positional status
            # raises TypeError.
            return web.Response(status=r)
        if isinstance(r, tuple) and len(r) == 2:
            t, m = r
            if isinstance(t, int) and 100 <= t < 600:
                return web.Response(status=t, text=str(m))
        # default: the fallback must be a Response (the previous code
        # constructed web.Request here by mistake).
        res = web.Response(body=str(r).encode('utf-8'))
        res.content_type = 'text/plain;charset=utf-8'
        return res
    return response
async def logout_handler(request):
    """Handle logging out.

    Fetches the session for the request, removes the ``SESSION_KEY`` entry
    and redirects to the ``ON_LOGOUT`` location configured under
    ``request.app[APP_KEY]``.

    The handler is a coroutine because it awaits ``get_session``.
    """
    session = await get_session(request)
    on_logout = request.app[APP_KEY]['ON_LOGOUT']
    # Remove the session key if present; a client that is already logged out
    # (no key in the session) is still redirected instead of hitting KeyError.
    session.pop(SESSION_KEY, None)
    return web.HTTPFound(on_logout)
def contribute_redirect(request):
    """Redirect to the versioned ``contribute.json`` location."""
    location = '/v1/contribute.json'
    return web.HTTPFound(location)
def redirect(request):
    """Redirect to the ``/v1/`` root."""
    location = '/v1/'
    return web.HTTPFound(location)
def handle_404(request, response):
    """Turn a non-JSON 404 response into a redirect or a JSON error body.

    * If ``response`` already has a JSON content type it is returned as is.
    * If the request path ends with ``/`` the client is redirected to the
      path without trailing slashes.
    * Otherwise a JSON body with ``status`` 404 and a message naming the
      path is returned with HTTP status 404.
    """
    # A response without a Content-Type header is treated as non-JSON
    # instead of raising KeyError.
    content_type = response.headers.get('Content-Type', '')
    if 'json' in content_type:
        return response
    if request.path.endswith('/'):
        stripped = request.path.rstrip('/')
        # A path made only of slashes strips to '', which is not a usable
        # redirect location; fall through to the JSON 404 in that case.
        if stripped:
            return web.HTTPFound(stripped)
    return web.json_response({
        "status": 404,
        "message": "Page '{}' not found".format(request.path)
    }, status=404)
def redirect(urlname, *args, **kwargs):
    """Return an ``HTTPFound`` pointing at the URL ``url_for`` builds.

    All arguments are forwarded unchanged to ``url_for``.
    """
    location = url_for(urlname, *args, **kwargs)
    return HTTPFound(location)
def index_with_culture(request):
    """
    Redirect to the index page for a culture.

    The culture comes from the ``culture`` URL match parameter; when that is
    missing or empty, ``get_best_culture`` is asked for one for the
    ``public`` area.
    """
    chosen = request.match_info.get("culture") or get_best_culture(request, area="public")
    location = "/" + chosen + "/"
    return web.HTTPFound(location)
def auth_factory(app, handler):
    """Build a middleware that attaches the current user to each request.

    The returned coroutine logs the request, sets ``request.__user__`` to
    ``None`` and then tries ``User.find_by_cookie`` with the cookie named
    ``COOKIE_NAME``. Paths under ``/manage/`` redirect to ``/signin`` unless
    an admin user was resolved; all other requests go to ``handler``.
    """
    async def auth(request):
        logging.info('check user: %s %s' % (request.method, request.path))
        request.__user__ = None
        raw_cookie = request.cookies.get(COOKIE_NAME)
        if raw_cookie:
            found = await User.find_by_cookie(raw_cookie)
            if found:
                logging.info('set current user: %s' % found.email)
                request.__user__ = found
        if request.path.startswith('/manage/'):
            # Management pages need a signed-in admin.
            if request.__user__ is None or not request.__user__.admin:
                return web.HTTPFound('/signin')
        result = await handler(request)
        return result
    return auth
def response_factory(app, handler):
    """Build a middleware that converts handler results into aiohttp responses.

    Conversion rules applied to the value returned by ``handler``:

    * ``web.StreamResponse`` -> returned unchanged.
    * ``bytes`` -> ``application/octet-stream`` body.
    * ``str`` starting with ``'redirect:'`` -> redirect to the remainder;
      any other ``str`` -> UTF-8 HTML body.
    * ``dict`` without ``'__template__'`` -> JSON body; with it -> the named
      template from ``app['__templating__']`` rendered with the dict plus
      ``__user__`` taken from the request.
    * ``int`` in ``[100, 600)`` -> empty response with that status.
    * 2-tuple ``(status, message)`` with a valid status -> response with that
      status and ``str(message)`` as text.
    * anything else -> ``str(value)`` as UTF-8 plain text.

    Guard clauses are used instead of an ``elif``/``else`` chain so that every
    value that matches no rule (including a 2-tuple whose first item is not a
    valid status) reaches the plain-text default rather than falling off the
    end and returning ``None``.
    """
    async def response(request):
        logging.info('Response handler...')
        r = await handler(request)
        if isinstance(r, web.StreamResponse):
            return r
        if isinstance(r, bytes):
            resp = web.Response(body=r)
            resp.content_type = 'application/octet-stream'
            return resp
        if isinstance(r, str):
            if r.startswith('redirect:'):
                return web.HTTPFound(r[9:])
            resp = web.Response(body=r.encode('utf-8'))
            resp.content_type = 'text/html;charset=utf-8'
            return resp
        if isinstance(r, dict):
            template = r.get('__template__')
            if template is None:
                # Objects json cannot serialise fall back to their __dict__.
                resp = web.Response(
                    body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8'))
                resp.content_type = 'application/json;charset=utf-8'
                return resp
            # Expose the user set on the request to the template.
            r['__user__'] = request.__user__
            resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
            resp.content_type = 'text/html;charset=utf-8'
            return resp
        if isinstance(r, int) and 100 <= r < 600:
            return web.Response(status=r)
        if isinstance(r, tuple) and len(r) == 2:
            status, message = r
            if isinstance(status, int) and 100 <= status < 600:
                return web.Response(status=status, text=str(message))
        # default
        resp = web.Response(body=str(r).encode('utf-8'))
        resp.content_type = 'text/plain;charset=utf-8'
        return resp
    return response
def signout(request):
    """Clear the session cookie and redirect to the referring page.

    Falls back to ``/`` when the request has no usable ``Referer`` header.
    """
    came_from = request.headers.get('Referer')
    target = came_from or '/'
    resp = web.HTTPFound(target)
    # Overwrite the cookie with a dummy value and max_age=0.
    resp.set_cookie(COOKIE_NAME, '-deleted-', max_age=0, httponly=True)
    logging.info('user signed out')
    return resp
# ???????????????
async def vote(self, request):
    """Record a vote for a question and redirect to its results page.

    The question id comes from the ``question_id`` URL match parameter and
    the choice id from the ``choice`` field of the posted form.

    The method is a coroutine because it awaits ``request.post()`` and
    ``db.vote``.

    Raises:
        web.HTTPBadRequest: the ``choice`` field is missing or not an integer.
        web.HTTPNotFound: ``db.vote`` raised ``db.RecordNotFound``.
    """
    question_id = int(request.match_info['question_id'])
    data = await request.post()
    try:
        choice_id = int(data['choice'])
    except (KeyError, TypeError, ValueError) as e:
        raise web.HTTPBadRequest(
            text='You have not specified choice value') from e
    try:
        await db.vote(self.postgres, question_id, choice_id)
    except db.RecordNotFound as e:
        # Chain the cause, matching the bad-request branch above.
        raise web.HTTPNotFound(text=str(e)) from e
    router = request.app.router
    url = router['results'].url(parts={'question_id': question_id})
    return web.HTTPFound(location=url)
async def timeline(self, request):
    """Collect the signed-in user's timeline data.

    Reads ``user_id`` from the session. Without one, raises a redirect to the
    ``public_timeline`` route. Otherwise loads the user, the ids the user
    follows, and up to 30 messages authored by the user or by anyone they
    follow, sorted by ``pub_date`` descending.

    The method is a coroutine because it awaits the session and the Mongo
    queries.

    Returns:
        dict with keys ``messages``, ``user`` and ``endpoint`` (the name of
        the matched route).

    Raises:
        web.HTTPFound: no ``user_id`` is stored in the session.
    """
    session = await get_session(request)
    user_id = session.get('user_id')
    if user_id is None:
        router = request.app.router
        location = router['public_timeline'].url()
        raise web.HTTPFound(location=location)
    user_oid = ObjectId(user_id)
    user = await self.mongo.user.find_one({'_id': user_oid})
    # Named 'projection' rather than 'filter' to avoid shadowing the builtin.
    projection = {'whom_id': 1}
    followed = await self.mongo.follower.find_one({'who_id': user_oid}, projection)
    if followed is None:
        # No follower document: treat as following nobody.
        followed = {'whom_id': []}
    query = {'$or': [{'author_id': user_oid},
                     {'author_id': {'$in': followed['whom_id']}}]}
    messages = await (self.mongo.message
                      .find(query)
                      .sort('pub_date', -1)
                      .to_list(30))
    endpoint = request.match_info.route.name
    return {"messages": messages,
            "user": user,
            "endpoint": endpoint}
def redirect(request, name, **kw):
    """Return an ``HTTPFound`` for the named route.

    The route is looked up on the application's router and its URL is built
    with the extra keyword arguments.
    """
    route = request.app.router[name]
    return web.HTTPFound(location=route.url(**kw))