Python 方法 _get_current_object() 的实例源码

core.py 文件源码 项目:graph-data-experiment 作者: occrp-attic 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_model():
    """Return the model object cached on the current Flask application.

    On first use the model is built from
    ``load_config_file(get_config('MODEL_YAML', 'model.yaml'))`` and stored
    on the application object as ``_lg_model``; later calls return that
    stored instance.
    """
    flask_app = current_app._get_current_object()
    if hasattr(flask_app, '_lg_model'):
        return flask_app._lg_model

    # Imported only when the model actually has to be built.
    from memorious.model import Model
    config_path = get_config('MODEL_YAML', 'model.yaml')
    flask_app._lg_model = Model(load_config_file(config_path))
    return flask_app._lg_model
core.py 文件源码 项目:graph-data-experiment 作者: occrp-attic 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_es():
    """Return the Elasticsearch client cached on the current Flask application.

    On first use a client is created for ``get_config('ELASTICSEARCH_URL')``
    with ``timeout=240`` and ``sniff_on_start=True`` and stored on the
    application object as ``_es_instance``; later calls return it.
    """
    flask_app = current_app._get_current_object()
    if hasattr(flask_app, '_es_instance'):
        return flask_app._es_instance

    es_url = get_config('ELASTICSEARCH_URL')
    flask_app._es_instance = Elasticsearch(es_url, timeout=240,
                                           sniff_on_start=True)
    return flask_app._es_instance
flask_mail.py 文件源码 项目:Flask-NvRay-Blog 作者: rui7157 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send(self, message, envelope_from=None):
        """Validate *message*, send it and emit ``email_dispatched``.

        :param message: Message instance.
        :param envelope_from: Email address to be used in MAIL FROM command;
                              ``message.sender`` is used when it is falsy.
        :raises AssertionError: if the message has no recipients or no sender.
        :raises BadHeaderError: if ``message.has_bad_headers()`` is true.
        """
        assert message.send_to, "No recipients have been added"
        assert message.sender, (
                "The message does not specify a sender and a default sender "
                "has not been configured")

        if message.has_bad_headers():
            raise BadHeaderError

        if message.date is None:
            message.date = time.time()

        if self.host:
            from_addr = sanitize_address(envelope_from or message.sender)
            to_addrs = list(sanitize_addresses(message.send_to))
            payload = message.as_bytes() if PY3 else message.as_string()
            self.host.sendmail(from_addr, to_addrs, payload,
                               message.mail_options, message.rcpt_options)

        # The signal is sent whether or not a host was available above.
        email_dispatched.send(message, app=current_app._get_current_object())

        self.num_emails += 1
        if self.num_emails != self.mail.max_emails:
            return

        # Limit reached: reset the counter and, when a host is in use,
        # quit it and replace it with a freshly configured one.
        self.num_emails = 0
        if self.host:
            self.host.quit()
            self.host = self.configure_host()
email.py 文件源码 项目:api 作者: Yiave 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send_email(to, subject, template, **kwargs):
    """Render an email from *template* and send it on a background thread.

    The subject is prefixed with ``YIAVE_MAIL_SUBJECT_PREFIX`` and the sender
    is ``YIAVE_MAIL_SENDER`` from the application config. ``template + '.txt'``
    and ``template + '.html'`` are rendered with ``kwargs`` for the two bodies.

    :returns: the already started ``Thread`` running ``send_async_email``.
    """
    # Hand the underlying application object, not the ``current_app`` proxy,
    # to the worker thread.
    flask_app = current_app._get_current_object()
    settings = flask_app.config

    full_subject = settings['YIAVE_MAIL_SUBJECT_PREFIX'] + ' ' + subject
    message = Message(full_subject, sender=settings['YIAVE_MAIL_SENDER'],
                      recipients=[to])
    message.body = render_template(template + '.txt', **kwargs)
    message.html = render_template(template + '.html', **kwargs)

    worker = Thread(target=send_async_email, args=[flask_app, message])
    worker.start()
    return worker
flack.py 文件源码 项目:flack 作者: miguelgrinberg 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def before_first_request():
    """Start a background thread that looks for users that leave."""
    def find_offline_users(app):
        # Loops forever inside an application context, sleeping 5 seconds
        # between passes.
        with app.app_context():
            while True:
                for offline_user in User.find_offline_users():
                    push_model(offline_user)
                db.session.remove()
                time.sleep(5)

    # No background thread is started when the TESTING setting is truthy.
    if current_app.config['TESTING']:
        return

    flask_app = current_app._get_current_object()
    watcher = threading.Thread(target=find_offline_users, args=(flask_app,))
    watcher.start()
email_service.py 文件源码 项目:python_ddd_flask 作者: igorvinnicius 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def send_email(to, subject, template, **kwargs):
    """Render an email from *template* and send it on a background thread.

    The subject is prefixed with ``SYSTEM_MAIL_SUBJECT_PREFIX`` and the sender
    is ``SYSTEM_MAIL_SENDER`` from the application config. ``template + '.txt'``
    and ``template + '.html'`` are rendered with ``kwargs`` for the two bodies.

    :returns: the already started ``Thread`` running ``send_async_email``.
    """
    # Hand the underlying application object, not the ``current_app`` proxy,
    # to the worker thread.
    flask_app = current_app._get_current_object()
    settings = flask_app.config

    full_subject = settings['SYSTEM_MAIL_SUBJECT_PREFIX'] + ' ' + subject
    message = Message(full_subject, sender=settings['SYSTEM_MAIL_SENDER'],
                      recipients=[to])
    message.body = render_template(template + '.txt', **kwargs)
    message.html = render_template(template + '.html', **kwargs)

    worker = Thread(target=send_async_email, args=[flask_app, message])
    worker.start()
    return worker
flask_mail.py 文件源码 项目:python_ddd_flask 作者: igorvinnicius 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send(self, message, envelope_from=None):
        """Validate *message*, send it and emit ``email_dispatched``.

        :param message: Message instance.
        :param envelope_from: Email address to be used in MAIL FROM command;
                              ``message.sender`` is used when it is falsy.
        :raises AssertionError: if the message has no recipients or no sender.
        :raises BadHeaderError: if ``message.has_bad_headers()`` is true.
        """
        assert message.send_to, "No recipients have been added"
        assert message.sender, (
                "The message does not specify a sender and a default sender "
                "has not been configured")

        if message.has_bad_headers():
            raise BadHeaderError

        if message.date is None:
            message.date = time.time()

        if self.host:
            from_addr = sanitize_address(envelope_from or message.sender)
            to_addrs = list(sanitize_addresses(message.send_to))
            payload = message.as_bytes() if PY3 else message.as_string()
            self.host.sendmail(from_addr, to_addrs, payload,
                               message.mail_options, message.rcpt_options)

        # The signal is sent whether or not a host was available above.
        email_dispatched.send(message, app=current_app._get_current_object())

        self.num_emails += 1
        if self.num_emails != self.mail.max_emails:
            return

        # Limit reached: reset the counter and, when a host is in use,
        # quit it and replace it with a freshly configured one.
        self.num_emails = 0
        if self.host:
            self.host.quit()
            self.host = self.configure_host()
test_views.py 文件源码 项目:zual 作者: ninadmhatre 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setUp(self):
        """Prepare the blogging app: storage, engine, auth routes, 20 posts."""
        FlaskBloggingTestCase.setUp(self)
        self._create_storage()
        self.app.config["BLOGGING_URL_PREFIX"] = "/blog"
        self.engine = self._create_blogging_engine()
        self.login_manager = LoginManager(self.app)

        # One loader is registered with both the login manager and the engine.
        @self.login_manager.user_loader
        @self.engine.user_loader
        def load_user(user_id):
            return TestUser(user_id)

        @self.app.route("/login/<username>/", methods=["POST"],
                        defaults={"blogger": 0})
        @self.app.route("/login/<username>/<int:blogger>/", methods=["POST"])
        def login(username, blogger):
            login_user(TestUser(username))
            if blogger:
                # The identity change is only signalled for a truthy blogger.
                flask_app = current_app._get_current_object()
                identity_changed.send(flask_app, identity=Identity(username))
            return redirect("/")

        @self.app.route("/logout/")
        def logout():
            logout_user()
            flask_app = current_app._get_current_object()
            identity_changed.send(flask_app, identity=AnonymousIdentity())
            return redirect("/")

        # Posts 0-9: user "testuser", tag "hello"; 10-19: "newuser", "world".
        for index in range(20):
            if index < 10:
                author, post_tags = "testuser", ["hello"]
            else:
                author, post_tags = "newuser", ["world"]
            self.storage.save_post(title="Sample Title%d" % index,
                                   text="Sample Text%d" % index,
                                   user_id=author, tags=post_tags)
test_views.py 文件源码 项目:zual 作者: ninadmhatre 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setUp(self):
        """Prepare the blogging app: storage, engine, auth routes, 20 posts."""
        FlaskBloggingTestCase.setUp(self)
        self._create_storage()
        self.app.config["BLOGGING_URL_PREFIX"] = "/blog"
        self.engine = self._create_blogging_engine()
        self.login_manager = LoginManager(self.app)

        # One loader is registered with both the login manager and the engine.
        @self.login_manager.user_loader
        @self.engine.user_loader
        def load_user(user_id):
            return TestUser(user_id)

        @self.app.route("/login/<username>/", methods=["POST"],
                        defaults={"blogger": 0})
        @self.app.route("/login/<username>/<int:blogger>/", methods=["POST"])
        def login(username, blogger):
            login_user(TestUser(username))
            if blogger:
                # The identity change is only signalled for a truthy blogger.
                flask_app = current_app._get_current_object()
                identity_changed.send(flask_app, identity=Identity(username))
            return redirect("/")

        @self.app.route("/logout/")
        def logout():
            logout_user()
            flask_app = current_app._get_current_object()
            identity_changed.send(flask_app, identity=AnonymousIdentity())
            return redirect("/")

        # Posts 0-9: user "testuser", tag "hello"; 10-19: "newuser", "world".
        for index in range(20):
            if index < 10:
                author, post_tags = "testuser", ["hello"]
            else:
                author, post_tags = "newuser", ["world"]
            self.storage.save_post(title="Sample Title%d" % index,
                                   text="Sample Text%d" % index,
                                   user_id=author, tags=post_tags)
test_views.py 文件源码 项目:zual 作者: ninadmhatre 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def setUp(self):
        """Prepare the blogging app: storage, engine, auth routes, 20 posts."""
        FlaskBloggingTestCase.setUp(self)
        self._create_storage()
        self.app.config["BLOGGING_URL_PREFIX"] = "/blog"
        self.engine = self._create_blogging_engine()
        self.login_manager = LoginManager(self.app)

        # One loader is registered with both the login manager and the engine.
        @self.login_manager.user_loader
        @self.engine.user_loader
        def load_user(user_id):
            return TestUser(user_id)

        @self.app.route("/login/<username>/", methods=["POST"],
                        defaults={"blogger": 0})
        @self.app.route("/login/<username>/<int:blogger>/", methods=["POST"])
        def login(username, blogger):
            login_user(TestUser(username))
            if blogger:
                # The identity change is only signalled for a truthy blogger.
                flask_app = current_app._get_current_object()
                identity_changed.send(flask_app, identity=Identity(username))
            return redirect("/")

        @self.app.route("/logout/")
        def logout():
            logout_user()
            flask_app = current_app._get_current_object()
            identity_changed.send(flask_app, identity=AnonymousIdentity())
            return redirect("/")

        # Posts 0-9: user "testuser", tag "hello"; 10-19: "newuser", "world".
        for index in range(20):
            if index < 10:
                author, post_tags = "testuser", ["hello"]
            else:
                author, post_tags = "newuser", ["world"]
            self.storage.save_post(title="Sample Title%d" % index,
                                   text="Sample Text%d" % index,
                                   user_id=author, tags=post_tags)
blog_cache.py 文件源码 项目:zual 作者: ninadmhatre 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def login():
    """Log in the fixed ``"testuser"`` account and redirect to ``/blog``."""
    login_user(User("testuser"))

    # Notify listeners of the change of role.
    flask_app = current_app._get_current_object()
    identity_changed.send(flask_app, identity=Identity("testuser"))
    return redirect("/blog")
blog_cache.py 文件源码 项目:zual 作者: ninadmhatre 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def logout():
    """Log the current user out, signal an anonymous identity, go to ``/``."""
    logout_user()

    # Notify listeners of the change of role.
    flask_app = current_app._get_current_object()
    identity_changed.send(flask_app, identity=AnonymousIdentity())
    return redirect("/")
blog_roles.py 文件源码 项目:zual 作者: ninadmhatre 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def logout():
    """Log the current user out, signal an anonymous identity, go to ``/``."""
    logout_user()

    # Notify listeners of the change of role.
    flask_app = current_app._get_current_object()
    identity_changed.send(flask_app, identity=AnonymousIdentity())
    return redirect("/")
auth.py 文件源码 项目:knowledge-repo 作者: airbnb 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def logout():
    """Log the user out and redirect to the ``index.render_feed`` endpoint."""
    logout_user()

    # Notify flask principal that the user has logged out
    flask_app = current_app._get_current_object()
    identity_changed.send(flask_app, identity=AnonymousIdentity())

    feed_url = url_for('index.render_feed')
    return redirect(feed_url)
auth_provider.py 文件源码 项目:knowledge-repo 作者: airbnb 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _perform_login(self, user):
        """Log *user* in and signal the new identity via ``identity_changed``.

        The user is first passed through ``prepare_user``; the prepared
        object is what gets logged in and whose ``id`` forms the identity.
        """
        prepared = prepare_user(user)
        login_user(prepared)

        # Notify flask principal that the identity has changed
        flask_app = current_app._get_current_object()
        identity_changed.send(flask_app, identity=Identity(prepared.id))
email.py 文件源码 项目:database_project 作者: HughWen 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def send_email(to, subject, template, **kwargs):
    """Render an email from *template* and send it on a background thread.

    The subject is prefixed with ``FLASKY_MAIL_SUBJECT_PREFIX`` and the sender
    is ``FLASKY_MAIL_SENDER`` from the application config. ``template + '.txt'``
    and ``template + '.html'`` are rendered with ``kwargs`` for the two bodies.

    :returns: the already started ``Thread`` running ``send_async_email``.
    """
    # Hand the underlying application object, not the ``current_app`` proxy,
    # to the worker thread.
    flask_app = current_app._get_current_object()
    settings = flask_app.config

    full_subject = settings['FLASKY_MAIL_SUBJECT_PREFIX'] + ' ' + subject
    message = Message(full_subject, sender=settings['FLASKY_MAIL_SENDER'],
                      recipients=[to])
    message.body = render_template(template + '.txt', **kwargs)
    message.html = render_template(template + '.html', **kwargs)

    worker = Thread(target=send_async_email, args=[flask_app, message])
    worker.start()
    return worker
email.py 文件源码 项目:copyflask_web 作者: superchilli 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def send_email(to, subject, template, **kwargs):
    """Render an email from *template* and send it on a background thread.

    The subject is prefixed with ``FLASKY_MAIL_SUBJECT_PREFIX`` and the sender
    is ``FLASKY_MAIL_SENDER`` from the application config. ``template + '.txt'``
    and ``template + '.html'`` are rendered with ``kwargs`` for the two bodies.

    :returns: the already started ``Thread`` running ``send_async_email``.
    """
    # Hand the underlying application object, not the ``current_app`` proxy,
    # to the worker thread.
    flask_app = current_app._get_current_object()
    settings = flask_app.config

    full_subject = settings['FLASKY_MAIL_SUBJECT_PREFIX'] + ' ' + subject
    message = Message(full_subject, sender=settings['FLASKY_MAIL_SENDER'],
                      recipients=[to])
    message.body = render_template(template + '.txt', **kwargs)
    message.html = render_template(template + '.html', **kwargs)

    worker = Thread(target=send_async_email, args=[flask_app, message])
    worker.start()
    return worker
email.py 文件源码 项目:cci-demo-flask 作者: circleci 项目源码 文件源码 阅读 53 收藏 0 点赞 0 评论 0
def send_email(to, subject, template, **kwargs):
    """Render an email from *template* and send it on a background thread.

    The subject is prefixed with ``MAIL_SUBJECT_PREFIX`` and the sender is
    ``MAIL_SENDER`` from the application config. ``template + '.txt'`` and
    ``template + '.html'`` are rendered with ``kwargs`` for the two bodies.

    :returns: the already started ``Thread`` running ``send_async_email``.
    """
    # Hand the underlying application object, not the ``current_app`` proxy,
    # to the worker thread.
    flask_app = current_app._get_current_object()
    settings = flask_app.config

    full_subject = settings['MAIL_SUBJECT_PREFIX'] + ' ' + subject
    message = Message(full_subject, sender=settings['MAIL_SENDER'],
                      recipients=[to])
    message.body = render_template(template + '.txt', **kwargs)
    message.html = render_template(template + '.html', **kwargs)

    worker = Thread(target=send_async_email, args=[flask_app, message])
    worker.start()
    return worker
__init__.py 文件源码 项目:SuperOcto 作者: mcecchi 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def login():
    """Handle a login request carrying ``user``/``pass`` or ``passive``.

    Request data comes from ``request.json`` when it is present and truthy,
    otherwise from ``request.values``.

    * With the user manager enabled and both ``user`` and ``pass`` supplied,
      the credentials are checked. On success the user is logged in and
      returned as JSON; otherwise a 401 response is returned.
    * Otherwise, if ``passive`` is supplied, ``passive_login()`` is returned.
    * Otherwise ``NO_CONTENT`` is returned.
    """
    data = request.values
    if hasattr(request, "json") and request.json:
        data = request.json

    user_manager = octoprint.server.userManager
    if user_manager.enabled and "user" in data and "pass" in data:
        username, password = data["user"], data["pass"]
        remember = ("remember" in data
                    and data["remember"] in valid_boolean_trues)

        # Drop any session that is already active before logging in again.
        if "usersession.id" in session:
            _logout(current_user)

        user = user_manager.findUser(username)
        if user is not None and user_manager.checkPassword(username, password):
            if user_manager.enabled:
                # Swap in the object returned by the user manager and
                # record its session id.
                user = user_manager.login_user(user)
                session["usersession.id"] = user.get_session()
                g.user = user
            login_user(user, remember=remember)
            identity_changed.send(current_app._get_current_object(),
                                  identity=Identity(user.get_id()))
            return jsonify(user.asDict())

        # Unknown user and wrong password get the same response.
        return make_response(("User unknown or password incorrect", 401, []))

    if "passive" in data:
        return passive_login()
    return NO_CONTENT


问题


面经


文章

微信
公众号

扫码关注公众号