def authorize_handler(self, f):
"""Authorization handler decorator.
This decorator will sort the parameters and headers out, and
pre validate everything::
@app.route('/oauth/authorize', methods=['GET', 'POST'])
@oauth.authorize_handler
def authorize(*args, **kwargs):
if request.method == 'GET':
# render a page for user to confirm the authorization
return render_template('oauthorize.html')
confirm = request.form.get('confirm', 'no')
return confirm == 'yes'
"""
@wraps(f)
def decorated(*args, **kwargs):
if request.method == 'POST':
if not f(*args, **kwargs):
uri = add_params_to_uri(
self.error_uri, [('error', 'denied')]
)
return redirect(uri)
return self.confirm_authorization_request()
server = self.server
uri, http_method, body, headers = extract_params()
try:
realms, credentials = server.get_realms_and_credentials(
uri, http_method=http_method, body=body, headers=headers
)
kwargs['realms'] = realms
kwargs.update(credentials)
return f(*args, **kwargs)
except errors.OAuth1Error as e:
return redirect(e.in_uri(self.error_uri))
except errors.InvalidClientError as e:
return redirect(e.in_uri(self.error_uri))
return decorated
python类redirect()的实例源码
def get_redirect_uri(self, token, request):
"""Redirect uri for this request token."""
log.debug('Get redirect uri of %r', token)
tok = request.request_token or self._grantgetter(token=token)
return tok.redirect_uri
def edit(request, id):
user = await User.find_one(id)
if not user:
request['flash']('User not found', 'error')
return redirect(app.url_for('index'))
if request.method == 'POST':
name = request.form.get('name', '').strip().lower()
age = request.form.get('age', '').strip()
if name:
doc = dict(name=name, age=int(age))
is_uniq = await User.is_unique(doc=doc, id=user.id)
if is_uniq in (True, None):
# remove non-changed items
user.clean_for_dirty(doc)
if doc:
await User.update_one({'_id': user.id}, {'$set': doc})
request['flash']('User was updated successfully', 'success')
return redirect(app.url_for('index'))
else:
request['flash']('This name was already taken', 'error')
request['flash']('User name is required', 'error')
return jinja.render('form.html', request, user=user)
def destroy(request, id):
user = await User.find_one(id)
if not user:
request['flash']('User not found', 'error')
return redirect(app.url_for('index'))
await user.destroy()
request['flash']('User was deleted successfully', 'success')
return redirect(app.url_for('index'))
def similar_user(request):
user = request['session'].get('user', None)
if user:
try:
motor_db = motor_base.get_db()
similar_info = await motor_db.user_recommend.find_one({'user': user})
if similar_info:
similar_user = similar_info['similar_user'][:20]
user_tag = similar_info['user_tag']
updated_at = similar_info['updated_at']
return template('similar_user.html',
title='?' + user + '?????',
is_login=1,
is_similar=1,
user=user,
similar_user=similar_user,
user_tag=user_tag,
updated_at=updated_at)
else:
return template('similar_user.html',
title='?' + user + '?????',
is_login=1,
is_similar=0,
user=user)
except Exception as e:
LOGGER.error(e)
return redirect('/')
else:
return redirect('/')
def book_list(request):
user = request['session'].get('user', None)
if user:
try:
return template('admin_book_list.html', title='{user}??? - owllook'.format(user=user),
is_login=1,
user=user)
except Exception as e:
LOGGER.error(e)
return redirect('/')
else:
return redirect('/')
def similar_user(request):
user = request['session'].get('user', None)
if user:
try:
motor_db = motor_base.get_db()
similar_info = await motor_db.user_recommend.find_one({'user': user})
if similar_info:
similar_user = similar_info['similar_user'][:20]
user_tag = similar_info['user_tag']
updated_at = similar_info['updated_at']
return template('similar_user.html',
title='?' + user + '?????',
is_login=1,
is_similar=1,
user=user,
similar_user=similar_user,
user_tag=user_tag,
updated_at=updated_at)
else:
return template('similar_user.html',
title='?' + user + '?????',
is_login=1,
is_similar=0,
user=user)
except Exception as e:
LOGGER.error(e)
return redirect('/')
else:
return redirect('/')
def add_session_to_request(request):
# before each request initialize a session
# using the client's request
host = request.headers.get('host', None)
user_agent = request.headers.get('user-agent', None)
if user_agent:
if CONFIG.VAL_HOST == 'true':
if not host or host not in CONFIG.HOST:
return redirect('http://www.owllook.net')
if CONFIG.WEBSITE['IS_RUNNING']:
await app.session_interface.open(request)
else:
return html("<h3>??????...</h3>")
else:
return html("<h3>??????...</h3>")
def handle_request(request):
return response.redirect('/redirect')
def index(request):
# generate a URL for the endpoint `post_handler`
url = app.url_for('post_handler', post_id=5)
# the URL is `/posts/5`, redirect to it
return response.redirect(url)
def redirect_app():
app = Sanic('test_redirection')
@app.route('/redirect_init')
async def redirect_init(request):
return redirect("/redirect_target")
@app.route('/redirect_init_with_301')
async def redirect_init_with_301(request):
return redirect("/redirect_target", status=301)
@app.route('/redirect_target')
async def redirect_target(request):
return text('OK')
@app.route('/1')
def handler(request):
return redirect('/2')
@app.route('/2')
def handler(request):
return redirect('/3')
@app.route('/3')
def handler(request):
return text('OK')
return app
def challenge(req, token):
if await challenges.is_frozen(req.ip):
logger.debug(f"Challenge is {token} but the account is frozen.")
raise InvalidUsage("Account frozen")
if not await challenges.solve(token):
logger.debug(f"Challenge {token} is invalid.")
unfreeze = await challenges.freeze(req.ip)
raise InvalidUsage("Invalid token. Account frozen until " + unfreeze.isoformat(sep=" ", timespec="seconds"))
await challenges.unfreeze(req.ip)
logger.info(f"Challenge {token} validated")
return redirect(req.app.url_for("index"))
def index(request):
user = request['user']
if user:
return response.redirect(app.url_for('user_panel.index'))
else:
return response.redirect(app.url_for('auth.LoginView'))
def logout(request):
if 'uid' in request['session']:
request['session'].pop('uid')
return redirect(app.url_for('home.index'))
def index(request: Request):
return redirect('/index.html')
# ????
# @app.middleware('response')
# async def access_control_all_origin(request: Request, response):
# response.headers['Access-Control-Allow-Origin'] = '*'
def index(request: Request):
return redirect('https://c-w.github.io/gutenberg-http/')
def migrate_users(request, current_user):
from_env = request.form.get('from', None)
to_env = request.form.get('to', None)
toshi_ids = request.form.get('toshi_ids', None)
apps_flag = request.form.get('apps', None)
users_flag = request.form.get('users', None)
limit = 1000
offset = 1000 * 1
if toshi_ids:
toshi_ids = set(re.findall("0x[a-fA-f0-9]{40}", toshi_ids))
else:
toshi_ids = set()
print("MIGRATING USERS FROM '{}' TO '{}'".format(from_env, to_env))
async with app.configs[from_env].db.id.acquire() as con:
if apps_flag == 'on':
users = await con.fetch("SELECT * FROM users WHERE is_app = TRUE")
print("APPS", len(users))
user_rows = list(users)
else:
user_rows = []
if users_flag == 'on':
users = await con.fetch("SELECT * FROM users WHERE is_app = FALSE OFFSET $1 LIMIT $2", offset, limit)
print("USERS", len(users))
user_rows.extend(list(users))
if len(toshi_ids) > 0:
users = await con.fetch("SELECT * FROM users WHERE toshi_id = ANY($1)", toshi_ids)
user_rows.extend(list(users))
for row in user_rows:
toshi_ids.add(row['toshi_id'])
avatar_rows = await con.fetch("SELECT * FROM avatars WHERE toshi_id = ANY($1)", toshi_ids)
users = []
avatars = []
for row in user_rows:
users.append((row['toshi_id'], row['payment_address'], row['created'], row['updated'], row['username'], row['name'], row['avatar'], row['about'], row['location'], row['is_public'], row['went_public'], row['is_app'], row['featured']))
for row in avatar_rows:
avatars.append((row['toshi_id'], row['img'], row['hash'], row['format'], row['last_modified']))
print("INSERTING {} USERS".format(len(toshi_ids)))
async with app.configs[to_env].db.id.acquire() as con:
rval = await con.executemany(
"INSERT INTO users ("
"toshi_id, payment_address, created, updated, username, name, avatar, about, location, is_public, went_public, is_app, featured"
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) "
"ON CONFLICT DO NOTHING",
users)
print("MIGRATED USERS: {} (of {})".format(rval, len(toshi_ids)))
rval = await con.executemany(
"INSERT INTO avatars ("
"toshi_id, img, hash, format, last_modified"
") VALUES ($1, $2, $3, $4, $5) "
"ON CONFLICT DO NOTHING",
avatars)
print("MIGRATED AVATARS: {} (of {})".format(rval, len(avatars)))
return redirect(request.headers['Referer'] or "/config")
def qidian(request):
user = request['session'].get('user', None)
novels_type = request.args.get('type', '????').strip()
first_type_title = "????"
first_type = [
'??',
'??',
'??',
'??',
'??',
'??',
'??',
'??',
'??',
'??',
'??',
'??',
'???',
]
if novels_type in first_type:
novels_head = [novels_type]
elif novels_type == first_type_title:
novels_head = ['#']
else:
return redirect('qidian')
search_ranking = await cache_others_search_ranking(spider='qidian', novel_type=novels_type)
title = "owllook - ??????"
if user:
return template('index.html',
title=title,
is_login=1,
is_qidian=1,
user=user,
search_ranking=search_ranking,
first_type=first_type,
first_type_title=first_type_title,
novels_head=novels_head)
else:
return template('index.html',
title=title,
is_login=0,
is_qidian=1,
search_ranking=search_ranking,
first_type=first_type,
first_type_title=first_type_title,
novels_head=novels_head)
def books(request):
user = request['session'].get('user', None)
if user:
try:
motor_db = motor_base.get_db()
data = await motor_db.user_message.find_one({'user': user})
if data:
books_url = data.get('books_url', None)
if books_url:
result = []
for i in books_url:
item_result = {}
book_url = i.get('book_url', None)
last_read_url = i.get("last_read_url", "")
book_query = parse_qs(urlparse(book_url).query)
last_read_chapter_name = parse_qs(
last_read_url).get('name', ['??'])[0]
item_result['novels_name'] = book_query.get('novels_name', '')[0] if book_query.get(
'novels_name', '') else ''
item_result['book_url'] = book_url
latest_data = await motor_db.latest_chapter.find_one({'owllook_chapter_url': book_url})
if latest_data:
item_result['latest_chapter_name'] = latest_data['data']['latest_chapter_name']
item_result['owllook_content_url'] = latest_data['data']['owllook_content_url']
else:
get_latest_data = await get_the_latest_chapter(book_url) or {}
item_result['latest_chapter_name'] = get_latest_data.get(
'latest_chapter_name', '????????')
item_result['owllook_content_url'] = get_latest_data.get(
'owllook_content_url', '')
item_result['add_time'] = i.get('add_time', '')
item_result["last_read_url"] = last_read_url if last_read_url else book_url
item_result["last_read_chapter_name"] = last_read_chapter_name
result.append(item_result)
return template('admin_books.html', title='{user}??? - owllook'.format(user=user),
is_login=1,
user=user,
is_bookmark=1,
result=result[::-1])
return template('admin_books.html', title='{user}??? - owllook'.format(user=user),
is_login=1,
user=user,
is_bookmark=0)
except Exception as e:
LOGGER.error(e)
return redirect('/')
else:
return redirect('/')
def search_user(request):
user = request['session'].get('user', None)
name = request.args.get('ss', None)
if user and name:
try:
motor_db = motor_base.get_db()
data = await motor_db.user_message.find_one({'user': name})
books_url = data.get('books_url', None) if data else None
if books_url:
result = []
for i in books_url:
item_result = {}
book_url = i.get('book_url', None)
last_read_url = i.get("last_read_url", "")
book_query = parse_qs(urlparse(book_url).query)
last_read_chapter_name = parse_qs(last_read_url).get('name', ['??'])[0]
item_result['novels_name'] = book_query.get('novels_name', '')[0] if book_query.get(
'novels_name', '') else ''
item_result['book_url'] = book_url
latest_data = await motor_db.latest_chapter.find_one({'owllook_chapter_url': book_url})
if latest_data:
item_result['latest_chapter_name'] = latest_data['data']['latest_chapter_name']
item_result['owllook_content_url'] = latest_data['data']['owllook_content_url']
else:
get_latest_data = await get_the_latest_chapter(book_url) or {}
item_result['latest_chapter_name'] = get_latest_data.get('latest_chapter_name', '????????')
item_result['owllook_content_url'] = get_latest_data.get('owllook_content_url', '')
item_result['add_time'] = i.get('add_time', '')
item_result["last_read_url"] = last_read_url if last_read_url else book_url
item_result["last_read_chapter_name"] = last_read_chapter_name
result.append(item_result)
return template('search_user.html', title='{name}??? - owllook'.format(name=name),
is_login=1,
user=user,
username=name,
is_bookmark=1,
result=result[::-1])
else:
return template('search_user.html', title='{name}??? - owllook'.format(name=name),
is_login=1,
user=user,
is_bookmark=0)
except Exception as e:
LOGGER.error(e)
return redirect('/')
else:
return redirect('/')