def login_user(self, user):
self._cleanup_sessions()
if user is None:
return
if isinstance(user, LocalProxy):
user = user._get_current_object()
if not isinstance(user, User):
return None
if not isinstance(user, SessionUser):
user = SessionUser(user)
self._session_users_by_session[user.get_session()] = user
userid = user.get_id()
if not userid in self._session_users_by_userid:
self._session_users_by_userid[userid] = []
self._session_users_by_userid[userid].append(user)
self._logger.debug("Logged in user: %r" % user)
return user
python类LocalProxy()的实例源码
def logout_user(self, user):
if user is None:
return
if isinstance(user, LocalProxy):
user = user._get_current_object()
if not isinstance(user, SessionUser):
return
userid = user.get_id()
if userid in self._session_users_by_userid:
users_by_userid = self._session_users_by_userid[userid]
for u in users_by_userid:
if u.get_session() == user.get_session():
users_by_userid.remove(u)
break
if user.get_session() in self._session_users_by_session:
del self._session_users_by_session[user.get_session()]
self._logger.debug("Logged out user: %r" % user)
def allowed_assignment(self, request: LocalProxy, form: Form) -> bool:
"""Return if assignment is allowed, per settings.
:param request: The request context object.
:param form: form to check
"""
lst = self.setting('assignments').value
assignment = request.form['assignment']
category = request.form.get('category', None)
if ':' in lst:
datum = dict(l.split(':') for l in lst.splitlines())
lst = datum.get(category, '*')
if lst == '*':
return True
if assignment not in str2lst(lst):
prefix = 'Assignment'
if category:
prefix = 'For "%s" inquiries, assignment' % category
form.errors.setdefault('assignment', []) \
.append(
'%s "%s" is not allowed. Only the following '
'assignments are: %s' % (prefix, assignment, lst))
return False
return True
def test_proxies(self):
"""
Test proxies
:return: void
"""
rule = '/' + self.rand_str(20)
# Outside context
with self.assert_raises_regexp(RuntimeError, 'Working outside of request context'):
self.assert_is_none(visitor._get_current_object())
# Call route
with self.app.test_request_context(rule):
# Before pre-processing
with self.assert_raises_regexp(RuntimeError, 'Request context has not been pre-processed'):
self.assert_is_none(visitor._get_current_object())
# Pre-processing
self.app.preprocess_request()
# After pre-processing
self.assert_is_instance(visitor, LocalProxy)
self.assert_is_not_none(visitor._get_current_object())
self.assert_is_instance(visitor._get_current_object(), Visitor)
def smooch_events():
"""
the firehose of events that the webhooks subscribed to,
this will send all messages from people who visit your website
to this endmpoint, the robot then parses those messages and
does a postback of its response.
"""
# this is actually everything people say to the robot, may be good
# to send this to a log file or database
# print json.dumps(request.json)
# get the singletons
smooch_api = LocalProxy(get_smooch_api)
robot = LocalProxy(get_robot)
for message in request.json['messages']:
response = robot.query(message['text'])
if 'status' in response and response['status']['errorType'] == 'success' and 'result' in response and 'fulfillment' in response['result'] and 'speech' in response['result']['fulfillment']:
smooch_api.postback_message(response['result']['fulfillment']['speech'], message['authorId'])
data = {'message':'succeed'}
resp = Response(json.dumps(data), status=200, mimetype='application/json')
return resp
def __getattr__(self, name):
def get_proxy():
return getattr(current_app, name)
return LocalProxy(get_proxy)
def test_local_proxy(self):
foo = []
ls = local.LocalProxy(lambda: foo)
ls.append(42)
ls.append(23)
ls[1:] = [1, 2, 3]
assert foo == [42, 1, 2, 3]
assert repr(foo) == repr(ls)
assert foo[0] == 42
foo += [1]
assert list(foo) == [42, 1, 2, 3, 1]
def test_local_proxy_operations_math(self):
foo = 2
ls = local.LocalProxy(lambda: foo)
assert ls + 1 == 3
assert 1 + ls == 3
assert ls - 1 == 1
assert 1 - ls == -1
assert ls * 1 == 2
assert 1 * ls == 2
assert ls / 1 == 2
assert 1.0 / ls == 0.5
assert ls // 1.0 == 2.0
assert 1.0 // ls == 0.0
assert ls % 2 == 0
assert 2 % ls == 0
def test_local_proxy_operations_strings(self):
foo = "foo"
ls = local.LocalProxy(lambda: foo)
assert ls + "bar" == "foobar"
assert "bar" + ls == "barfoo"
assert ls * 2 == "foofoo"
foo = "foo %s"
assert ls % ("bar",) == "foo bar"
def test_local_stack(self):
ident = local.get_ident()
ls = local.LocalStack()
assert ident not in ls._local.__storage__
assert ls.top is None
ls.push(42)
assert ident in ls._local.__storage__
assert ls.top == 42
ls.push(23)
assert ls.top == 23
ls.pop()
assert ls.top == 42
ls.pop()
assert ls.top is None
assert ls.pop() is None
assert ls.pop() is None
proxy = ls()
ls.push([1, 2])
assert proxy == [1, 2]
ls.push((1, 2))
assert proxy == (1, 2)
ls.pop()
ls.pop()
assert repr(proxy) == '<LocalProxy unbound>'
assert ident not in ls._local.__storage__
def test_local_proxies_with_callables(self):
foo = 42
ls = local.LocalProxy(lambda: foo)
assert ls == 42
foo = [23]
ls.append(42)
assert ls == [23, 42]
assert foo == [23, 42]
def named_user(session, request):
if request.param == 'NOT_LOGGED_IN':
return 'NOT_LOGGED_IN'
return LocalProxy(session.query(m.User).filter_by(name=request.param).one)
def student_user(session):
return LocalProxy(session.query(m.User).filter_by(name="Stupid1").one)
def teacher_user(session):
return LocalProxy(session.query(m.User).filter_by(name="Robin").one)
def ta_user(session):
return LocalProxy(
session.query(m.User).filter_by(name="Thomas Schaper").one
)
def admin_user(session):
return LocalProxy(session.query(m.User).filter_by(name="admin").one)
def context_manager(self):
"""LocalProxy refering to the app's instance of the :class: `ContextManager`.
Interface for adding and accessing contexts and their parameters
"""
return getattr(_app_ctx_stack.top, '_assist_context_manager', ContextManager())
def get_user(cls, user):
"""Get user object:
1º check if user is already User, LocalProxy, or AnnonymousUser object
if so, just return it
2º if not, search for this user and return if found.
3º otherwise, return DoesNotExists exception"""
if any([isinstance(user, obj) for obj in [UserMixin,
AnonymousUserMixin]]):
return user
if isinstance(user, LocalProxy):
return user._get_current_object()
try:
return User.objects.get(username=user)
except (me.DoesNotExist, me.ValidationError):
return User.objects.get(id=user)
def request_loader(request: LocalProxy) -> str:
"""Load user from Flask Request object."""
user = User.query.get(int(request.form.get('id') or 0))
if not user:
return
user.is_authenticated = user.password == request.form['password']
return user
def is_valid_assignment(self, request: LocalProxy, form: Form) -> bool:
"""Check if the assignment is valid, based on settings.
:param request: The request context object.
:param form: form to check
"""
return not self.setting('assignments').enabled or \
self.allowed_assignment(request, form)
def get_current_user() -> UserClass:
"""Returns the current user as a UserClass instance.
Never returns None; returns an AnonymousUser() instance instead.
This function is intended to be used when pillar.auth.current_user is
accessed many times in the same scope. Calling this function is then
more efficient, since it doesn't have to resolve the LocalProxy for
each access to the returned object.
"""
from ..api.utils.authentication import current_user
return current_user()
def test_model(self):
"""
Test model
:return: void
"""
test_db = DatabaseManager.get_sql_alchemy_instance()
self.assert_is_instance(db, LocalProxy)
self.assert_is_instance(db._get_current_object(), SQLAlchemy)
self.assert_equal_deep(test_db, db._get_current_object())
self.assert_equal_deep(sqlalchemy_mapper, mapper)
self.assert_equal_deep(sqlalchemy_relationship, relationship)
self.assert_equal_deep(sqlalchemy_backref, backref)
def test_request(self):
"""
Test request
:return: void
"""
rule = '/' + self.rand_str(20)
# Call route
with self.app.test_request_context(rule):
self.assert_is_not_none(request)
self.assert_is_instance(request, LocalProxy)
self.assert_is_instance(request._get_current_object(), Request)
self.assert_is_instance(request._get_current_object(), FlaskRequest)
def test_globals(self):
"""
Test globals
:return: void
"""
rule = '/' + self.rand_str(20)
self.assert_equal_deep(builtin_abc, abc)
self.assert_is_instance(ABC, abc.ABCMeta)
with self.app.test_request_context(rule):
self.assert_equal_deep(flask_current_app, current_app)
self.assert_equal_deep(flask_request, request)
self.assert_equal_deep(flask_session, session)
self.assert_equal_deep(flask_g, g)
self.assert_equal_deep(flask_has_request_context, has_request_context)
self.assert_equal_deep(flask_make_response, make_response)
self.assert_equal_deep(flask_abort, abort)
self.assert_equal_deep(flask__request_ctx_stack, _request_ctx_stack)
self.assert_equal_deep(flask_render_template, render_template)
self.assert_equal_deep(flask_redirect, redirect)
self.assert_equal_deep(flask_send_from_directory, send_from_directory)
self.assert_equal_deep(flask_jsonify, jsonify)
self.assert_is_instance(visitor, LocalProxy)
def test_session_cookie(self):
"""
Test the file
"""
rule = '/' + self.rand_str(20)
key = self.rand_str(20)
value = self.rand_str(20)
secret_key = self.rand_str(24)
# Write config
self.write_config([
"from edmunds.session.drivers.sessioncookie import SessionCookie \n",
"SECRET_KEY = '%s'\n" % secret_key,
"APP = { \n",
" 'session': { \n",
" 'enabled': True, \n",
" 'instances': [ \n",
" { \n",
" 'name': 'sessioncookie',\n",
" 'driver': SessionCookie,\n",
" }, \n",
" ], \n",
" }, \n",
"} \n",
])
# Create app
app = self.create_application()
self.assert_equal(secret_key, app.secret_key)
# Test session
with app.test_request_context(rule):
driver = app.session()
self.assert_equal_deep(session, driver)
self.assert_is_instance(driver, LocalProxy)
self.assert_is_instance(driver._get_current_object(), SecureCookieSession)
self.assert_false(driver.modified)
self.assert_not_in(key, driver)
driver[key] = value
self.assert_true(driver.modified)
self.assert_in(key, driver)
driver.pop(key, None)
self.assert_true(driver.modified)
self.assert_not_in(key, driver)
def test_outside_context(self):
"""
Test outside context
:return: void
"""
secret_key = self.rand_str(24)
# Write config
self.write_config([
"from edmunds.session.drivers.sessioncookie import SessionCookie \n",
"SECRET_KEY = '%s'\n" % secret_key,
"APP = { \n",
" 'session': { \n",
" 'enabled': True, \n",
" 'instances': [ \n",
" { \n",
" 'name': 'sessioncookie',\n",
" 'driver': SessionCookie,\n",
" }, \n",
" ], \n",
" }, \n",
"} \n",
])
# Create app
app = self.create_application()
self.assert_equal(secret_key, app.secret_key)
# Test extension
self.assert_in('edmunds.session', app.extensions)
self.assert_is_not_none(app.extensions['edmunds.session'])
self.assert_is_instance(app.extensions['edmunds.session'], SessionManager)
self.assert_is_instance(app.extensions['edmunds.session'].get(), LocalProxy)
with self.assert_raises_regexp(RuntimeError, 'Working outside of request context'):
self.assert_is_instance(app.extensions['edmunds.session'].get()._get_current_object(), SecureCookieSession)
self.assert_is_instance(app.extensions['edmunds.session'].get('sessioncookie'), LocalProxy)
with self.assert_raises_regexp(RuntimeError, 'Working outside of request context'):
self.assert_is_instance(app.extensions['edmunds.session'].get('sessioncookie')._get_current_object(), SecureCookieSession)
with self.assert_raises_regexp(RuntimeError, '[Nn]o instance'):
app.extensions['edmunds.session'].get('sessioncookie2')
def test_register(self):
"""
Test register
:return: void
"""
rule = '/' + self.rand_str(20)
secret_key = self.rand_str(24)
# Write config
self.write_config([
"from edmunds.session.drivers.sessioncookie import SessionCookie \n",
"SECRET_KEY = '%s'\n" % secret_key,
"APP = { \n",
" 'session': { \n",
" 'enabled': True, \n",
" 'instances': [ \n",
" { \n",
" 'name': 'sessioncookie',\n",
" 'driver': SessionCookie,\n",
" }, \n",
" ], \n",
" }, \n",
"} \n",
])
# Create app
app = self.create_application()
self.assert_equal(secret_key, app.secret_key)
# Test extension
self.assert_in('edmunds.session', app.extensions)
self.assert_is_not_none(app.extensions['edmunds.session'])
self.assert_is_instance(app.extensions['edmunds.session'], SessionManager)
# Test session
with app.test_request_context(rule):
self.assert_is_instance(app.extensions['edmunds.session'].get(), LocalProxy)
self.assert_is_instance(app.extensions['edmunds.session'].get()._get_current_object(), SecureCookieSession)
self.assert_is_instance(app.extensions['edmunds.session'].get('sessioncookie'), LocalProxy)
self.assert_is_instance(app.extensions['edmunds.session'].get('sessioncookie')._get_current_object(), SecureCookieSession)
with self.assert_raises_regexp(RuntimeError, '[Nn]o instance'):
app.extensions['edmunds.session'].get('sessioncookie2')