def get_model():
    """Return the ``Model`` instance cached on the current application.

    On first use the model is built from the config file located via
    ``get_config('MODEL_YAML', 'model.yaml')`` and stored on the
    application object as ``_lg_model``; later calls return that same
    stored instance.
    """
    # Work on the real application object so the cache attribute is set
    # on it rather than on the ``current_app`` proxy.
    flask_app = current_app._get_current_object()
    if hasattr(flask_app, '_lg_model'):
        return flask_app._lg_model

    # Imported only when the model actually has to be built.
    from memorious.model import Model

    yaml_path = get_config('MODEL_YAML', 'model.yaml')
    flask_app._lg_model = Model(load_config_file(yaml_path))
    return flask_app._lg_model
python类_get_current_object()的实例源码
def get_es():
    """Return the ``Elasticsearch`` client cached on the current application.

    The client is created on first use from the ``ELASTICSEARCH_URL``
    setting (with ``timeout=240`` and ``sniff_on_start=True``) and stored
    on the application object as ``_es_instance``.
    """
    # Work on the real application object so the cache attribute is set
    # on it rather than on the ``current_app`` proxy.
    flask_app = current_app._get_current_object()
    if hasattr(flask_app, '_es_instance'):
        return flask_app._es_instance

    es_url = get_config('ELASTICSEARCH_URL')
    flask_app._es_instance = Elasticsearch(
        es_url,
        timeout=240,
        sniff_on_start=True,
    )
    return flask_app._es_instance
def send(self, message, envelope_from=None):
    """Verify and send a message.

    :param message: Message instance.
    :param envelope_from: Email address to be used in MAIL FROM command;
        ``message.sender`` is used when this is falsy.
    :raises AssertionError: if the message has no recipients or no sender.
    :raises BadHeaderError: if ``message.has_bad_headers()`` is true.
    """
    assert message.send_to, "No recipients have been added"
    assert message.sender, (
        "The message does not specify a sender and a default sender "
        "has not been configured")

    if message.has_bad_headers():
        raise BadHeaderError

    # Stamp the message with the current time when no date was set.
    if message.date is None:
        message.date = time.time()

    # Only talk to the mail host when one is configured.
    if self.host:
        deliver = self.host.sendmail
        from_addr = sanitize_address(envelope_from or message.sender)
        to_addrs = list(sanitize_addresses(message.send_to))
        payload = message.as_bytes() if PY3 else message.as_string()
        deliver(from_addr,
                to_addrs,
                payload,
                message.mail_options,
                message.rcpt_options)

    # The signal is emitted whether or not a host was configured.
    email_dispatched.send(message, app=current_app._get_current_object())

    self.num_emails += 1
    if self.num_emails != self.mail.max_emails:
        return

    # Message limit reached: reset the counter and reconnect to the host.
    self.num_emails = 0
    if self.host:
        self.host.quit()
        self.host = self.configure_host()
def send_email(to, subject, template, **kwargs):
    """Render an e-mail from templates and send it on a background thread.

    :param to: Recipient address; becomes the single entry of ``recipients``.
    :param subject: Subject text, appended to the configured subject prefix.
    :param template: Template name without extension; ``.txt`` is rendered
        for the plain body and ``.html`` for the HTML body.
    :param kwargs: Passed to ``render_template`` for both renderings.
    :return: The started ``Thread`` running ``send_async_email``.
    """
    # The real application object (not the proxy) is handed to the thread.
    flask_app = current_app._get_current_object()
    settings = flask_app.config

    full_subject = settings['YIAVE_MAIL_SUBJECT_PREFIX'] + ' ' + subject
    message = Message(
        full_subject,
        sender=settings['YIAVE_MAIL_SENDER'],
        recipients=[to],
    )
    message.body = render_template(template + '.txt', **kwargs)
    message.html = render_template(template + '.html', **kwargs)

    worker = Thread(target=send_async_email, args=[flask_app, message])
    worker.start()
    return worker
def before_first_request():
    """Start a background thread that looks for users that leave.

    No thread is started when the application's ``TESTING`` setting is true.
    """

    def find_offline_users(app):
        """Loop forever inside ``app``'s context, pushing offline users."""
        with app.app_context():
            while True:
                for offline_user in User.find_offline_users():
                    push_model(offline_user)
                # Release the session after each pass, then wait 5 seconds.
                db.session.remove()
                time.sleep(5)

    if current_app.config['TESTING']:
        return

    # Pass the real application object, not the proxy, to the thread.
    poller = threading.Thread(
        target=find_offline_users,
        args=(current_app._get_current_object(),),
    )
    poller.start()
def send_email(to, subject, template, **kwargs):
    """Build a templated e-mail and dispatch it from a separate thread.

    :param to: Recipient address; becomes the single entry of ``recipients``.
    :param subject: Subject text, appended to the configured subject prefix.
    :param template: Template name without extension; ``.txt`` is rendered
        for the plain body and ``.html`` for the HTML body.
    :param kwargs: Passed to ``render_template`` for both renderings.
    :return: The started ``Thread`` running ``send_async_email``.
    """
    # The real application object (not the proxy) is handed to the thread.
    flask_app = current_app._get_current_object()
    settings = flask_app.config

    full_subject = settings['SYSTEM_MAIL_SUBJECT_PREFIX'] + ' ' + subject
    message = Message(
        full_subject,
        sender=settings['SYSTEM_MAIL_SENDER'],
        recipients=[to],
    )
    message.body = render_template(template + '.txt', **kwargs)
    message.html = render_template(template + '.html', **kwargs)

    worker = Thread(target=send_async_email, args=[flask_app, message])
    worker.start()
    return worker
def send(self, message, envelope_from=None):
    """Verify and send a message.

    :param message: Message instance.
    :param envelope_from: Email address to be used in MAIL FROM command;
        ``message.sender`` is used when this is falsy.
    :raises AssertionError: if the message has no recipients or no sender.
    :raises BadHeaderError: if ``message.has_bad_headers()`` is true.
    """
    assert message.send_to, "No recipients have been added"
    assert message.sender, (
        "The message does not specify a sender and a default sender "
        "has not been configured")

    if message.has_bad_headers():
        raise BadHeaderError

    # Stamp the message with the current time when no date was set.
    if message.date is None:
        message.date = time.time()

    # Only talk to the mail host when one is configured.
    if self.host:
        deliver = self.host.sendmail
        from_addr = sanitize_address(envelope_from or message.sender)
        to_addrs = list(sanitize_addresses(message.send_to))
        payload = message.as_bytes() if PY3 else message.as_string()
        deliver(from_addr,
                to_addrs,
                payload,
                message.mail_options,
                message.rcpt_options)

    # The signal is emitted whether or not a host was configured.
    email_dispatched.send(message, app=current_app._get_current_object())

    self.num_emails += 1
    if self.num_emails != self.mail.max_emails:
        return

    # Message limit reached: reset the counter and reconnect to the host.
    self.num_emails = 0
    if self.host:
        self.host.quit()
        self.host = self.configure_host()
def setUp(self):
    """Prepare a blogging app with login/logout routes and 20 sample posts.

    Posts 0-9 are saved for ``testuser`` with tag ``hello``; posts 10-19
    for ``newuser`` with tag ``world``.
    """
    FlaskBloggingTestCase.setUp(self)
    self._create_storage()
    self.app.config["BLOGGING_URL_PREFIX"] = "/blog"
    self.engine = self._create_blogging_engine()
    self.login_manager = LoginManager(self.app)

    # The same loader is registered with both the engine and the
    # login manager.
    @self.login_manager.user_loader
    @self.engine.user_loader
    def load_user(user_id):
        return TestUser(user_id)

    # ``blogger`` defaults to 0 on the short URL; a truthy value also
    # sends an identity change for the user.
    @self.app.route("/login/<username>/", methods=["POST"],
                    defaults={"blogger": 0})
    @self.app.route("/login/<username>/<int:blogger>/", methods=["POST"])
    def login(username, blogger):
        login_user(TestUser(username))
        if blogger:
            sender_app = current_app._get_current_object()
            identity_changed.send(sender_app, identity=Identity(username))
        return redirect("/")

    @self.app.route("/logout/")
    def logout():
        logout_user()
        sender_app = current_app._get_current_object()
        identity_changed.send(sender_app, identity=AnonymousIdentity())
        return redirect("/")

    for index in range(20):
        in_first_half = index < 10
        self.storage.save_post(
            title="Sample Title%d" % index,
            text="Sample Text%d" % index,
            user_id="testuser" if in_first_half else "newuser",
            tags=["hello"] if in_first_half else ["world"],
        )
def setUp(self):
    """Prepare a blogging app with login/logout routes and 20 sample posts.

    Posts 0-9 are saved for ``testuser`` with tag ``hello``; posts 10-19
    for ``newuser`` with tag ``world``.
    """
    FlaskBloggingTestCase.setUp(self)
    self._create_storage()
    self.app.config["BLOGGING_URL_PREFIX"] = "/blog"
    self.engine = self._create_blogging_engine()
    self.login_manager = LoginManager(self.app)

    # The same loader is registered with both the engine and the
    # login manager.
    @self.login_manager.user_loader
    @self.engine.user_loader
    def load_user(user_id):
        return TestUser(user_id)

    # ``blogger`` defaults to 0 on the short URL; a truthy value also
    # sends an identity change for the user.
    @self.app.route("/login/<username>/", methods=["POST"],
                    defaults={"blogger": 0})
    @self.app.route("/login/<username>/<int:blogger>/", methods=["POST"])
    def login(username, blogger):
        login_user(TestUser(username))
        if blogger:
            sender_app = current_app._get_current_object()
            identity_changed.send(sender_app, identity=Identity(username))
        return redirect("/")

    @self.app.route("/logout/")
    def logout():
        logout_user()
        sender_app = current_app._get_current_object()
        identity_changed.send(sender_app, identity=AnonymousIdentity())
        return redirect("/")

    for index in range(20):
        in_first_half = index < 10
        self.storage.save_post(
            title="Sample Title%d" % index,
            text="Sample Text%d" % index,
            user_id="testuser" if in_first_half else "newuser",
            tags=["hello"] if in_first_half else ["world"],
        )
def setUp(self):
    """Prepare a blogging app with login/logout routes and 20 sample posts.

    Posts 0-9 are saved for ``testuser`` with tag ``hello``; posts 10-19
    for ``newuser`` with tag ``world``.
    """
    FlaskBloggingTestCase.setUp(self)
    self._create_storage()
    self.app.config["BLOGGING_URL_PREFIX"] = "/blog"
    self.engine = self._create_blogging_engine()
    self.login_manager = LoginManager(self.app)

    # The same loader is registered with both the engine and the
    # login manager.
    @self.login_manager.user_loader
    @self.engine.user_loader
    def load_user(user_id):
        return TestUser(user_id)

    # ``blogger`` defaults to 0 on the short URL; a truthy value also
    # sends an identity change for the user.
    @self.app.route("/login/<username>/", methods=["POST"],
                    defaults={"blogger": 0})
    @self.app.route("/login/<username>/<int:blogger>/", methods=["POST"])
    def login(username, blogger):
        login_user(TestUser(username))
        if blogger:
            sender_app = current_app._get_current_object()
            identity_changed.send(sender_app, identity=Identity(username))
        return redirect("/")

    @self.app.route("/logout/")
    def logout():
        logout_user()
        sender_app = current_app._get_current_object()
        identity_changed.send(sender_app, identity=AnonymousIdentity())
        return redirect("/")

    for index in range(20):
        in_first_half = index < 10
        self.storage.save_post(
            title="Sample Title%d" % index,
            text="Sample Text%d" % index,
            user_id="testuser" if in_first_half else "newuser",
            tags=["hello"] if in_first_half else ["world"],
        )
def login():
    """Log in the fixed ``testuser`` account and redirect to ``/blog``."""
    test_user = User("testuser")
    login_user(test_user)

    # notify the change of role
    sender_app = current_app._get_current_object()
    new_identity = Identity("testuser")
    identity_changed.send(sender_app, identity=new_identity)

    return redirect("/blog")
def logout():
    """Log the current user out and redirect to ``/``."""
    logout_user()

    # notify the change of role
    sender_app = current_app._get_current_object()
    identity_changed.send(sender_app, identity=AnonymousIdentity())

    return redirect("/")
def logout():
    """Log the current user out and redirect to ``/``."""
    logout_user()

    # notify the change of role
    sender_app = current_app._get_current_object()
    identity_changed.send(sender_app, identity=AnonymousIdentity())

    return redirect("/")
def logout():
    """Log the current user out and redirect to the ``index.render_feed`` view."""
    logout_user()

    # Notify flask principal that the user has logged out
    sender_app = current_app._get_current_object()
    identity_changed.send(sender_app, identity=AnonymousIdentity())

    feed_url = url_for('index.render_feed')
    return redirect(feed_url)
def _perform_login(self, user):
    """Log ``user`` in and announce the new identity.

    :param user: Passed through ``prepare_user``; the prepared result is
        what gets logged in and whose ``id`` is used for the identity.
    """
    prepared = prepare_user(user)
    login_user(prepared)

    # Notify flask principal that the identity has changed
    sender_app = current_app._get_current_object()
    identity_changed.send(sender_app, identity=Identity(prepared.id))
def send_email(to, subject, template, **kwargs):
    """Render an e-mail from templates and send it on a background thread.

    :param to: Recipient address; becomes the single entry of ``recipients``.
    :param subject: Subject text, appended to the configured subject prefix.
    :param template: Template name without extension; ``.txt`` is rendered
        for the plain body and ``.html`` for the HTML body.
    :param kwargs: Passed to ``render_template`` for both renderings.
    :return: The started ``Thread`` running ``send_async_email``.
    """
    # The real application object (not the proxy) is handed to the thread.
    flask_app = current_app._get_current_object()
    settings = flask_app.config

    full_subject = settings['FLASKY_MAIL_SUBJECT_PREFIX'] + ' ' + subject
    message = Message(
        full_subject,
        sender=settings['FLASKY_MAIL_SENDER'],
        recipients=[to],
    )
    message.body = render_template(template + '.txt', **kwargs)
    message.html = render_template(template + '.html', **kwargs)

    worker = Thread(target=send_async_email, args=[flask_app, message])
    worker.start()
    return worker
def send_email(to, subject, template, **kwargs):
    """Build a templated e-mail and dispatch it from a separate thread.

    :param to: Recipient address; becomes the single entry of ``recipients``.
    :param subject: Subject text, appended to the configured subject prefix.
    :param template: Template name without extension; ``.txt`` is rendered
        for the plain body and ``.html`` for the HTML body.
    :param kwargs: Passed to ``render_template`` for both renderings.
    :return: The started ``Thread`` running ``send_async_email``.
    """
    # The real application object (not the proxy) is handed to the thread.
    flask_app = current_app._get_current_object()
    settings = flask_app.config

    full_subject = settings['FLASKY_MAIL_SUBJECT_PREFIX'] + ' ' + subject
    message = Message(
        full_subject,
        sender=settings['FLASKY_MAIL_SENDER'],
        recipients=[to],
    )
    message.body = render_template(template + '.txt', **kwargs)
    message.html = render_template(template + '.html', **kwargs)

    worker = Thread(target=send_async_email, args=[flask_app, message])
    worker.start()
    return worker
def send_email(to, subject, template, **kwargs):
    """Render an e-mail from templates and send it on a background thread.

    :param to: Recipient address; becomes the single entry of ``recipients``.
    :param subject: Subject text, appended to the configured subject prefix.
    :param template: Template name without extension; ``.txt`` is rendered
        for the plain body and ``.html`` for the HTML body.
    :param kwargs: Passed to ``render_template`` for both renderings.
    :return: The started ``Thread`` running ``send_async_email``.
    """
    # The real application object (not the proxy) is handed to the thread.
    flask_app = current_app._get_current_object()
    settings = flask_app.config

    full_subject = settings['MAIL_SUBJECT_PREFIX'] + ' ' + subject
    message = Message(
        full_subject,
        sender=settings['MAIL_SENDER'],
        recipients=[to],
    )
    message.body = render_template(template + '.txt', **kwargs)
    message.html = render_template(template + '.html', **kwargs)

    worker = Thread(target=send_async_email, args=[flask_app, message])
    worker.start()
    return worker
def login():
    """Handle a login request carrying credentials or a passive flag.

    Request data is taken from ``request.values``, or from ``request.json``
    when a JSON body is present.

    * With the user manager enabled and both ``user`` and ``pass`` supplied,
      the credentials are checked. On success the user is logged in and
      their ``asDict()`` data is returned as JSON; otherwise a 401 response
      is returned.
    * Otherwise, if ``passive`` is supplied, ``passive_login()`` handles
      the request.
    * In every other case ``NO_CONTENT`` is returned.
    """
    data = request.values
    if hasattr(request, "json") and request.json:
        data = request.json

    user_manager = octoprint.server.userManager

    if user_manager.enabled and "user" in data and "pass" in data:
        username = data["user"]
        password = data["pass"]
        remember = "remember" in data and data["remember"] in valid_boolean_trues

        # Drop any session that is already attached to this client first.
        if "usersession.id" in session:
            _logout(current_user)

        user = user_manager.findUser(username)
        # The password is only checked when the user exists.
        if user is None or not user_manager.checkPassword(username, password):
            return make_response(("User unknown or password incorrect", 401, []))

        if user_manager.enabled:
            # From here on ``user`` is whatever login_user() returned.
            user = user_manager.login_user(user)
            session["usersession.id"] = user.get_session()
            g.user = user

        login_user(user, remember=remember)
        identity_changed.send(current_app._get_current_object(),
                              identity=Identity(user.get_id()))
        return jsonify(user.asDict())

    if "passive" in data:
        return passive_login()
    return NO_CONTENT