def index(request):
form = FeedbackForm(request)
if request.method == 'POST' and form.validate():
note = form.note.data
msg = '{} - {}'.format(datetime.now(), note)
session.setdefault('fb', []).append(msg)
return response.redirect('/')
# NOTE: trusting user input here, never do that in production
feedback = ''.join('<p>{}</p>'.format(m) for m in session.get('fb', []))
# Ah, f string, so, python 3.6, what do you expect from someone brave
# enough to use sanic... :)
content = f"""
<h1>Form with CSRF Validation</h1>
<p>Try <a href="/fail">form</a> that fails CSRF validation</p>
{feedback}
<form action="" method="POST">
{'<br>'.join(form.csrf_token.errors)}
{form.csrf_token}
{'<br>'.join(form.note.errors)}
<br>
{form.note(size=40, placeholder="say something..")}
{form.submit}
</form>
"""
return response.html(content)
python类redirect()的实例源码
def fail(request):
form = FeedbackForm(request)
if request.method == 'POST' and form.validate():
note = form.note.data
msg = '{} - {}'.format(datetime.now(), note)
session.setdefault('fb', []).append(msg)
return response.redirect('/fail')
feedback = ''.join('<p>{}</p>'.format(m) for m in session.get('fb', []))
content = f"""
<h1>Form which fails CSRF Validation</h1>
<p>This is the same as this <a href="/">form</a> except that CSRF
validation always fail because we did not render the hidden csrf token</p>
{feedback}
<form action="" method="POST">
{'<br>'.join(form.csrf_token.errors)}
{'<br>'.join(form.note.errors)}
<br>
{form.note(size=40, placeholder="say something..")}
{form.submit}
</form>
"""
return response.html(content)
def login(request):
message = ''
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
# for demonstration purpose only, you should use more robust method
if username == 'demo' and password == '1234':
# use User proxy in sanic_auth, this should be some ORM model
# object in production, the default implementation of
# auth.login_user expects User.id and User.name available
user = User(id=1, name=username)
auth.login_user(request, user)
return response.redirect('/')
message = 'invalid username or password'
return response.html(LOGIN_FORM.format(message))
def force_https(request):
host = request.headers.get('Host', '')
# get scheme, first by checking the x-forwarded-proto (from nginx/heroku etc)
# then falling back on whether or not there is an sslcontext
scheme = request.headers.get(
'x-forwarded-proto',
"https" if request.transport.get_extra_info('sslcontext') else "http")
if not host.startswith("localhost:") and scheme != "https":
url = urlunparse((
"https",
host,
request.path,
None,
request.query_string,
None))
return redirect(url)
def index(request):
form = UploadForm(request)
if form.validate_on_submit():
image = form.image.data
# NOTE: trusting user submitted file names here, the name should be
# sanitized in production.
uploaded_file = Path(request.app.config.UPLOAD_DIR) / image.name
uploaded_file.write_bytes(image.body)
description = form.description.data or 'no description'
session.setdefault('files', []).append((image.name, description))
return response.redirect('/')
img = '<section><img src="/img/{}"><p>{}</p><hr></section>'
images = ''.join(img.format(i, d) for i, d in session.get('files', []))
content = f"""
<h1>Sanic-WTF file field validators example</h1>
{images}
<form action="" method="POST" enctype="multipart/form-data">
{'<br>'.join(form.csrf_token.errors)}
{form.csrf_token}
{'<br>'.join(form.image.errors)}
{'<br>'.join(form.description.errors)}
<br> {form.image.label}
<br> {form.image}
<br> {form.description.label}
<br> {form.description(size=20, placeholder="description")}
<br> {form.submit}
</form>
"""
return response.html(content)
def logout(request):
auth.logout_user(request)
return response.redirect('/login')
def test_logout(app):
auth = Auth(app)
@app.post('/login')
async def login(request):
name = request.form.get('name')
password = request.form.get('password')
if name == 'demo' and password == '1234':
auth.login_user(request, User(id=1, name=name))
return response.text('okay')
return response.text('failed')
@app.route('/logout')
async def logout(request):
auth.logout_user(request)
return response.redirect('/user')
@app.route('/user')
async def user(request):
user = auth.current_user(request)
if user is not None:
return response.text(user.name)
return response.text('')
payload = {'name': 'demo', 'password': '1234'}
req, resp = app.test_client.post('/login', data=payload)
assert resp.status == 200 and resp.text == 'okay'
req, resp = app.test_client.get('/user')
assert resp.status == 200 and resp.text == 'demo'
req, reps = app.test_client.get('/logout')
assert resp.status == 200 and resp.text == 'demo'
req, resp = app.test_client.get('/user')
assert resp.status == 200 and resp.text == ''
req, reps = app.test_client.get('/logout')
assert resp.status == 200 and resp.text == ''
req, resp = app.test_client.get('/user')
assert resp.status == 200 and resp.text == ''
def test_login_required(app):
# the default is 'auth.login', change to 'login' to avoid using blueprint
app.config.AUTH_LOGIN_ENDPOINT = 'login'
auth = Auth(app)
@app.post('/login')
async def login(request):
name = request.form.get('name')
password = request.form.get('password')
if name == 'demo' and password == '1234':
auth.login_user(request, User(id=1, name=name))
return response.text('okay')
return response.text('failed')
@app.route('/logout')
@auth.login_required
async def logout(request):
auth.logout_user(request)
return response.redirect('/user')
@app.route('/user')
@auth.login_required(user_keyword='user')
async def user(request, user):
return response.text(user.name)
payload = {'name': 'demo', 'password': '1234'}
req, resp = app.test_client.get('/user', allow_redirects=False)
assert resp.status == 302
assert resp.headers['Location'] == app.url_for('login')
payload = {'name': 'demo', 'password': '1234'}
req, resp = app.test_client.post('/login', data=payload)
assert resp.status == 200 and resp.text == 'okay'
req, resp = app.test_client.get('/user')
assert resp.status == 200 and resp.text == 'demo'
def handle_no_auth(self, request):
"""Handle unauthorized user"""
u = self.login_url or request.app.url_for(self.login_endpoint)
return response.redirect(u)
def update_app_categories_handler(request, conf, current_user):
toshi_id = request.form.get('toshi_id')
categories = request.form.get('categories', "")
if toshi_id is not None:
tags = [s.strip() for s in categories.split(",")]
async with conf.db.id.acquire() as con:
categories = await con.fetch("SELECT * FROM categories WHERE tag = ANY($1)", tags)
categories = [row['category_id'] for row in categories]
async with con.transaction():
await con.execute("DELETE FROM app_categories WHERE toshi_id = $1", toshi_id)
await con.executemany(
"INSERT INTO app_categories VALUES ($1, $2) ON CONFLICT DO NOTHING",
[(category_id, toshi_id) for category_id in categories])
if 'Referer' in request.headers:
return redirect(request.headers['Referer'])
return redirect("/{}/user/{}".format(conf.name, toshi_id))
return redirect("/{}/apps".format(conf.name))
def clientgetter(self, f):
"""Register a function as the client getter.
The function accepts one parameter `client_key`, and it returns
a client object with at least these information:
- client_key: A random string
- client_secret: A random string
- redirect_uris: A list of redirect uris
- default_realms: Default scopes of the client
The client may contain more information, which is suggested:
- default_redirect_uri: One of the redirect uris
Implement the client getter::
@oauth.clientgetter
def get_client(client_key):
client = get_client_model(client_key)
# Client is an object
return client
"""
self._clientgetter = f
return f
def confirm_authorization_request(self):
"""When consumer confirm the authrozation."""
server = self.server
uri, http_method, body, headers = extract_params()
try:
realms, credentials = server.get_realms_and_credentials(
uri, http_method=http_method, body=body, headers=headers
)
ret = server.create_authorization_response(
uri, http_method, body, headers, realms, credentials
)
log.debug('Authorization successful.')
return create_response(*ret)
except errors.OAuth1Error as e:
return redirect(e.in_uri(self.error_uri))
except errors.InvalidClientError as e:
return redirect(e.in_uri(self.error_uri))
def new(request):
if request.method == 'POST':
name = request.form.get('name', '').strip().lower()
age = request.form.get('age', '').strip()
if name:
is_uniq = await User.is_unique(doc=dict(name=name))
if is_uniq in (True, None):
await User.insert_one(dict(name=name, age=int(age)))
request['flash']('User was added successfully', 'success')
return redirect(app.url_for('index'))
else:
request['flash']('This name was already taken', 'error')
request['flash']('User name is required', 'error')
return jinja.render('form.html', request, user={})
def chapter(request):
"""
?????????
: content_url ?????U??url?????
: url ??????url
: novels_name ????
:return: ???????
"""
url = request.args.get('url', None)
novels_name = request.args.get('novels_name', None)
netloc = get_netloc(url)
if netloc not in RULES.keys():
return redirect(url)
if netloc in REPLACE_RULES.keys():
url = url.replace(REPLACE_RULES[netloc]['old'], REPLACE_RULES[netloc]['new'])
content_url = RULES[netloc].content_url
content = await cache_owllook_novels_chapter(url=url, netloc=netloc)
if content:
content = str(content).strip('[],, Jjs').replace(', ', '').replace('onerror', '').replace('js', '').replace(
'????', '')
return template(
'chapter.html', novels_name=novels_name, url=url, content_url=content_url, soup=content)
else:
return text('?????????????????????????????????{url}'.format(url=url))
def owllook_register(request):
"""
????
:param request:
:return:
: -1 ??????????
: 0 ????????
: 1 ????
"""
user = request['session'].get('user', None)
if user:
return redirect('/')
else:
ver_que_ans = ver_question()
if ver_que_ans:
request['session']['index'] = ver_que_ans
return template(
'register.html',
title='owllook - ?? - ????????',
question=ver_que_ans[1]
)
else:
return redirect('/')
def noti_book(request):
user = request['session'].get('user', None)
if user:
try:
motor_db = motor_base.get_db()
is_author = 0
data = await motor_db.user_message.find_one({'user': user}, {'author_latest': 1, '_id': 0})
if data:
is_author = 1
author_list = data.get('author_latest', {})
return template('noti_book.html', title='???? - owllook'.format(user=user),
is_login=1,
is_author=is_author,
author_list=author_list,
user=user)
else:
return template('noti_book.html', title='???? - owllook'.format(user=user),
is_login=1,
is_author=is_author,
user=user)
except Exception as e:
LOGGER.error(e)
return redirect('/')
else:
return redirect('/')
def admin_setting(request):
user = request['session'].get('user', None)
if user:
try:
motor_db = motor_base.get_db()
data = await motor_db.user.find_one({'user': user})
if data:
return template('admin_setting.html', title='{user}??? - owllook'.format(user=user),
is_login=1,
user=user,
register_time=data['register_time'],
email=data.get('email', '???????'))
else:
return text('????')
except Exception as e:
LOGGER.error(e)
return redirect('/')
else:
return redirect('/')
def login_required(f):
@wraps(f)
async def decorated_function(*args, **kwargs):
for arg in args:
if isinstance(arg, Request):
uid = arg.get('session', {}).get('uid')
if uid is None:
return response.redirect(arg.app.url_for('auth.LoginView', next=arg.url))
try:
user = await User.objects.get(User.id == uid)
arg['uid'] = uid
arg['user'] = user
except User.DoesNotExist:
return response.redirect(arg.app.url_for('auth.LoginView', next=arg.url))
break
return await f(*args, **kwargs)
return decorated_function
def admin_required(f):
@wraps(f)
async def decorated_function(*args, **kwargs):
for arg in args:
if isinstance(arg, Request):
uid = arg.get('session', {}).get('uid')
if uid is None:
return response.redirect(arg.app.url_for('auth.LoginView', next=arg.url))
try:
user = await User.objects.get(User.id == uid, User.is_admin == 1)
arg['uid'] = uid
arg['user'] = user
except User.DoesNotExist:
return response.redirect(arg.app.url_for('auth.LoginView', next=arg.url))
break
return await f(*args, **kwargs)
return decorated_function
def inc(request):
request['session']['counter'] += 1
return response.redirect('/')
def dec(request):
request['session']['counter'] -= 1
return response.redirect('/')
def requires_login(fn):
async def check_login(request, *args, **kwargs):
session_cookie = request.cookies.get('session')
if session_cookie:
async with app.pool.acquire() as con:
admin = await con.fetchrow("SELECT admins.toshi_id FROM admins "
"JOIN sessions ON admins.toshi_id = sessions.toshi_id "
"WHERE sessions.session_id = $1",
session_cookie)
if admin:
url = '{}/v1/user/{}'.format(ID_SERVICE_LOGIN_URL, admin['toshi_id'])
resp = await app.http.get(url)
if resp.status == 200:
admin = await resp.json()
if admin['custom']['avatar'].startswith('/'):
admin['custom']['avatar'] = "{}{}".format(ID_SERVICE_LOGIN_URL, admin['custom']['avatar'])
else:
admin = None
else:
admin = None
if not admin:
return redirect("/login?redirect={}".format(request.path))
# keep the config object as the first argument
if len(args) and isinstance(args[0], Config):
args = (args[0], admin, *args[1:])
else:
args = (admin, *args)
rval = await fn(request, *args, **kwargs)
return rval
return check_login
def index(request, user):
return redirect("/mainnet")
def post_logout(request):
session_cookie = request.cookies.get('session')
if session_cookie:
async with app.pool.acquire() as con:
await con.execute("DELETE FROM sessions "
"WHERE sessions.session_id = $1",
session_cookie)
del request.cookies['session']
return redirect("/login")
def post_admin_add_remove(request, current_user, action):
if 'toshi_id' in request.form:
toshi_id = request.form.get('toshi_id')
if not toshi_id:
SanicException("Bad Arguments", status_code=400)
elif 'username' in request.form:
username = request.form.get('username')
if not username:
raise SanicException("Bad Arguments", status_code=400)
if username[0] == '@':
username = username[1:]
if not username:
raise SanicException("Bad Arguments", status_code=400)
async with app.configs['mainnet'].db.id.acquire() as con:
user = await con.fetchrow("SELECT * FROM users WHERE username = $1", username)
if user is None and username.startswith("0x"):
user = await con.fetchrow("SELECT * FROM users WHERE toshi_id = $1", username)
if user is None:
raise SanicException("User not found", status_code=400)
toshi_id = user['toshi_id']
else:
SanicException("Bad Arguments", status_code=400)
if action == 'add':
print('adding admin: {}'.format(toshi_id))
async with app.pool.acquire() as con:
await con.execute("INSERT INTO admins VALUES ($1) ON CONFLICT DO NOTHING", toshi_id)
elif action == 'remove':
print('removing admin: {}'.format(toshi_id))
async with app.pool.acquire() as con:
await con.execute("DELETE FROM admins WHERE toshi_id = $1", toshi_id)
await con.execute("DELETE FROM sessions WHERE toshi_id = $1", toshi_id)
else:
raise SanicException("Not Found", status_code=404)
if 'Referer' in request.headers:
return redirect(request.headers['Referer'])
return redirect("/config")
def delete_dapp(request, conf, current_user, dapp_id):
async with conf.db.id.acquire() as con:
await con.execute("DELETE FROM dapps WHERE dapp_id = $1", int(dapp_id))
if 'Referer' in request.headers:
return redirect(request.headers['Referer'])
return redirect("/{}/dapps".format(conf.name))
def feature_app_handler_post(request, conf, current_user):
print(request.form)
toshi_id = request.form.get('toshi_id')
featured = request.form.get('featured', False)
if toshi_id is not None:
async with conf.db.id.acquire() as con:
await con.execute("UPDATE users SET featured = $2 WHERE toshi_id = $1", toshi_id, True if featured else False)
if 'Referer' in request.headers:
return redirect(request.headers['Referer'])
return redirect("/{}/user/{}".format(conf.name, toshi_id))
return redirect("/{}/apps".format(conf.name))
def blocked_app_handler_post(request, conf, current_user):
toshi_id = request.form.get('toshi_id')
blocked = request.form.get('blocked', False)
if toshi_id is not None:
async with conf.db.id.acquire() as con:
async with con.transaction():
await con.execute("UPDATE users SET blocked = $2 WHERE toshi_id = $1", toshi_id, True if blocked else False)
if 'Referer' in request.headers:
return redirect(request.headers['Referer'])
return redirect("/{}/user/{}".format(conf.name, toshi_id))
return redirect("/{}/apps".format(conf.name))
def login_redirect(request):
"""Convenience method to return a redirect with "next" populated.
:param request: Sanic request.
:return: Redirect response.
"""
return redirect('{0}?next={1}'.format(settings.AUTH_CONFIG['login_url'], request.path))
def error_uri(self):
"""The error page URI.
When something turns error, it will redirect to this error page.
You can configure the error page URI with Flask config::
OAUTH1_PROVIDER_ERROR_URI = '/error'
You can also define the error page by a named endpoint::
OAUTH1_PROVIDER_ERROR_ENDPOINT = 'oauth.error'
"""
error_uri = self.app.config.get('OAUTH1_PROVIDER_ERROR_URI')
if error_uri:
return error_uri
error_endpoint = self.app.config.get('OAUTH1_PROVIDER_ERROR_ENDPOINT')
if error_endpoint:
return url_for(error_endpoint)
return '/oauth/errors'