python类redirect()的实例源码

guestbook.py 文件源码 项目:sanic-wtf 作者: pyx 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def index(request):
    """Render the guestbook page and store feedback submitted through it.

    A POST whose form validates appends a timestamped note to the session's
    ``'fb'`` list and redirects to ``/``. Every other request renders the
    stored notes followed by the form, including its CSRF token field.
    """
    form = FeedbackForm(request)
    if request.method == 'POST' and form.validate():
        note = form.note.data
        entry = '{} - {}'.format(datetime.now(), note)
        session.setdefault('fb', []).append(entry)
        return response.redirect('/')

    # NOTE: stored notes are emitted without escaping; this is demo code and
    # must not be copied into production as is.
    paragraphs = ['<p>{}</p>'.format(entry) for entry in session.get('fb', [])]
    feedback = ''.join(paragraphs)

    # Prepare every fragment first so the template below stays readable.
    csrf_errors = '<br>'.join(form.csrf_token.errors)
    csrf_field = form.csrf_token
    note_errors = '<br>'.join(form.note.errors)
    note_field = form.note(size=40, placeholder="say something..")
    submit_field = form.submit
    content = f"""
    <h1>Form with CSRF Validation</h1>
    <p>Try <a href="/fail">form</a> that fails CSRF validation</p>
    {feedback}
    <form action="" method="POST">
      {csrf_errors}
      {csrf_field}
      {note_errors}
      <br>
      {note_field}
      {submit_field}
    </form>
    """
    return response.html(content)
guestbook.py 文件源码 项目:sanic-wtf 作者: pyx 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def fail(request):
    """Render a copy of the guestbook form that omits the CSRF token field.

    The handling mirrors the normal form: a validated POST stores the note
    in the session's ``'fb'`` list and redirects to ``/fail``. The rendered
    form never includes the hidden token field, and the page text itself
    states that CSRF validation therefore always fails.
    """
    form = FeedbackForm(request)
    if request.method == 'POST' and form.validate():
        note = form.note.data
        entry = '{} - {}'.format(datetime.now(), note)
        session.setdefault('fb', []).append(entry)
        return response.redirect('/fail')

    paragraphs = ['<p>{}</p>'.format(entry) for entry in session.get('fb', [])]
    feedback = ''.join(paragraphs)

    csrf_errors = '<br>'.join(form.csrf_token.errors)
    note_errors = '<br>'.join(form.note.errors)
    note_field = form.note(size=40, placeholder="say something..")
    submit_field = form.submit
    content = f"""
    <h1>Form which fails CSRF Validation</h1>
    <p>This is the same as this <a href="/">form</a> except that CSRF
    validation always fail because we did not render the hidden csrf token</p>
    {feedback}
    <form action="" method="POST">
      {csrf_errors}
      {note_errors}
      <br>
      {note_field}
      {submit_field}
    </form>
    """
    return response.html(content)
note.py 文件源码 项目:sanic-auth 作者: pyx 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def login(request):
    """Show the login form and sign the demo user in on a matching POST.

    Returns a redirect to ``/`` after a successful login; otherwise the
    login form, carrying an error message when a POST was rejected.
    """
    if request.method != 'POST':
        return response.html(LOGIN_FORM.format(''))

    username = request.form.get('username')
    password = request.form.get('password')
    # Demonstration only: real code needs a proper credential check.
    credentials_ok = username == 'demo' and password == '1234'
    if not credentials_ok:
        return response.html(LOGIN_FORM.format('invalid username or password'))

    # User is the sanic_auth proxy; production code would pass an ORM model
    # object. The default auth.login_user implementation expects User.id and
    # User.name to be available.
    user = User(id=1, name=username)
    auth.login_user(request, user)
    return response.redirect('/')
app.py 文件源码 项目:toshi-admin-service 作者: toshiapp 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def force_https(request):
    """Redirect plain-HTTP requests to the same URL over HTTPS.

    The scheme comes from the ``x-forwarded-proto`` header (set by proxies
    such as nginx or heroku) and otherwise from whether the transport has an
    SSL context. Returns ``None`` (no redirect) when the request is already
    HTTPS or the host starts with ``localhost:``.
    """
    host = request.headers.get('Host', '')
    ssl_context = request.transport.get_extra_info('sslcontext')
    fallback_scheme = "https" if ssl_context else "http"
    scheme = request.headers.get('x-forwarded-proto', fallback_scheme)

    if scheme == "https" or host.startswith("localhost:"):
        return None

    # Same host, path and query string; only the scheme changes.
    parts = ("https", host, request.path, None, request.query_string, None)
    return redirect(urlunparse(parts))
upload.py 文件源码 项目:sanic-wtf 作者: pyx 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def index(request):
    """Render the upload page and store an image submitted through the form.

    On a valid submission the image body is written into the configured
    ``UPLOAD_DIR``, its name and description are appended to the session's
    ``'files'`` list, and the client is redirected to ``/``. Otherwise the
    page lists the recorded uploads followed by the upload form.
    """
    form = UploadForm(request)
    if form.validate_on_submit():
        image = form.image.data
        # NOTE: the client-supplied file name is used verbatim; sanitize it
        # before doing anything like this in production.
        target = Path(request.app.config.UPLOAD_DIR) / image.name
        target.write_bytes(image.body)
        description = form.description.data or 'no description'
        session.setdefault('files', []).append((image.name, description))
        return response.redirect('/')

    section = '<section><img src="/img/{}"><p>{}</p><hr></section>'
    sections = [section.format(name, text) for name, text in session.get('files', [])]
    images = ''.join(sections)

    # Prepare every fragment first so the template below stays readable.
    csrf_errors = '<br>'.join(form.csrf_token.errors)
    csrf_field = form.csrf_token
    image_errors = '<br>'.join(form.image.errors)
    description_errors = '<br>'.join(form.description.errors)
    image_label = form.image.label
    image_field = form.image
    description_label = form.description.label
    description_field = form.description(size=20, placeholder="description")
    submit_field = form.submit
    content = f"""
    <h1>Sanic-WTF file field validators example</h1>
    {images}
    <form action="" method="POST" enctype="multipart/form-data">
      {csrf_errors}
      {csrf_field}
      {image_errors}
      {description_errors}
      <br> {image_label}
      <br> {image_field}
      <br> {description_label}
      <br> {description_field}
      <br> {submit_field}
    </form>
    """
    return response.html(content)
note.py 文件源码 项目:sanic-auth 作者: pyx 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def logout(request):
    """Log the current user out and redirect to the login page."""
    auth.logout_user(request)
    login_page = '/login'
    return response.redirect(login_page)
test_auth.py 文件源码 项目:sanic-auth 作者: pyx 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_logout(app):
    """Check that logging out clears the current user, and is repeatable.

    Registers ``/login``, ``/logout`` and ``/user`` handlers on ``app``,
    logs in as the demo user, then logs out twice and verifies that
    ``/user`` reports no user afterwards.
    """
    auth = Auth(app)

    @app.post('/login')
    async def login(request):
        name = request.form.get('name')
        password = request.form.get('password')
        if name == 'demo' and password == '1234':
            auth.login_user(request, User(id=1, name=name))
            return response.text('okay')
        return response.text('failed')

    @app.route('/logout')
    async def logout(request):
        auth.logout_user(request)
        return response.redirect('/user')

    @app.route('/user')
    async def user(request):
        user = auth.current_user(request)
        if user is not None:
            return response.text(user.name)
        return response.text('')

    payload = {'name': 'demo', 'password': '1234'}
    req, resp = app.test_client.post('/login', data=payload)
    assert resp.status == 200 and resp.text == 'okay'
    req, resp = app.test_client.get('/user')
    assert resp.status == 200 and resp.text == 'demo'

    # /logout redirects to /user, which the test client follows; after the
    # logout there is no current user, so the final body is empty. The
    # original bound this response to a misspelled name and re-checked the
    # previous one instead.
    req, resp = app.test_client.get('/logout')
    assert resp.status == 200 and resp.text == ''
    req, resp = app.test_client.get('/user')
    assert resp.status == 200 and resp.text == ''

    # Logging out again while already logged out must be harmless.
    req, resp = app.test_client.get('/logout')
    assert resp.status == 200 and resp.text == ''
    req, resp = app.test_client.get('/user')
    assert resp.status == 200 and resp.text == ''
test_auth.py 文件源码 项目:sanic-auth 作者: pyx 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_login_required(app):
    """Check that ``login_required`` redirects anonymous users to login.

    An unauthenticated request to ``/user`` must answer 302 pointing at the
    login endpoint; after logging in, the same URL returns the user name.
    """
    # The default endpoint is 'auth.login'; use plain 'login' so the test
    # does not need a blueprint.
    app.config.AUTH_LOGIN_ENDPOINT = 'login'
    auth = Auth(app)

    @app.post('/login')
    async def login(request):
        name = request.form.get('name')
        password = request.form.get('password')
        if name == 'demo' and password == '1234':
            auth.login_user(request, User(id=1, name=name))
            return response.text('okay')
        return response.text('failed')

    @app.route('/logout')
    @auth.login_required
    async def logout(request):
        auth.logout_user(request)
        return response.redirect('/user')

    @app.route('/user')
    @auth.login_required(user_keyword='user')
    async def user(request, user):
        return response.text(user.name)

    # Anonymous access: expect the raw redirect, so do not follow it.
    _, resp = app.test_client.get('/user', allow_redirects=False)
    assert resp.status == 302
    assert resp.headers['Location'] == app.url_for('login')

    # Authenticated access.
    credentials = {'name': 'demo', 'password': '1234'}
    _, resp = app.test_client.post('/login', data=credentials)
    assert resp.status == 200 and resp.text == 'okay'
    _, resp = app.test_client.get('/user')
    assert resp.status == 200 and resp.text == 'demo'
__init__.py 文件源码 项目:sanic-auth 作者: pyx 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def handle_no_auth(self, request):
    """Redirect an unauthorized user to the login page.

    Uses ``self.login_url`` when it is set; otherwise the URL is resolved
    from ``self.login_endpoint`` through the application's ``url_for``.
    """
    target = self.login_url
    if not target:
        target = request.app.url_for(self.login_endpoint)
    return response.redirect(target)
app.py 文件源码 项目:toshi-admin-service 作者: toshiapp 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def update_app_categories_handler(request, conf, current_user):
    """Replace the categories assigned to an app with the submitted tags.

    Reads ``toshi_id`` and a comma-separated ``categories`` field from the
    form. The matching category ids are looked up by tag, then the app's
    existing category rows are deleted and the new ones inserted inside a
    single transaction.

    Declared ``async`` because the body uses ``await`` / ``async with``,
    which are a syntax error inside a plain ``def``.

    :return: a redirect to the ``Referer`` header if present, else to the
        user's page; or to the apps list when no ``toshi_id`` was posted.
    """
    toshi_id = request.form.get('toshi_id')
    if toshi_id is None:
        return redirect("/{}/apps".format(conf.name))

    raw_categories = request.form.get('categories', "")
    tags = [s.strip() for s in raw_categories.split(",")]

    async with conf.db.id.acquire() as con:
        rows = await con.fetch("SELECT * FROM categories WHERE tag = ANY($1)", tags)
        category_ids = [row['category_id'] for row in rows]
        # Delete and re-insert atomically so the app is never left with a
        # partially updated category set.
        async with con.transaction():
            await con.execute("DELETE FROM app_categories WHERE toshi_id = $1", toshi_id)
            await con.executemany(
                "INSERT INTO app_categories VALUES ($1, $2) ON CONFLICT DO NOTHING",
                [(category_id, toshi_id) for category_id in category_ids])

    if 'Referer' in request.headers:
        return redirect(request.headers['Referer'])
    return redirect("/{}/user/{}".format(conf.name, toshi_id))
oauth1.py 文件源码 项目:Sanic-OAuth 作者: Sniedes722 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def clientgetter(self, f):
    """Register ``f`` as the client getter and return it unchanged.

    Because ``f`` is returned as is, this can be applied as a decorator.
    The registered function takes one parameter, ``client_key``, and
    returns a client object carrying at least:

        - client_key: A random string
        - client_secret: A random string
        - redirect_uris: A list of redirect uris
        - default_realms: Default scopes of the client

    It is suggested that the client also provides:

        - default_redirect_uri: One of the redirect uris

    Example::

        @oauth.clientgetter
        def get_client(client_key):
            client = get_client_model(client_key)
            # Client is an object
            return client
    """
    getter = f
    self._clientgetter = getter
    return getter
oauth1.py 文件源码 项目:Sanic-OAuth 作者: Sniedes722 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def confirm_authorization_request(self):
    """Handle the consumer confirming the authorization request.

    Extracts the request parameters, asks the server for the realms and
    credentials, and builds the authorization response from them. If an
    ``OAuth1Error`` or ``InvalidClientError`` is raised, the client is
    redirected to the error page with the error embedded in the URI.
    """
    server = self.server
    uri, http_method, body, headers = extract_params()
    try:
        realms, credentials = server.get_realms_and_credentials(
            uri, http_method=http_method, body=body, headers=headers
        )
        result = server.create_authorization_response(
            uri, http_method, body, headers, realms, credentials
        )
        log.debug('Authorization successful.')
        return create_response(*result)
    except (errors.OAuth1Error, errors.InvalidClientError) as error:
        # Both error kinds are reported the same way.
        return redirect(error.in_uri(self.error_uri))
myapp.py 文件源码 项目:sanic-motor 作者: lixxu 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
async def new(request):
    """Show the new-user form and create the user on a valid POST.

    Declared ``async`` because the body awaits the ``User`` calls, which is
    a syntax error inside a plain ``def``.

    On POST the name is stripped and lower-cased. A missing name or a name
    that is already taken flashes exactly one error and re-renders the form.
    Previously a taken name also flashed "User name is required".

    :return: a redirect to the ``index`` route after a successful insert,
        otherwise the rendered ``form.html``.
    """
    if request.method == 'POST':
        name = request.form.get('name', '').strip().lower()
        age = request.form.get('age', '').strip()
        if not name:
            request['flash']('User name is required', 'error')
        else:
            is_uniq = await User.is_unique(doc=dict(name=name))
            if is_uniq in (True, None):
                # NOTE(review): int(age) raises ValueError for a blank or
                # non-numeric age; unchanged from the original.
                await User.insert_one(dict(name=name, age=int(age)))
                request['flash']('User was added successfully', 'success')
                return redirect(app.url_for('index'))
            request['flash']('This name was already taken', 'error')

    return jinja.render('form.html', request, user={})
novels_blueprint.py 文件源码 项目:owllook 作者: howie6879 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
async def chapter(request):
    """Render the content of a novel chapter.

    Declared ``async`` because the body awaits the chapter cache lookup,
    which is a syntax error inside a plain ``def``.

    Query parameters:
        url          -- address of the chapter page
        novels_name  -- name of the novel, passed through to the template

    :return: a redirect to ``url`` when its netloc has no entry in
        ``RULES``; the rendered ``chapter.html`` when content was fetched;
        otherwise a plain-text message containing the url.
    """
    url = request.args.get('url', None)
    novels_name = request.args.get('novels_name', None)
    netloc = get_netloc(url)
    if netloc not in RULES:
        return redirect(url)
    if netloc in REPLACE_RULES:
        replacement = REPLACE_RULES[netloc]
        url = url.replace(replacement['old'], replacement['new'])
    content_url = RULES[netloc].content_url
    content = await cache_owllook_novels_chapter(url=url, netloc=netloc)
    if not content:
        return text('?????????????????????????????????{url}'.format(url=url))

    # Strip list punctuation and script-related fragments from the
    # stringified content before handing it to the template.
    content = str(content).strip('[],, Jjs').replace(', ', '').replace('onerror', '').replace('js', '').replace(
        '????', '')
    return template(
        'chapter.html', novels_name=novels_name, url=url, content_url=content_url, soup=content)
novels_blueprint.py 文件源码 项目:owllook 作者: howie6879 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def owllook_register(request):
    """Serve the registration page to visitors who are not logged in.

    :param request: the incoming request; its ``'session'`` entry is read
        and, when a question is available, updated.
    :return: a redirect to ``/`` when a user is already in the session or
        no verification question could be obtained; otherwise the rendered
        ``register.html``.
    """
    if request['session'].get('user', None):
        return redirect('/')

    ver_que_ans = ver_question()
    if not ver_que_ans:
        return redirect('/')

    # Keep the whole question record in the session; only element 1 is
    # shown on the page.
    request['session']['index'] = ver_que_ans
    return template(
        'register.html',
        title='owllook - ?? - ????????',
        question=ver_que_ans[1]
    )
md_blueprint.py 文件源码 项目:owllook 作者: howie6879 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def noti_book(request):
    """Render the notification page for the logged-in user.

    Declared ``async`` because the body awaits the database lookup, which
    is a syntax error inside a plain ``def``.

    Looks up the user's ``author_latest`` data in ``user_message``. When a
    document exists the page is rendered with ``is_author=1`` and the
    author list; otherwise with ``is_author=0`` and no author list.

    :return: the rendered ``noti_book.html``, or a redirect to ``/`` when
        nobody is logged in or any exception occurs (the error is logged).
    """
    user = request['session'].get('user', None)
    if not user:
        return redirect('/')
    try:
        motor_db = motor_base.get_db()
        data = await motor_db.user_message.find_one({'user': user}, {'author_latest': 1, '_id': 0})
        # The title has no placeholders, so the original no-op .format()
        # call was dropped.
        context = dict(title='???? - owllook', is_login=1, is_author=0, user=user)
        if data:
            context['is_author'] = 1
            context['author_list'] = data.get('author_latest', {})
        return template('noti_book.html', **context)
    except Exception as e:
        LOGGER.error(e)
        return redirect('/')
md_blueprint.py 文件源码 项目:owllook 作者: howie6879 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
async def admin_setting(request):
    """Render the settings page for the logged-in user.

    Declared ``async`` because the body awaits the database lookup, which
    is a syntax error inside a plain ``def``.

    :return: the rendered ``admin_setting.html`` when the user document is
        found; a plain-text response when it is not; a redirect to ``/``
        when nobody is logged in or any exception occurs (the error is
        logged).
    """
    user = request['session'].get('user', None)
    if not user:
        return redirect('/')
    try:
        motor_db = motor_base.get_db()
        data = await motor_db.user.find_one({'user': user})
        if not data:
            return text('????')
        return template('admin_setting.html', title='{user}??? - owllook'.format(user=user),
                        is_login=1,
                        user=user,
                        register_time=data['register_time'],
                        email=data.get('email', '???????'))
    except Exception as e:
        LOGGER.error(e)
        return redirect('/')
decorators.py 文件源码 项目:ssr-panel-sanic 作者: gaolycn 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def login_required(f):
    """Decorate an async handler so that it requires a logged-in user.

    The first positional argument that is a ``Request`` is inspected: when
    its session holds no ``uid``, or no ``User`` with that id exists, the
    client is redirected to ``auth.LoginView`` with ``next`` set to the
    requested URL. Otherwise ``uid`` and ``user`` are stored on the request
    and the handler is awaited. With no ``Request`` among the positional
    arguments the handler is awaited directly.
    """
    def to_login(request):
        return response.redirect(request.app.url_for('auth.LoginView', next=request.url))

    @wraps(f)
    async def decorated_function(*args, **kwargs):
        # Only the first Request among the positional arguments counts.
        request = next((a for a in args if isinstance(a, Request)), None)
        if request is not None:
            uid = request.get('session', {}).get('uid')
            if uid is None:
                return to_login(request)
            try:
                user = await User.objects.get(User.id == uid)
            except User.DoesNotExist:
                return to_login(request)
            request['uid'] = uid
            request['user'] = user
        return await f(*args, **kwargs)

    return decorated_function
decorators.py 文件源码 项目:ssr-panel-sanic 作者: gaolycn 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def admin_required(f):
    """Decorate an async handler so that it requires a logged-in admin.

    The first positional argument that is a ``Request`` is inspected: when
    its session holds no ``uid``, or no ``User`` with that id and
    ``is_admin == 1`` exists, the client is redirected to
    ``auth.LoginView`` with ``next`` set to the requested URL. Otherwise
    ``uid`` and ``user`` are stored on the request and the handler is
    awaited. With no ``Request`` among the positional arguments the handler
    is awaited directly.
    """
    def to_login(request):
        return response.redirect(request.app.url_for('auth.LoginView', next=request.url))

    @wraps(f)
    async def decorated_function(*args, **kwargs):
        # Only the first Request among the positional arguments counts.
        request = next((a for a in args if isinstance(a, Request)), None)
        if request is not None:
            uid = request.get('session', {}).get('uid')
            if uid is None:
                return to_login(request)
            try:
                user = await User.objects.get(User.id == uid, User.is_admin == 1)
            except User.DoesNotExist:
                return to_login(request)
            request['uid'] = uid
            request['user'] = user
        return await f(*args, **kwargs)

    return decorated_function
counter.py 文件源码 项目:sanic-cookiesession 作者: pyx 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def inc(request):
    """Add one to the session's ``counter`` and redirect to ``/``."""
    session = request['session']
    session['counter'] = session['counter'] + 1
    return response.redirect('/')
counter.py 文件源码 项目:sanic-cookiesession 作者: pyx 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def dec(request):
    """Subtract one from the session's ``counter`` and redirect to ``/``."""
    session = request['session']
    session['counter'] = session['counter'] - 1
    return response.redirect('/')
app.py 文件源码 项目:toshi-admin-service 作者: toshiapp 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def requires_login(fn):
    """Wrap a handler so that it only runs for a logged-in admin.

    The ``session`` cookie is matched against the ``sessions`` table; for a
    known admin the profile is fetched from the id service (a relative
    avatar path is prefixed with ``ID_SERVICE_LOGIN_URL``). Without a valid
    admin the client is redirected to ``/login`` with the current path as
    the ``redirect`` parameter. Otherwise the admin is passed to ``fn`` as
    an extra positional argument, placed after a leading ``Config``
    argument when there is one.
    """
    async def check_login(request, *args, **kwargs):
        admin = None
        session_cookie = request.cookies.get('session')
        if session_cookie:
            async with app.pool.acquire() as con:
                admin = await con.fetchrow("SELECT admins.toshi_id FROM admins "
                                           "JOIN sessions ON admins.toshi_id = sessions.toshi_id "
                                           "WHERE sessions.session_id = $1",
                                           session_cookie)
            if admin:
                profile_url = '{}/v1/user/{}'.format(ID_SERVICE_LOGIN_URL, admin['toshi_id'])
                resp = await app.http.get(profile_url)
                if resp.status != 200:
                    admin = None
                else:
                    admin = await resp.json()
                    avatar = admin['custom']['avatar']
                    if avatar.startswith('/'):
                        admin['custom']['avatar'] = "{}{}".format(ID_SERVICE_LOGIN_URL, avatar)

        if not admin:
            return redirect("/login?redirect={}".format(request.path))

        # keep the config object as the first argument
        if args and isinstance(args[0], Config):
            config, *rest = args
            args = (config, admin, *rest)
        else:
            args = (admin, *args)
        return await fn(request, *args, **kwargs)

    return check_login
app.py 文件源码 项目:toshi-admin-service 作者: toshiapp 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def index(request, user):
    """Redirect the site root to the ``/mainnet`` section."""
    landing_page = "/mainnet"
    return redirect(landing_page)
app.py 文件源码 项目:toshi-admin-service 作者: toshiapp 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
async def post_logout(request):
    """Delete the caller's session row and redirect to the login page.

    Declared ``async`` because the body uses ``async with`` / ``await``,
    which are a syntax error inside a plain ``def``.

    :return: a redirect to ``/login``, whether or not a session cookie was
        present.
    """
    session_cookie = request.cookies.get('session')
    if session_cookie:
        async with app.pool.acquire() as con:
            await con.execute("DELETE FROM sessions "
                              "WHERE sessions.session_id = $1",
                              session_cookie)
        # NOTE(review): this only removes the cookie from the parsed request;
        # presumably the browser keeps sending it until it is cleared on the
        # response. The server-side session row is gone either way.
        del request.cookies['session']
    return redirect("/login")
app.py 文件源码 项目:toshi-admin-service 作者: toshiapp 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
async def post_admin_add_remove(request, current_user, action):
    """Add or remove an admin, identified by ``toshi_id`` or ``username``.

    Declared ``async`` because the body uses ``async with`` / ``await``,
    which are a syntax error inside a plain ``def``.

    The target comes from the form's ``toshi_id`` field or, failing that,
    from ``username`` (a leading ``@`` is ignored; a name starting with
    ``0x`` is also tried as a toshi id). Removing an admin also deletes
    that admin's sessions.

    :param action: ``'add'`` or ``'remove'``.
    :raises SanicException: 400 for missing/empty arguments or an unknown
        user, 404 for an unknown action.
    :return: a redirect to the ``Referer`` header if present, else to
        ``/config``.
    """
    if 'toshi_id' in request.form:
        toshi_id = request.form.get('toshi_id')
        if not toshi_id:
            # The original built this exception without raising it, so an
            # empty id fell through to the database statements.
            raise SanicException("Bad Arguments", status_code=400)
    elif 'username' in request.form:
        username = request.form.get('username')
        if not username:
            raise SanicException("Bad Arguments", status_code=400)
        if username.startswith('@'):
            username = username[1:]
            if not username:
                raise SanicException("Bad Arguments", status_code=400)
        async with app.configs['mainnet'].db.id.acquire() as con:
            user = await con.fetchrow("SELECT * FROM users WHERE username = $1", username)
            if user is None and username.startswith("0x"):
                user = await con.fetchrow("SELECT * FROM users WHERE toshi_id = $1", username)
        if user is None:
            raise SanicException("User not found", status_code=400)
        toshi_id = user['toshi_id']
    else:
        # Also previously constructed but never raised, which left toshi_id
        # unbound below.
        raise SanicException("Bad Arguments", status_code=400)

    if action == 'add':
        print('adding admin: {}'.format(toshi_id))
        async with app.pool.acquire() as con:
            await con.execute("INSERT INTO admins VALUES ($1) ON CONFLICT DO NOTHING", toshi_id)
    elif action == 'remove':
        print('removing admin: {}'.format(toshi_id))
        async with app.pool.acquire() as con:
            await con.execute("DELETE FROM admins WHERE toshi_id = $1", toshi_id)
            await con.execute("DELETE FROM sessions WHERE toshi_id = $1", toshi_id)
    else:
        raise SanicException("Not Found", status_code=404)

    if 'Referer' in request.headers:
        return redirect(request.headers['Referer'])
    return redirect("/config")
app.py 文件源码 项目:toshi-admin-service 作者: toshiapp 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
async def delete_dapp(request, conf, current_user, dapp_id):
    """Delete the dapp with the given id and redirect back.

    Declared ``async`` because the body uses ``async with`` / ``await``,
    which are a syntax error inside a plain ``def``.

    :param dapp_id: dapp identifier; converted with ``int()`` before it is
        passed to the query.
    :return: a redirect to the ``Referer`` header if present, else to the
        dapps list of ``conf``.
    """
    async with conf.db.id.acquire() as con:
        await con.execute("DELETE FROM dapps WHERE dapp_id = $1", int(dapp_id))

    if 'Referer' in request.headers:
        return redirect(request.headers['Referer'])
    return redirect("/{}/dapps".format(conf.name))
app.py 文件源码 项目:toshi-admin-service 作者: toshiapp 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
async def feature_app_handler_post(request, conf, current_user):
    """Set or clear the ``featured`` flag of the user given by ``toshi_id``.

    Declared ``async`` because the body uses ``async with`` / ``await``,
    which are a syntax error inside a plain ``def``. The leftover debug
    ``print`` of the submitted form was removed.

    The flag is the truthiness of the form's ``featured`` field; a missing
    field means ``False``.

    :return: a redirect to the ``Referer`` header if present, else to the
        user's page; or to the apps list when no ``toshi_id`` was posted.
    """
    toshi_id = request.form.get('toshi_id')
    if toshi_id is None:
        return redirect("/{}/apps".format(conf.name))

    featured = bool(request.form.get('featured', False))
    async with conf.db.id.acquire() as con:
        await con.execute("UPDATE users SET featured = $2 WHERE toshi_id = $1", toshi_id, featured)

    if 'Referer' in request.headers:
        return redirect(request.headers['Referer'])
    return redirect("/{}/user/{}".format(conf.name, toshi_id))
app.py 文件源码 项目:toshi-admin-service 作者: toshiapp 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
async def blocked_app_handler_post(request, conf, current_user):
    """Set or clear the ``blocked`` flag of the user given by ``toshi_id``.

    Declared ``async`` because the body uses ``async with`` / ``await``,
    which are a syntax error inside a plain ``def``.

    The flag is the truthiness of the form's ``blocked`` field; a missing
    field means ``False``. The update runs inside a transaction.

    :return: a redirect to the ``Referer`` header if present, else to the
        user's page; or to the apps list when no ``toshi_id`` was posted.
    """
    toshi_id = request.form.get('toshi_id')
    if toshi_id is None:
        return redirect("/{}/apps".format(conf.name))

    blocked = bool(request.form.get('blocked', False))
    async with conf.db.id.acquire() as con:
        async with con.transaction():
            await con.execute("UPDATE users SET blocked = $2 WHERE toshi_id = $1", toshi_id, blocked)

    if 'Referer' in request.headers:
        return redirect(request.headers['Referer'])
    return redirect("/{}/user/{}".format(conf.name, toshi_id))
utils.py 文件源码 项目:jawaf 作者: danpozmanter 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def login_redirect(request):
    """Build a redirect to the login page that remembers the current path.

    :param request: Sanic request; its ``path`` becomes the ``next`` value.
    :return: Redirect response to the configured ``login_url``.
    """
    login_url = settings.AUTH_CONFIG['login_url']
    target = '{0}?next={1}'.format(login_url, request.path)
    return redirect(target)
oauth1.py 文件源码 项目:Sanic-OAuth 作者: Sniedes722 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def error_uri(self):
    """Return the URI of the error page.

    When something goes wrong, the client is redirected to this page. The
    URI can be set directly in the application config::

        OAUTH1_PROVIDER_ERROR_URI = '/error'

    or given as a named endpoint that is resolved with ``url_for``::

        OAUTH1_PROVIDER_ERROR_ENDPOINT = 'oauth.error'

    The explicit URI takes precedence; with neither configured the result
    is ``'/oauth/errors'``.
    """
    config = self.app.config
    configured_uri = config.get('OAUTH1_PROVIDER_ERROR_URI')
    if configured_uri:
        return configured_uri
    endpoint = config.get('OAUTH1_PROVIDER_ERROR_ENDPOINT')
    return url_for(endpoint) if endpoint else '/oauth/errors'


问题


面经


文章

微信
公众号

扫码关注公众号