def index(request):
    """Render the feedback form and record validated notes.

    A POST that passes form validation gets its note timestamped, appended
    to the ``fb`` list in ``session`` and answered with a redirect to ``/``.
    Every other request is answered with the HTML page listing the stored
    notes, the validation errors and the form (hidden CSRF field included).
    """
    # Imported locally so the block does not rely on a module-level import.
    from html import escape

    form = FeedbackForm(request)
    if request.method == 'POST' and form.validate():
        note = form.note.data
        msg = '{} - {}'.format(datetime.now(), note)
        session.setdefault('fb', []).append(msg)
        return response.redirect('/')
    # Notes are user input: escape them when rendering so a submitted
    # "<script>" is displayed as text instead of being executed.
    feedback = ''.join(
        '<p>{}</p>'.format(escape(m)) for m in session.get('fb', []))
    # f-strings require Python 3.6+.
    content = f"""
<h1>Form with CSRF Validation</h1>
<p>Try <a href="/fail">form</a> that fails CSRF validation</p>
{feedback}
<form action="" method="POST">
{'<br>'.join(form.csrf_token.errors)}
{form.csrf_token}
{'<br>'.join(form.note.errors)}
<br>
{form.note(size=40, placeholder="say something..")}
{form.submit}
</form>
"""
    return response.html(content)
python类html()的实例源码
def fail(request):
    """Render a feedback form whose CSRF validation cannot succeed.

    The page is built like the one served at ``/`` except that the hidden
    CSRF token field is deliberately not rendered, so a POST from this form
    carries no token.  Should a POST validate anyway, the note is stored in
    ``session['fb']`` and the client is redirected to ``/fail``.
    """
    # Imported locally so the block does not rely on a module-level import.
    from html import escape

    form = FeedbackForm(request)
    if request.method == 'POST' and form.validate():
        note = form.note.data
        msg = '{} - {}'.format(datetime.now(), note)
        session.setdefault('fb', []).append(msg)
        return response.redirect('/fail')
    # Stored notes are user input; escape them before embedding in HTML.
    feedback = ''.join(
        '<p>{}</p>'.format(escape(m)) for m in session.get('fb', []))
    content = f"""
<h1>Form which fails CSRF Validation</h1>
<p>This is the same as this <a href="/">form</a> except that CSRF
validation always fail because we did not render the hidden csrf token</p>
{feedback}
<form action="" method="POST">
{'<br>'.join(form.csrf_token.errors)}
{'<br>'.join(form.note.errors)}
<br>
{form.note(size=40, placeholder="say something..")}
{form.submit}
</form>
"""
    return response.html(content)
def test_file_upload(app):
    """Upload this test module through a FileField and check its name.

    CSRF is disabled so that the POST only has to satisfy the form itself.
    The handler replies with the uploaded file's name, which must equal the
    base name of this module.
    """
    app.config['WTF_CSRF_ENABLED'] = False

    class TestForm(SanicForm):
        upload = FileField('upload file')
        submit = SubmitField('Upload')

    @app.route('/upload', methods=['GET', 'POST'])
    async def upload(request):
        form = TestForm(request)
        if form.validate_on_submit():
            return response.text(form.upload.data.name)
        content = render_form(form)
        return response.html(content)

    # Close the file handle deterministically instead of leaking it.
    with open(__file__, 'rb') as upload_file:
        req, resp = app.test_client.post(
            '/upload', data={'upload': upload_file})
    assert resp.status == 200
    assert resp.text == os.path.basename(__file__)
def login(request):
    """Show the login form and log the demo user in on a matching POST.

    Non-POST requests get the form with an empty message.  A POST with the
    hard-coded demo credentials logs the user in and redirects to ``/``;
    any other POST re-renders the form with an error message.
    """
    if request.method != 'POST':
        return response.html(LOGIN_FORM.format(''))

    username = request.form.get('username')
    password = request.form.get('password')
    # For demonstration purposes only; use a more robust check in real code.
    credentials_ok = username == 'demo' and password == '1234'
    if not credentials_ok:
        return response.html(LOGIN_FORM.format('invalid username or password'))

    # User is the proxy from sanic_auth; in production this should be some
    # ORM model object.  The default auth.login_user implementation expects
    # User.id and User.name to be available.
    auth.login_user(request, User(id=1, name=username))
    return response.redirect('/')
def render_map():
    """Render the map page as an HTML response.

    The template is ``custom.html`` when ``conf.LOAD_CUSTOM_HTML_FILE`` is
    set and ``newmap.html`` otherwise.  Optional custom CSS/JS tags and a
    small block of JavaScript settings are passed in as ``Markup``.
    """
    extra_tags = []
    if conf.LOAD_CUSTOM_CSS_FILE:
        extra_tags.append('<link rel="stylesheet" href="static/css/custom.css">')
    if conf.LOAD_CUSTOM_JS_FILE:
        extra_tags.append('<script type="text/javascript" src="static/js/custom.js"></script>')

    settings_template = (
        "_defaultSettings['FIXED_OPACITY'] = '{:d}'; "
        "_defaultSettings['SHOW_TIMER'] = '{:d}'; "
        "_defaultSettings['TRASH_IDS'] = [{}]; "
    )
    js_vars = Markup(settings_template.format(
        conf.FIXED_OPACITY,
        conf.SHOW_TIMER,
        ', '.join(str(p_id) for p_id in conf.TRASH_IDS),
    ))

    if conf.LOAD_CUSTOM_HTML_FILE:
        template_name = 'custom.html'
    else:
        template_name = 'newmap.html'
    page_template = env.get_template(template_name)

    page = page_template.render(
        area_name=conf.AREA_NAME,
        map_center=center,
        map_provider_url=conf.MAP_PROVIDER_URL,
        map_provider_attribution=conf.MAP_PROVIDER_ATTRIBUTION,
        social_links=social_links(),
        init_js_vars=js_vars,
        extra_css_js=Markup(''.join(extra_tags)),
    )
    return html(page)
def index(request):
    """Handle image uploads and render the gallery page with the form.

    On a valid submission the image body is written below
    ``request.app.config.UPLOAD_DIR``, the (name, description) pair is
    appended to ``session['files']`` and the client is redirected to ``/``.
    Otherwise the page listing the recorded uploads and the form is returned.
    """
    # Imported locally so the block does not rely on a module-level import.
    from html import escape

    form = UploadForm(request)
    if form.validate_on_submit():
        image = form.image.data
        # The file name is client-controlled: keep only its last path
        # component so names like "../../x" cannot escape UPLOAD_DIR.
        safe_name = Path(image.name.replace('\\', '/')).name
        if safe_name in ('', '.', '..'):
            safe_name = 'upload'
        uploaded_file = Path(request.app.config.UPLOAD_DIR) / safe_name
        uploaded_file.write_bytes(image.body)
        description = form.description.data or 'no description'
        # Record the sanitized name so the <img> URL matches the stored file.
        session.setdefault('files', []).append((safe_name, description))
        return response.redirect('/')
    img = '<section><img src="/img/{}"><p>{}</p><hr></section>'
    # Names and descriptions are user input; escape them for HTML output.
    images = ''.join(
        img.format(escape(i), escape(d)) for i, d in session.get('files', []))
    content = f"""
<h1>Sanic-WTF file field validators example</h1>
{images}
<form action="" method="POST" enctype="multipart/form-data">
{'<br>'.join(form.csrf_token.errors)}
{form.csrf_token}
{'<br>'.join(form.image.errors)}
{'<br>'.join(form.description.errors)}
<br> {form.image.label}
<br> {form.image}
<br> {form.description.label}
<br> {form.description(size=20, placeholder="description")}
<br> {form.submit}
</form>
"""
    return response.html(content)
def test_form_validation(app):
    """With CSRF disabled, only the field validators decide the outcome."""
    app.config['WTF_CSRF_ENABLED'] = False

    class TestForm(SanicForm):
        msg = StringField('Note', validators=[DataRequired(), Length(max=10)])
        submit = SubmitField('Submit')

    @app.route('/', methods=['GET', 'POST'])
    async def index(request):
        form = TestForm(request)
        if request.method != 'POST' or not form.validate():
            return response.html(render_form(form))
        return response.text('validated')

    # GET: the page renders and, CSRF being off, has no token field.
    _, page = app.test_client.get('/')
    assert page.status == 200
    assert 'csrf_token' not in page.text

    # POST with a message longer than the 10 character limit is rejected.
    _, rejected = app.test_client.post('/', data={'msg': 'love is beautiful'})
    assert rejected.status == 200
    assert 'validated' not in rejected.text

    # POST with a short message is accepted.
    _, accepted = app.test_client.post('/', data={'msg': 'happy'})
    assert accepted.status == 200
    assert 'validated' in accepted.text
def test_form_csrf_validation(app):
    """A POST validates only when it carries the token rendered by the GET."""
    app.config['WTF_CSRF_SECRET_KEY'] = 'top secret !!!'

    class TestForm(SanicForm):
        msg = StringField('Note', validators=[DataRequired(), Length(max=10)])
        submit = SubmitField('Submit')

    @app.route('/', methods=['GET', 'POST'])
    async def index(request):
        form = TestForm(request)
        if request.method != 'POST' or not form.validate():
            return response.html(render_form(form))
        return response.text('validated')

    # GET: the rendered form must contain a non-empty CSRF token.
    _, page = app.test_client.get('/')
    assert page.status == 200
    assert 'csrf_token' in page.text
    found_tokens = re.findall(csrf_token_pattern, page.text)
    token = found_tokens[0]
    assert token

    # POST including the token is accepted.
    _, accepted = app.test_client.post(
        '/', data={'msg': 'happy', 'csrf_token': token})
    assert accepted.status == 200
    assert 'validated' in accepted.text

    # POST without the token must fail validation.
    _, rejected = app.test_client.post('/', data={'msg': 'happy'})
    assert rejected.status == 200
    assert 'validated' not in rejected.text
def test_validate_on_submit(app):
    """``validate_on_submit`` accepts a POST that carries the CSRF token."""
    app.config['WTF_CSRF_SECRET_KEY'] = 'top secret !!!'

    class TestForm(SanicForm):
        msg = StringField('Note', validators=[DataRequired(), Length(max=10)])
        submit = SubmitField('Submit')

    @app.route('/', methods=['GET', 'POST'])
    async def index(request):
        form = TestForm(request)
        if not form.validate_on_submit():
            return response.html(render_form(form))
        return response.text('validated')

    # GET: fetch the form and pull the CSRF token out of the markup.
    _, page = app.test_client.get('/')
    assert page.status == 200
    assert 'csrf_token' in page.text
    found_tokens = re.findall(csrf_token_pattern, page.text)
    token = found_tokens[0]
    assert token

    # POST with the token: the handler reports successful validation.
    _, accepted = app.test_client.post(
        '/', data={'msg': 'happy', 'csrf_token': token})
    assert accepted.status == 200
    assert 'validated' in accepted.text
def profile(request, user):
    """Return a small HTML page greeting ``user`` with a logout link."""
    greeting = f'<a href="/logout">Logout</a><p>Welcome, {user.name}</p>'
    return response.html(greeting)
helper.py 文件源码
项目:python-tarantool-benchmark-and-bootstrap
作者: valentinmk
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
async def start(self):
    """Start sanic web server.

    Creates the Sanic app, installs the in-memory session middleware,
    registers the routes and returns the result of awaiting
    ``app.create_server`` on ``self.loop`` and ``self.port``.

    Must be a coroutine function: the body awaits ``create_server``.
    """
    self.app = Sanic('sanic_server')
    # GZip support (currently disabled)
    # Compress(self.app)
    # self.app.config['COMPRESS_MIMETYPES'] = {'text/html',
    #                                          'application/json'}
    # self.app.config['COMPRESS_LEVEL'] = 4
    # self.app.config['COMPRESS_MIN_SIZE'] = 300
    # Session support
    self.session_interface = InMemorySessionInterface()
    # save_session is put at the front of the response middleware queue,
    # add_session_to_request at the end of the request middleware queue.
    self.app.response_middleware.appendleft(self.save_session)
    self.app.request_middleware.append(self.add_session_to_request)
    self.add_routes()
    return await self.app.create_server(loop=self.loop,
                                        host='0.0.0.0',
                                        port=self.port,
                                        debug=False)
saniconeconnect.py 文件源码
项目:python-tarantool-benchmark-and-bootstrap
作者: valentinmk
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
async def async_good(self, request):
    """Render ``good.html`` with the top list fetched from the database.

    Awaits ``self.session_handler`` for the request first, then fetches the
    list with ``self.db.get_top(9, 'ITERATOR_LE')`` and passes it to the
    template together with the page title and the navigation markers.

    Must be a coroutine function: the body awaits two calls.
    """
    await self.session_handler(request=request)
    # NOTE(review): 9 is presumably the number of entries to show - confirm
    # against db.get_top.
    top = await self.db.get_top(9, 'ITERATOR_LE')
    return self.jinja.render(
        'good.html', request,
        title="Top of the best stikers for Telegram",
        active_good="class=\"active\"",
        active_bad="",
        top=top,
    )
saniconeconnect.py 文件源码
项目:python-tarantool-benchmark-and-bootstrap
作者: valentinmk
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
async def action_bad(self, request):
    """Render ``bad.html`` with the top list fetched from the database.

    Awaits ``self.session_handler`` for the request first, then fetches the
    list with ``self.db.get_top(9, 'ITERATOR_GE')`` and passes it to the
    template together with the page title and the navigation markers.

    Must be a coroutine function: the body awaits two calls.
    """
    await self.session_handler(request=request)
    # NOTE(review): 9 is presumably the number of entries to show - confirm
    # against db.get_top.
    top = await self.db.get_top(9, 'ITERATOR_GE')
    return self.jinja.render(
        'bad.html', request,
        title="Top of bad stikers for Telegram",
        active_good="",
        active_bad="class=\"active\"",
        top=top,
    )
async def render_graphiql(jinja_env=None, graphiql_version=None, graphiql_template=None, params=None, result=None):
    """Render the GraphiQL page and wrap it in an HTML response.

    :param jinja_env: optional Jinja environment; when given the template
        string is compiled with it and rendered (asynchronously if the
        environment has ``is_async`` set).  Without it ``simple_renderer``
        is used.
    :param graphiql_version: overrides ``GRAPHIQL_VERSION`` when truthy.
    :param graphiql_template: overrides ``TEMPLATE`` when truthy.
    :param params: object providing ``query``, ``variables`` and
        ``operation_name``; may be falsy.
    :param result: value exposed to the template as ``result``.
    :return: the value of ``html(source)``.

    Must be a coroutine function: the async Jinja branch awaits
    ``render_async``.
    """
    graphiql_version = graphiql_version or GRAPHIQL_VERSION
    template = graphiql_template or TEMPLATE
    template_vars = {
        'graphiql_version': graphiql_version,
        # ``params and params.x`` keeps a falsy params as-is instead of
        # failing on the attribute access.
        'query': params and params.query,
        'variables': params and params.variables,
        'operation_name': params and params.operation_name,
        'result': result,
    }
    if jinja_env:
        template = jinja_env.from_string(template)
        if jinja_env.is_async:
            source = await template.render_async(**template_vars)
        else:
            source = template.render(**template_vars)
    else:
        source = simple_renderer(template, **template_vars)
    return html(source)
async def chapter(request):
    """
    Render the content of a novel chapter.

    Query parameters read from ``request.args``:
    : url          address of the chapter page
    : novels_name  name of the novel
    :return: the ``chapter.html`` page when content could be fetched, a
             redirect to ``url`` when its netloc has no entry in ``RULES``,
             or a plain text message when no content was obtained

    Must be a coroutine function: the chapter content is awaited.
    """
    url = request.args.get('url', None)
    novels_name = request.args.get('novels_name', None)
    netloc = get_netloc(url)
    # NOTE(review): url may be None here when the parameter is missing;
    # confirm that get_netloc and redirect cope with that.
    if netloc not in RULES:
        return redirect(url)
    if netloc in REPLACE_RULES:
        url = url.replace(REPLACE_RULES[netloc]['old'], REPLACE_RULES[netloc]['new'])
    content_url = RULES[netloc].content_url
    content = await cache_owllook_novels_chapter(url=url, netloc=netloc)
    if not content:
        return text('?????????????????????????????????{url}'.format(url=url))
    # Flatten the fetched content to a string and strip list punctuation and
    # a few unwanted substrings before handing it to the template.
    content = str(content).strip('[],, Jjs').replace(', ', '').replace('onerror', '').replace('js', '').replace(
        '????', '')
    return template(
        'chapter.html', novels_name=novels_name, url=url, content_url=content_url, soup=content)
def owllook_register(request):
    """
    Show the registration page.

    :param request: request carrying the session under ``request['session']``
    :return: the ``register.html`` page with a verification question, or a
             redirect to ``/`` when a user is already in the session or no
             question could be obtained
    """
    session = request['session']
    if session.get('user', None):
        # Already logged in: nothing to register.
        return redirect('/')

    ver_que_ans = ver_question()
    if not ver_que_ans:
        return redirect('/')

    # Remember the question/answer data in the session; the second element
    # is what gets shown on the page.
    session['index'] = ver_que_ans
    return template(
        'register.html',
        title='owllook - ?? - ????????',
        question=ver_que_ans[1]
    )
async def admin_setting(request):
    """Render the settings page of the logged-in user.

    Redirects to ``/`` when no user is in the session or when anything in
    the lookup/rendering raises (the exception is logged).  When the user
    document is not found a plain text message is returned.

    Must be a coroutine function: the database lookup is awaited.
    """
    user = request['session'].get('user', None)
    if not user:
        return redirect('/')
    try:
        motor_db = motor_base.get_db()
        data = await motor_db.user.find_one({'user': user})
        if data:
            return template('admin_setting.html', title='{user}??? - owllook'.format(user=user),
                            is_login=1,
                            user=user,
                            register_time=data['register_time'],
                            email=data.get('email', '???????'))
        return text('????')
    except Exception as e:
        # Top-level handler boundary: log and fall back to the home page.
        LOGGER.error(e)
        return redirect('/')
def default(self, request, exception):
    """Fallback handler turning an unhandled exception into a response.

    The current traceback is always passed to ``self.log``.  A
    ``SanicException`` becomes a text response using the exception's
    ``status_code``/``headers`` when present; otherwise a 500 HTML response
    is returned - the rendered traceback in debug mode, the generic error
    page if not.
    """
    self.log(format_exc())

    if isinstance(exception, SanicException):
        body = 'Error: {}'.format(exception)
        status = getattr(exception, 'status_code', 500)
        extra_headers = getattr(exception, 'headers', {})
        return text(body, status=status, headers=extra_headers)

    if not self.debug:
        return html(INTERNAL_SERVER_ERROR_HTML, status=500)

    traceback_page = self._render_traceback_html(exception, request)
    logger.error('Exception occurred while handling uri: "%s"\n%s',
                 request.url, format_exc())
    return html(traceback_page, status=500)
def default(self, request, exception):
    """Fallback handler turning an unhandled exception into a response.

    The current traceback is always passed to ``self.log``.  A
    ``SanicException`` becomes a text response using the exception's
    ``status_code``/``headers`` when present; otherwise a 500 HTML response
    is returned - the rendered traceback in debug mode, the generic error
    page if not.
    """
    self.log(format_exc())

    if isinstance(exception, SanicException):
        body = 'Error: {}'.format(exception)
        status = getattr(exception, 'status_code', 500)
        extra_headers = getattr(exception, 'headers', {})
        return text(body, status=status, headers=extra_headers)

    if not self.debug:
        return html(INTERNAL_SERVER_ERROR_HTML, status=500)

    traceback_page = self._render_traceback_html(exception, request)
    message_template = 'Exception occurred while handling uri: "{}"\n{}'
    log.error(message_template.format(request.url, format_exc()))
    return html(traceback_page, status=500)
def render(session):
    """Return an HTML response showing the counter between +/- links."""
    counter_value = session['counter']
    markup = """<a href="/inc">+</a> {} <a href="/dec">-</a>""".format(counter_value)
    return response.html(markup)
def _save_to_cache(key: str, data: bytes, format: str = 'html') -> None:
    """Store ``data`` in the cache under ``key``; never raises.

    Any exception from ``cache.set`` is logged with its traceback and, when
    ``sentry`` is truthy, also reported through ``sentry.captureException``.
    """
    try:
        cache.set(key, data, CACHE_LIVE_TIME, format)
    except Exception:
        # Best effort: a failed cache write must not break the caller.
        logger.exception('Error writing cache')
        if not sentry:
            return
        # Report while still inside the handler so the active exception is
        # available to the reporter.
        sentry.captureException()
async def _render(prerender: Prerender, url: str, format: str = 'html') -> str:
    """Render ``url`` with ``prerender``, retrying once on transient failure.

    A ``TemporaryBrowserFailure`` or ``asyncio.TimeoutError`` on the first
    attempt is logged and followed by a one second pause and a second
    attempt; if the second attempt fails the same way, the exception
    propagates.  Other exceptions propagate immediately.

    Must be a coroutine function: the body awaits the render call and the
    sleep.
    """
    for attempt in range(2):
        try:
            return await prerender.render(url, format)
        except (TemporaryBrowserFailure, asyncio.TimeoutError) as e:
            if attempt < 1:
                logger.warning('Temporary browser failure: %s, retry rendering %s in 1s', str(e), url)
                await asyncio.sleep(1)
                continue
            # Second failure in a row: give up and let the caller handle it.
            raise
async def get_login(request):
    """Return the rendered ``login.html`` template as an HTML response.

    Must be a coroutine function: the template is rendered with
    ``render_async`` and awaited.
    """
    login_template = env.get_template("login.html")
    return html(await login_template.render_async())
async def get_config_home(request, current_user):
    """Render the config home page listing all admins.

    Admin rows are read from the ``admins`` table, then each admin's user
    row is looked up in the mainnet id database by ``toshi_id``.  Admins
    without a user row are represented by a dict holding only their
    ``toshi_id``.  Every entry is passed through ``fix_avatar_for_user``.

    Must be a coroutine function: the body uses ``async with`` and ``await``.
    """
    # get list of admins
    async with app.pool.acquire() as con:
        admins = await con.fetch("SELECT * FROM admins")

    mainnet = app.configs['mainnet']
    users = []
    # One connection serves all lookups instead of acquiring one per admin.
    async with mainnet.db.id.acquire() as con:
        for admin in admins:
            user = await con.fetchrow("SELECT * FROM users WHERE toshi_id = $1", admin['toshi_id'])
            if user is None:
                user = {'toshi_id': admin['toshi_id']}
            users.append(fix_avatar_for_user(mainnet.urls.id, dict(user)))

    return html(await env.get_template("config.html").render_async(
        admins=users,
        current_user=current_user, environment='config', page="home"))
async def get_categories(request, conf, current_user):
    """Render the categories page for the environment described by ``conf``.

    Categories are joined with their names for the language ``'en'`` and
    ordered by ``category_id`` descending.

    Must be a coroutine function: the body uses ``async with`` and ``await``.
    """
    language = 'en'
    # Each fragment ends with a space: the pieces are concatenated verbatim
    # and "$1" must not run into "ORDER BY".
    sql = ("SELECT * FROM categories "
           "JOIN category_names ON categories.category_id = category_names.category_id AND category_names.language = $1 "
           "ORDER BY categories.category_id DESC ")
    async with conf.db.id.acquire() as con:
        rows = await con.fetch(sql, language)
    return html(await env.get_template("categories.html").render_async(
        categories=rows, current_user=current_user, environment=conf.name, page="categories"))
def test(request):
    """Render ``template`` with the request's JSON body as its context."""
    context = request.json
    # The JSON object's keys become the template's keyword arguments.
    rendered = template.render(**context)
    return html(rendered)
def render_worker_map():
    """Render ``workersmap.html`` with the map settings as an HTML response."""
    worker_template = env.get_template('workersmap.html')
    context = {
        'area_name': conf.AREA_NAME,
        'map_center': center,
        'map_provider_url': conf.MAP_PROVIDER_URL,
        'map_provider_attribution': conf.MAP_PROVIDER_ATTRIBUTION,
        'social_links': social_links(),
    }
    page = worker_template.render(**context)
    return html(page)
helper.py 文件源码
项目:python-tarantool-benchmark-and-bootstrap
作者: valentinmk
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def handle(self, request, name):
    """Simple handler that answers an HTTP GET with the port and a name.

    ``name`` is HTML-escaped before it is placed in the response because
    the body is sent as HTML.
    """
    # Imported locally: only ``escape`` is bound, leaving the module-level
    # ``html`` response helper untouched.
    from html import escape

    body = 'Sanic server running on {0} port. Hello, {1}'.format(
        str(self.port), escape(str(name)))
    return html(body)
helper.py 文件源码
项目:python-tarantool-benchmark-and-bootstrap
作者: valentinmk
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def handle_index(self, request):
    """Simple handler that answers an HTTP GET with the server port."""
    body = 'Sanic server running on ' + str(self.port) + ' port.'
    return html(body)
saniconeconnect.py 文件源码
项目:python-tarantool-benchmark-and-bootstrap
作者: valentinmk
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def action_about(self, request):
    """Render ``about.html`` with the page title and the statistics.

    The context starts with the title and is then updated from
    ``self.statistics``; exactly the keys listed below are forwarded to the
    template, so a missing statistic raises ``KeyError``.
    """
    context = {"title": "About and statistics"}
    context.update(self.statistics)
    forwarded = ('title', 'packs_count', 'stickers_count',
                 'clicks', 'votes', 'users')
    template_args = {key: context[key] for key in forwarded}
    return self.jinja.render('about.html', request, **template_args)