python类config()的实例源码

create_app.py 文件源码 项目:notebook-api 作者: jefersondaniel 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def create_app(extra_config_settings={}):
    """
    Initialize Flask applicaton
    """

    app.config.from_object('app.startup.settings')
    app.config.update(extra_config_settings)

    # Load all blueprints with their manager commands, entities and views
    from app import core

    mongoengine.connect(
        app.config['DATABASE_NAME'],
        host=app.config['DATABASE_HOST'],
        port=app.config['DATABASE_PORT'],
        username=app.config['DATABASE_USER'],
        password=app.config['DATABASE_PASSWORD']
    )

    return app
models.py 文件源码 项目:bucket_api 作者: jokamjohn 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def encode_auth_token(self, user_id):
        """
        Encode the Auth token
        :param user_id: User's Id
        :return:
        """
        try:
            payload = {
                'exp': datetime.datetime.utcnow() + datetime.timedelta(days=app.config.get('AUTH_TOKEN_EXPIRY_DAYS'),
                                                                       seconds=app.config.get(
                                                                           'AUTH_TOKEN_EXPIRY_SECONDS')),
                'iat': datetime.datetime.utcnow(),
                'sub': user_id
            }
            return jwt.encode(
                payload,
                app.config['SECRET_KEY'],
                algorithm='HS256'
            )
        except Exception as e:
            return e
OneSignalUtil.py 文件源码 项目:restful-api 作者: TeamGhostBuster 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def create_invitation_notification(self, invitations):
        target_users = list()
        for each in invitations:
            target_users.append({'field': 'tag', 'key': 'email', 'relation': '=', 'value': each.invitee.email})
            target_users.append({'operator': 'OR'})

        payload = {
            'app_id': app.config['ONE_SIGNAL_SETTINGS']['API_ID'],
            'filters': target_users,
            'contents': {'en': '{} sent you a invitation'.format(invitations[0].inviter.first_name)}
        }

        req = requests.post('https://onesignal.com/api/v1/notifications',
                            headers=self.header,
                            data=json.dumps(payload))

        if req.status_code == 200:
            app.logger.info('Push notification sent')
tasks.py 文件源码 项目:forget 作者: codl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def unique(fun):
    global r
    if not r:
        r = redis.StrictRedis.from_url(flaskapp.config['REDIS_URI'])

    @wraps(fun)
    def wrapper(*args, **kwargs):
        key = 'celery_unique_lock:{}'.format(pickle.dumps((fun.__name__, args, kwargs)))
        has_lock = False
        try:
            if r.set(key, 1, nx=True, ex=60*5):
                has_lock = True
                return fun(*args, **kwargs)
        finally:
            if has_lock:
                r.delete(key)

    return wrapper
run.py 文件源码 项目:Hermes 作者: nmpiazza 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def parse_and_run(args=None):
    sslBaseDir = path.join(BASE_DIR, 'ssl')

    p = ArgumentParser()
    p.add_argument('--bind', '-b', action='store', help='the address to bind to', default='127.0.0.1')
    p.add_argument('--port', '-p', action='store', type=int, help='the port to listen on', default=8080)
    p.add_argument('--debug', '-d', action='store_true', help='enable debugging (use with caution)', default=False)
    p.add_argument('--ssl', '-s', action='store_true', help='enable ssl', default=False)

    args = p.parse_args(args)
    if args.ssl:
        ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ctx.load_cert_chain(path.join(sslBaseDir, 'server.crt'), path.join(sslBaseDir, 'server.key'))
        app.config['SESSION_TYPE'] = 'filesystem'

        app.run(host=args.bind, port=args.port, debug=args.debug, ssl_context=ctx)
    else:
        app.run(host=args.bind, port=args.port, debug=args.debug)
models.py 文件源码 项目:bucket_api 作者: jokamjohn 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def decode_auth_token(token):
        """
        Decoding the token to get the payload and then return the user Id in 'sub'
        :param token: Auth Token
        :return:
        """
        try:
            payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms='HS256')
            is_token_blacklisted = BlackListToken.check_blacklist(token)
            if is_token_blacklisted:
                return 'Token was Blacklisted, Please login In'
            return payload['sub']
        except jwt.ExpiredSignatureError:
            return 'Signature expired, Please sign in again'
        except jwt.InvalidTokenError:
            return 'Invalid token. Please sign in again'
Log.py 文件源码 项目:salicapi 作者: Lafaiet 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def log( cls, level, message, caller = None ):
        if not cls.logger:
            cls.instantiate( logLevel = app.config['LEVELOFLOG'] )

        try:
            if level not in logging._levelNames:
                cls.log( "ERROR", 'Invalid file level \'%s\''%( level ) )

            logLevel = logging._levelNames[level]
            if not caller:
                callers = Log.getCallers( inspect.stack() )
            else:
                callers = caller
            message = '%s.%s - %s'%( callers[0], callers[1] , message )

            cls.logger.log( logLevel, message )
        except Exception, e:
            print 'Unable to record the log. Error: %s'%( e )
connections.py 文件源码 项目:dark-chess 作者: AHAPX 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send_mail(recipients, subject, body, html=None, sender=None):
    from app import app

    msg = {
        'to': check_email(recipients),
        'subj': subject,
        'body': body,
    }
    if sender:
        msg['from'] = sender
    if html:
        msg['html'] = html

    redis = StrictRedis(
        app.config['SMTP_BROKER_HOST'],
        app.config['SMTP_BROKER_PORT'],
        app.config['SMTP_BROKER_DB']
    )
    redis.publish(app.config['SMTP_BROKER_CHANNEL'], json.dumps(msg))
connections.py 文件源码 项目:dark-chess 作者: AHAPX 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def send_ws(message, signal=consts.WS_NONE, tags=[]):
    from app import app

    _tags = tags if isinstance(tags, list) else [tags]
    msg = {
        'message': {
            'message': message,
            'signal': signal,
            'tags': _tags,
        },
        'tags': _tags,
    }

    redis = StrictRedis(
        app.config['WS_BROKER_HOST'],
        app.config['WS_BROKER_PORT'],
        app.config['WS_BROKER_DB']
    )
    redis.publish(app.config['WS_BROKER_CHANNEL'], json.dumps(msg))
views.py 文件源码 项目:nearby-API 作者: NearbyApp 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def linkFacebook():
    """Links a Facebook account to an existing Google account.
    """
    form = LinkFacebookForm()
    # Check if the Service-Provider is Google
    if form.validate_on_submit() and g.loginWith == 'Google' and g.currentUser['facebookId'] is None:
        facebookToken = FacebookModel.getTokenValidation(app.config['FACEBOOK_ACCESS_TOKEN'], form.token.data)
        if facebookToken['is_valid'] and facebookToken['user_id'] == form.facebookId.data:
            # Continue only if the account doesn't exist yet.
            if not FacebookModel.doesUserExist(form.facebookId.data):
                if FacebookModel.linkToUserId(g.currentUser['_id'], form.facebookId.data):
                    return json.dumps({'result':'OK'}), 200
            else:
                return abort(403)
        else:
            return abort(401)

    return abort(400)
views.py 文件源码 项目:nearby-API 作者: NearbyApp 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def linkGoogle():
    """Links a Google account to an existing Facebook account.
    """
    form = LinkGoogleForm()
    # Check if the Service-Provider is Facebook
    if form.validate_on_submit() and g.loginWith == 'Facebook' and g.currentUser['googleId'] is None:
        googleToken = GoogleModel.getTokenValidation(app.config['GOOGLE_CLIENT_ID'], form.token.data)
        if googleToken and googleToken['sub'] == form.googleId.data:
            # Continue only if the account doesn't exist yet.
            if not GoogleModel.doesUserExist(form.googleId.data):
                if GoogleModel.linkToUserId(g.currentUser['_id'], form.googleId.data):
                    return json.dumps({'result':'OK'}), 200
            else:
                return abort(403)
        else:
            return abort(401)

    return abort(400)
views.py 文件源码 项目:nearby-API 作者: NearbyApp 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def mergeFacebook():
    """Merges an existing Facebook account to an existing Google account.
    """
    form = MergeFacebookForm()
    # Check if the Service-Provider is Google
    if form.validate_on_submit() and g.loginWith == 'Google' and g.currentUser['facebookId'] is None:
        facebookToken = FacebookModel.getTokenValidation(app.config['FACEBOOK_ACCESS_TOKEN'], form.token.data)
        if facebookToken['is_valid'] and facebookToken['user_id'] == form.facebookId.data:
            # Continue only if the account does exist.
            if FacebookModel.doesUserExist(form.facebookId.data):
                facebookUser = FacebookModel.getUser(form.facebookId.data)
                if UserModel.mergeUsers(g.currentUser['_id'], facebookUser['_id']):
                    return json.dumps({'result':'OK'}), 200
            else:
                return abort(404)
        else:
            return abort(401)

    return abort(400)
views.py 文件源码 项目:nearby-API 作者: NearbyApp 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def mergeGoogle():
    """Merges an existing Google account to an existing Facebook account.
    """
    form = MergeGoogleForm()
    # Check if the Service-Provider is Facebook
    if form.validate_on_submit() and g.loginWith == 'Facebook' and g.currentUser['googleId'] is None:
        googleToken = GoogleModel.getTokenValidation(app.config['GOOGLE_CLIENT_ID'], form.token.data)
        if googleToken and googleToken['sub'] == form.googleId.data:
            # Continue only if the account does exist.
            if GoogleModel.doesUserExist(form.googleId.data):
                googleUser = GoogleModel.getUser(form.googleId.data)
                if UserModel.mergeUsers(g.currentUser['_id'], googleUser['_id']):
                    return json.dumps({'result':'OK'}), 200
            else:
                return abort(404)
        else:
            return abort(401)

    return abort(400)
upload.py 文件源码 项目:website 作者: lasa 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def validate(self):
        if not Form.validate(self):
            return False
        if self.uploadfile.name not in request.files:
            self.uploadfile.errors.append("Please select a valid file.")
            return False

        self.file_ = request.files[self.uploadfile.name]
        self.filename = secure_filename(self.file_.filename)
        uploads = os.listdir(os.path.join(app.root_path, app.config['UPLOAD_FOLDER']))

        if self.filename in uploads:
            self.uploadfile.errors.append("A file with this name already exists.")
            return False
        if not self.filename:
            self.uploadfile.errors.append("Invalid file name.")
            return False
        return True
app_test.py 文件源码 项目:coder_api 作者: seekheart 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def setUp(self):
        """Setup method for spinning up a test instance of app"""
        app.config['TESTING'] = True
        app.config['WTF_CSRF_ENABLED'] = False
        self.app = app.test_client()
        self.app.testing = True
        self.authorization = {
            'Authorization': "Basic {user}".format(
                user=base64.b64encode(b"test:asdf").decode("ascii")
            )
        }
        self.content_type = 'application/json'
        self.dummy_name = 'dummy'
        self.dummy_user = json.dumps(
            {'username': 'dummy', 'languages': ['testLang']}
        )
        self.dummy_lang = json.dumps(
            {'name': 'dummy', 'users': ['No one']}
        )
game_info.py 文件源码 项目:shrec 作者: hamishivi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_game_description(game_name, steam_description):
    '''
    gets description of game from IGDB, otherwise uses steam description if
    possible.
    '''
    url = f'{app.config["IGDB_API_URL"]}/games/?fields=*&limit=1&search={game_name}'
    headers = {
            'Accept': 'application/json',
            'user-key': app.config['IGDB_API_KEY']
            }
    res = cache.get(url, headers=headers).json()[0]
    # check if we actually found something
    if 'summary' in res.keys():
        return shorten_description(res['summary'])
    # otherwise use steam description
    elif len(steam_description) > 0:
        return shorten_description(steam_description)
    # if no steam description, just tell the user
    else:
        return "Sorry, we couldn't find a description for this game ??"
views.py 文件源码 项目:LTHCourseStatistics 作者: TabTabTab 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def course_summary():
    if request.method == 'POST' and 'course_results' in request.files:
        pdf_file = request.files['course_results']
        if pdf_file and allowed_file(pdf_file.filename):
            original_filename = secure_filename(pdf_file.filename)
            unique_filename = create_unique_filename(app.config['UPLOAD_FOLDER'],
                                                     original_filename)
            pdf_file.save(os.path.join(app.config['UPLOAD_FOLDER'], unique_filename))
            pdf_abs_path = os.path.abspath(os.path.join(app.config['UPLOAD_FOLDER'],
                                                        unique_filename))
            student_course_summary = None
            try:
                student_course_summary = course_statistics.get_course_statistics(
                    pdf_abs_path)
            except course_statistics.ReadPDFException as e:
                return render_template('failure.html',
                                       title='Failure',
                                       redirect=True,
                                       message='It seems the file you provided cound not be read as a PDF.')

            return render_template('student_summary_%s.html' % student_course_summary.language,
                                   title='Summary',
                                   student_summary=student_course_summary)

    return redirect(url_for('index'))
conftest.py 文件源码 项目:arch-security-tracker 作者: archlinux 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def app(request):
    flask_app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
    flask_app.config['TESTING'] = True
    flask_app.config['WTF_CSRF_ENABLED'] = False
    flask_app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    flask_app.config['SERVER_NAME'] = 'localhost'
    with flask_app.app_context():
        yield flask_app
views.py 文件源码 项目:MLinProduction 作者: OrkoHunter 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def login():
    form = LoginForm()
    if form.validate_on_submit():
        flash('Login requested for OpenID="%s", remember_me=%s, %s, %s' %
              (form.openid.data, str(form.remember_me.data), form.input_data.data, form.first_layer.data))
        load_model()
        return redirect('/index')
    return render_template('login.html',
                           title='Sign In',
                           form=form,
                           providers=app.config['OPENID_PROVIDERS'])
run_app.py 文件源码 项目:app-skeleton 作者: rragundez 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run_gunicorn_app(host, port, debug, **settings):
    """Serve Flask application using Gunicorn.

    The Flask application and respective resources and endpoints should
    defined in `app.py` in this same directory.
    """

    logging.basicConfig(level='DEBUG' if debug else 'INFO')

    # Set a global flag that indicates that we were invoked from the
    # command line interface provided server command.  This is detected
    # by Flask.run to make the call into a no-op.  This is necessary to
    # avoid ugly errors when the script that is loaded here also attempts
    # to start a server.
    os.environ['FLASK_RUN_FROM_CLI_SERVER'] = '1'

    settings['bind'] = '{}:{}'.format(host, port)

    if debug:
        app.jinja_env.auto_reload = True
        app.config['TEMPLATES_AUTO_RELOAD'] = True
        settings.update({'loglevel': 'debug',
                         'reload': True,
                         'threads': 1,
                         'workers': 1,
                         'worker_class': 'sync'})
        app.wsgi_app = DebuggedApplication(app.wsgi_app, True)
        logging.info(" * Launching in Debug mode.")
        logging.info(" * Serving application using a single worker.")
    else:
        logging.info(" * Launching in Production Mode.")
        logging.info(" * Serving application with {} worker(s)."
                     .format(settings["workers"]))

    server = GunicornApp(app, settings=settings)
    server.run()
RequestUtil.py 文件源码 项目:restful-api 作者: TeamGhostBuster 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_auth_info():
    """
    Get user's identity

    Verify the interity of token from the OAuth provider,
    then look up the database check if user exist or not.
    If the user does not exist, create a new user instead.

    :return: user instance
    """
    if 'Access-Token' not in request.headers:
        return None

    access_token = request.headers['Access-Token']
    if access_token is not None:
        # For development purpose only
        if access_token in app.config['TEST_TOKEN'].keys():
            user = MongoUtil.find_user(app.config['TEST_TOKEN'][access_token])
        else:
            # Check that the Access Token is valid.
            url = ('https://www.googleapis.com/oauth2/v3/tokeninfo?access_token=%s'
                   % access_token)
            result = requests.get(url).json()
            if result.get('error_description') is not None:
                app.logger.debug('User {} failed to access the server'.format(access_token))
                return None
            user = MongoUtil.find_user(result['email'])

    if user is None:
        # if user does not exist, create a new user instead
        app.logger.info('Create User: {}'.format(user))
        first_name, last_name = get_user_profile(access_token)
        user = MongoUtil.create_user(result['email'], first_name, last_name)

    return user
OneSignalUtil.py 文件源码 项目:restful-api 作者: TeamGhostBuster 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self):
        self.header = {
            "Content-Type": "application/json; charset=utf-8",
            "Authorization": "Basic {}".format(app.config['ONE_SIGNAL_SETTINGS']['API_KEY'])
        }
twitter.py 文件源码 项目:forget 作者: codl 项目源码 文件源码 阅读 53 收藏 0 点赞 0 评论 0
def get_twitter_for_acc(account):
    consumer_key = app.config['TWITTER_CONSUMER_KEY']
    consumer_secret = app.config['TWITTER_CONSUMER_SECRET']

    tokens = (OAuthToken.query.with_parent(account)
              .order_by(db.desc(OAuthToken.created_at)).all())
    for token in tokens:
        t = Twitter(
                auth=OAuth(token.token, token.token_secret,
                           consumer_key, consumer_secret))
        try:
            t.account.verify_credentials()
            return t
        except TwitterHTTPError as e:
            if e.e.code == 401:
                # token revoked

                if sentry:
                    sentry.captureMessage(
                            'Twitter auth revoked', extra=locals())
                db.session.delete(token)
                db.session.commit()
            else:
                raise TemporaryError(e)
        except URLError as e:
            raise TemporaryError(e)

    return None
version.py 文件源码 项目:forget 作者: codl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def url_for_version(ver):
    match = version_re.match(ver)
    if not match:
        return app.config['REPO_URL']
    return app.config['COMMIT_URL'].format(**match.groupdict())
views.py 文件源码 项目:tellMeYourStory 作者: mouseProgrammouse 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def add_post():
    form = CreatePost()
    if request.method == 'POST' and form.validate_on_submit():
        """ Get image info """
        if (form.image.data):
            image = form.image.data
            """ Create new file name """
            old_filename, extension = os.path.splitext(image.filename)
            filename = str(int(calendar.timegm(time.gmtime()))) + extension
            """ Check directory """
            directory = os.path.join(app.config['UPLOAD_FOLDER'], str(g.user.get_id()))
            if not os.path.exists(directory):
                os.makedirs(directory)
            """ Save image """
            image.save(os.path.join(directory, filename))
        else:
            filename = None
        """ Add post to DB """
        new_post = Posts(user_id=g.user.get_id(),
            title=form.title.data,
            text=form.text.data,
            pub_date=form.date.data,
            img=filename,
            public=form.public.data)
        db.session.add(new_post)
        db.session.commit()
        """ Success message """
        flash('Done')
        return (redirect(url_for("index")))
    return render_template('addpost.html',
        title='Create new post',
        form=form)
views.py 文件源码 项目:tellMeYourStory 作者: mouseProgrammouse 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def edit_post():
    form = CreatePost()
    if request.method == 'POST':
        post = Posts.query.filter_by(id=request.args['id']).first()
        """ UPDATE DATA """
        post.title = form.title.data
        post.text = form.text.data
        post.pub_date = form.date.data
        """ Get image info """
        if (form.image.data):
            image = form.image.data
            """ Create new file name """
            old_filename, extension = os.path.splitext(image.filename)
            filename = str(int(calendar.timegm(time.gmtime()))) + extension
            """ Check directory """
            directory = os.path.join(app.config['UPLOAD_FOLDER'], str(g.user.get_id()))
            if not os.path.exists(directory):
                os.makedirs(directory)
            """ Save image """
            image.save(os.path.join(directory, filename))
            """ add new imagename """
            post.img = filename
        post.public = form.public.data
        """ UPDATE DATA IN DB """
        db.session.commit()
        """ Succes message """
        flash('Done')
        return (redirect(url_for("index")))
    elif request.method == 'GET':
        """ Get info for post """
        post = Posts.query.filter_by(id=request.args['id']).first()
        if post is not None and post.user_id == g.user.get_id():
            return render_template('edit.html',
                    post=post,
                    title='Update post',
                    form=form)
    flash('Something is wrong')
    return redirect(url_for("index"))
mongodb.py 文件源码 项目:BlogSpider 作者: hack4code 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _connect(self):
        client = MongoClient(app.config['MONGODB_URI'],
                             connect=False)
        db = client[self._name]
        db.authenticate(app.config['MONGODB_USER'],
                        app.config['MONGODB_PWD'])
        self._db = db
test.py 文件源码 项目:BlogSpider 作者: hack4code 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUp(self):
        app.config['TESTING'] = True
        self.app = app.test_client()
        self.app_context = app.app_context()
        self.app_context.push()
helper.py 文件源码 项目:bucket_api 作者: jokamjohn 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_paginated_items(bucket, bucket_id, page, q):
    """
    Get the items from the bucket and then paginate the results.
    Items can also be search when the query parameter is set.
    Construct the previous and next urls.
    :param q: Query parameter
    :param bucket: Bucket
    :param bucket_id: Bucket Id
    :param page: Page number
    :return:
    """

    if q:
        pagination = BucketItem.query.filter(BucketItem.name.like("%" + q.lower().strip() + "%")) \
            .order_by(BucketItem.create_at.desc()) \
            .filter_by(bucket_id=bucket_id) \
            .paginate(page=page, per_page=app.config['BUCKET_AND_ITEMS_PER_PAGE'], error_out=False)
    else:
        pagination = bucket.items.order_by(BucketItem.create_at.desc()).paginate(page=page, per_page=app.config[
            'BUCKET_AND_ITEMS_PER_PAGE'], error_out=False)

    previous = None
    if pagination.has_prev:
        if q:
            previous = url_for('items.get_items', q=q, bucket_id=bucket_id, page=page - 1, _external=True)
        else:
            previous = url_for('items.get_items', bucket_id=bucket_id, page=page - 1, _external=True)
    nex = None
    if pagination.has_next:
        if q:
            nex = url_for('items.get_items', q=q, bucket_id=bucket_id, page=page + 1, _external=True)
        else:
            nex = url_for('items.get_items', bucket_id=bucket_id, page=page + 1, _external=True)
    return pagination.items, nex, pagination, previous
helper.py 文件源码 项目:bucket_api 作者: jokamjohn 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def paginate_buckets(user_id, page, q, user):
    """
    Get a user by Id, then get hold of their buckets and also paginate the results.
    There is also an option to search for a bucket name if the query param is set.
    Generate previous and next pagination urls
    :param q: Query parameter
    :param user_id: User Id
    :param user: Current User
    :param page: Page number
    :return: Pagination next url, previous url and the user buckets.
    """
    if q:
        pagination = Bucket.query.filter(Bucket.name.like("%" + q.lower().strip() + "%")).filter_by(user_id=user_id) \
            .paginate(page=page, per_page=app.config['BUCKET_AND_ITEMS_PER_PAGE'], error_out=False)
    else:
        pagination = user.buckets.paginate(page=page, per_page=app.config['BUCKET_AND_ITEMS_PER_PAGE'],
                                           error_out=False)
    previous = None
    if pagination.has_prev:
        if q:
            previous = url_for('bucket.bucketlist', q=q, page=page - 1, _external=True)
        else:
            previous = url_for('bucket.bucketlist', page=page - 1, _external=True)
    nex = None
    if pagination.has_next:
        if q:
            nex = url_for('bucket.bucketlist', q=q, page=page + 1, _external=True)
        else:
            nex = url_for('bucket.bucketlist', page=page + 1, _external=True)
    items = pagination.items
    return items, nex, pagination, previous


问题


面经


文章

微信
公众号

扫码关注公众号