def error_mail(subject, data, r, via_web=True):
    """Mail a report describing a remote request and the reply it received.

    Parameters
    ----------
    subject : str
        Subject line handed to ``send_mail``.
    data : object
        The request payload; interpolated verbatim into the mail body.
    r : object
        The response. Its ``url``, ``status_code``, ``headers`` and ``text``
        attributes are read (presumably a ``requests.Response`` -- confirm
        against callers). ``headers`` must offer a dict-style ``get``.
    via_web : bool
        When true *and* a Flask request context is active, the current site
        URL and user name are prepended to the body.
    """
    # A reply without a Content-Type header must not turn the error report
    # itself into a KeyError, so the header is looked up tolerantly instead
    # of through the ``{r.headers[content-type]}`` format field.
    content_type = r.headers.get('content-type', '(not set)')

    # NOTE(review): "status code" appears twice in this template; both lines
    # are kept so the layout of the mail stays exactly as before.
    body = (
        '\n'
        'remote URL: {r.url}\n'
        'status code: {r.status_code}\n'
        'request data:\n'
        '{data}\n'
        'status code: {r.status_code}\n'
        'content-type: {content_type}\n'
        'reply:\n'
        '{r.text}\n'
    ).format(r=r, data=data, content_type=content_type)

    # Request details are only available while a request is being handled.
    if via_web and has_request_context():
        user = get_username()
        body = 'site URL: {}\nuser: {}\n'.format(request.url, user) + body

    send_mail(subject, body)
python类has_request_context()的实例源码
def make_command(command):
    """Create a command from a method signature.

    The returned wrapper calls ``command``, packs its result and emits it
    under a signal built from the instance's ``_uuid``, ``SEPARATOR`` and
    the method name with its first three characters removed. The wrapper
    itself returns nothing.
    """
    # pylint: disable=missing-docstring
    @wraps(command)
    def actualcommand(self, *args, **kwds):
        result = command(self, *args, **kwds)
        # Drop the first three characters of the method name
        # (presumably a ``do_`` prefix -- confirm against the callers).
        event_name = command.__name__[3:]
        # pylint: disable=protected-access
        signal = '{uuid}{sep}{event}'.format(
            uuid=self._uuid, sep=SEPARATOR, event=event_name)
        # Inside a request the context-bound ``emit`` is used; otherwise
        # fall back to the socketio extension registered on the app.
        if flask.has_request_context():
            send = emit
        else:
            send = flask.current_app.extensions['socketio'].emit
        send(signal, {'data': pack(result)})
        # Yield control to other green threads.
        eventlet.sleep()

    return actualcommand
def make_getter(getter):
    """Create a getter from a method signature.

    The returned ``get(self, timeout=10)`` emits a signal named after
    ``getter``, waits up to ``timeout`` for the callback to deliver the
    unpacked reply, and returns ``getter(self, reply)``.
    """
    # pylint: disable=missing-docstring
    def get(self, timeout=10):
        # pylint: disable=protected-access
        signal = '{uuid}{sep}{event}'.format(
            uuid=self._uuid, sep=SEPARATOR, event=getter.__name__)
        # Single-slot queue used to hand the reply back to this caller.
        mailbox = LightQueue(1)

        def deliver(raw):
            mailbox.put(unpack(raw))

        if flask.has_request_context():
            emit(signal, callback=deliver)
        else:
            socketio = flask.current_app.extensions['socketio']
            socketio.emit(signal, callback=deliver)
        reply = mailbox.get(timeout=timeout)
        return getter(self, reply)

    # don't want to copy the signature in this case
    get.__doc__ = getter.__doc__
    return get
def save(key, value):
    """Store the key value pair.

    Parameters
    ----------
    key : str
        Identifies where the value is stored; needed again to load it.
    value : object
        The value to put in the cache.

    Returns
    -------
    None

    """
    # Use the context-bound ``emit`` while handling a request, otherwise
    # the socketio extension registered on the current app.
    if flask.has_request_context():
        send = emit
    else:
        send = flask.current_app.extensions['socketio'].emit
    send('cache_save', {'key': pack(key), 'data': pack(value)})
    # Yield control to other green threads.
    eventlet.sleep()
def load(key):
    """Load the value stored with the key.

    Parameters
    ----------
    key : str
        The key to lookup the value stored.

    Returns
    -------
    object
        The value if the key exists in the cache, otherwise None.

    Notes
    -----
    Waits at most 10 seconds for the reply; what is raised on timeout is
    decided by ``LightQueue.get`` (presumably eventlet's ``Empty`` --
    confirm).

    """
    signal = 'cache_load'
    # Single-slot queue that the emit callback fills with the raw reply.
    event = LightQueue(1)
    if flask.has_request_context():
        emit(signal, {'data': pack(key)}, callback=event.put)
    else:
        sio = flask.current_app.extensions['socketio']
        sio.emit(signal, {'data': pack(key)}, callback=event.put)
    # ``encoding=`` was removed from msgpack in 1.0; ``raw=False`` is the
    # supported way to get UTF-8 decoded strings back.
    return msgpack.unpackb(bytes(event.get(timeout=10)), raw=False)
def _message(status, content):
    """Send message interface.

    Parameters
    ----------
    status : str
        The type of message; appended to ``message.`` to form the event.
    content : str
        The message content; packed and sent under the ``data`` key.

    """
    event = 'message.{}'.format(status)
    # Use the context-bound ``emit`` while handling a request, otherwise
    # the socketio extension registered on the current app.
    if flask.has_request_context():
        send = emit
    else:
        send = flask.current_app.extensions['socketio'].emit
    send(event, {'data': pack(content)})
    # Yield control to other green threads.
    eventlet.sleep()
def get_user_session():
    """Return the user session for the current request.

    On first use within a request the session is looked up (header first,
    cookie as fallback), discarded when its user is not enabled, and
    stored on the request context. When nothing is stored, or the stored
    value is ``None``, a fresh ``AnonymousSession`` is returned.
    """
    needs_lookup = (
        has_request_context()
        and not hasattr(_request_ctx_stack.top, 'user_session')
    )
    if needs_lookup:
        # Look in the header, falling back to the cookie
        resolved = get_user_session_from_header()
        if resolved is None:
            resolved = get_user_session_from_cookie()
        # A session whose user is not enabled counts as no session
        if resolved is not None and not resolved.user.is_enabled:
            resolved = None
        # Remember the outcome (even None) on the request context
        _request_ctx_stack.top.user_session = resolved

    stored = getattr(_request_ctx_stack.top, 'user_session', None)
    return AnonymousSession() if stored is None else stored
def get_request_information():
    """
    Collects contextual information about the current request and user for logging.

    Returns:
        A dictionary; empty when no request context is active, otherwise
        holding a "request" entry and, for logged-in users, a "user" entry.
    """
    if not has_request_context():
        return {}

    agent = request.user_agent
    information = {
        "request": {
            "api_endpoint_method": request.method,
            "api_endpoint": request.path,
            "ip": request.remote_addr,
            "platform": agent.platform,
            "browser": agent.browser,
            "browser_version": agent.version,
            "user_agent": agent.string,
        }
    }

    if api.auth.is_logged_in():
        user = api.user.get_user()
        team = api.user.get_team()
        group_names = [group["name"] for group in api.team.get_groups()]
        information["user"] = {
            "username": user["username"],
            "email": user["email"],
            "team_name": team["team_name"],
            "groups": group_names,
        }

    return information
def test_context_test(self):
    """Check request-context detection before and while a test context is pushed."""
    application = flask.Flask(__name__)

    # Before anything is pushed both probes are expected to be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the context pushed both probes are expected to be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        # Pop even when an assertion fails so the context does not linger.
        request_ctx.pop()
def test_context_test(self):
    """Check request-context detection before and while a test context is pushed."""
    application = flask.Flask(__name__)

    # Before anything is pushed both probes are expected to be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the context pushed both probes are expected to be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        # Pop even when an assertion fails so the context does not linger.
        request_ctx.pop()
def get_request_information():
    """
    Collects contextual information about the current request and user for logging.

    Returns:
        A dictionary; empty when no request context is active, otherwise
        holding a "request" entry and, for logged-in users, a "user" entry.
    """
    if not has_request_context():
        return {}

    agent = request.user_agent
    information = {
        "request": {
            "api_endpoint_method": request.method,
            "api_endpoint": request.path,
            "ip": request.remote_addr,
            "platform": agent.platform,
            "browser": agent.browser,
            "browser_version": agent.version,
            "user_agent": agent.string,
        }
    }

    if api.auth.is_logged_in():
        user = api.user.get_user()
        team = api.user.get_team()
        group_names = [group["name"] for group in api.team.get_groups()]
        information["user"] = {
            "username": user["username"],
            "email": user["email"],
            "team_name": team["team_name"],
            "groups": group_names,
        }

    return information
def get_request_information():
    """
    Collects contextual information about the current request and user for logging.

    Returns:
        A dictionary; empty when no request context is active, otherwise
        holding a "request" entry and, for logged-in users, a "user" entry
        (which here also carries the team's school).
    """
    if not has_request_context():
        return {}

    agent = request.user_agent
    information = {
        "request": {
            "api_endpoint_method": request.method,
            "api_endpoint": request.path,
            "ip": request.remote_addr,
            "platform": agent.platform,
            "browser": agent.browser,
            "browser_version": agent.version,
            "user_agent": agent.string,
        }
    }

    if api.auth.is_logged_in():
        user = api.user.get_user()
        team = api.user.get_team()
        group_names = [group["name"] for group in api.team.get_groups()]
        information["user"] = {
            "username": user["username"],
            "email": user["email"],
            "team_name": team["team_name"],
            "school": team["school"],
            "groups": group_names,
        }

    return information
def test_context_test(self):
    """Check request-context detection before and while a test context is pushed."""
    application = flask.Flask(__name__)

    # Before anything is pushed both probes are expected to be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the context pushed both probes are expected to be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        # Pop even when an assertion fails so the context does not linger.
        request_ctx.pop()
def test_context_test(self):
    """Check request-context detection before and while a test context is pushed."""
    application = flask.Flask(__name__)

    # Before anything is pushed both probes are expected to be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the context pushed both probes are expected to be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        # Pop even when an assertion fails so the context does not linger.
        request_ctx.pop()
def test_context_test(self):
    """Check request-context detection before and while a test context is pushed."""
    application = flask.Flask(__name__)

    # Before anything is pushed both probes are expected to be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the context pushed both probes are expected to be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        # Pop even when an assertion fails so the context does not linger.
        request_ctx.pop()
def test_context_test(self):
    """Check request-context detection before and while a test context is pushed."""
    application = flask.Flask(__name__)

    # Before anything is pushed both probes are expected to be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the context pushed both probes are expected to be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        # Pop even when an assertion fails so the context does not linger.
        request_ctx.pop()
def message(self):
    """Return the message describing this validation error.

    For legacy errors handled under API v1 the raw ``details`` dict is
    returned. Otherwise an explicit ``error_message`` wins, then a
    JSON rendering of ``details``, then a generic fallback text.
    """
    v1_legacy = (
        self.legacy
        and has_request_context()
        and g.get('api_version')
        and g.api_version == API_VERSIONS.v1
    )
    if v1_legacy:
        # legacy for v1: return dict in message
        return self.details

    # raise ValidateError('error message')
    if getattr(self, 'error_message', None) is not None:
        return self.error_message

    # raise ValidateError()
    if not self.details:
        return u'Invalid data'

    # raise ValidateError(details={'smth': 'wrong'})
    rendered = json.dumps(self.details, ensure_ascii=False)
    return u'Invalid data: {0}'.format(rendered)
def test_context_test(self):
    """Check request-context detection before and while a test context is pushed."""
    application = flask.Flask(__name__)

    # Before anything is pushed both probes are expected to be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the context pushed both probes are expected to be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        # Pop even when an assertion fails so the context does not linger.
        request_ctx.pop()
def test_context_test(self):
    """Check request-context detection before and while a test context is pushed."""
    application = flask.Flask(__name__)

    # Before anything is pushed both probes are expected to be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the context pushed both probes are expected to be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        # Pop even when an assertion fails so the context does not linger.
        request_ctx.pop()
def test_context_test(self):
    """Check request-context detection before and while a test context is pushed."""
    application = flask.Flask(__name__)

    # Before anything is pushed both probes are expected to be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the context pushed both probes are expected to be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        # Pop even when an assertion fails so the context does not linger.
        request_ctx.pop()
def test_context_test(self):
    """Check request-context detection before and while a test context is pushed."""
    application = flask.Flask(__name__)

    # Before anything is pushed both probes are expected to be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the context pushed both probes are expected to be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        # Pop even when an assertion fails so the context does not linger.
        request_ctx.pop()
def execute(self, response):
    """Generates fixture objects from the given response and stores them
    in the application-specific cache.

    A response fixture is always created; a request fixture is added only
    when the current request carries a body. A ``TypeError`` raised while
    building fixtures is downgraded to a warning.

    :param response: the recorded :class:`Response`
    :returns: the ``response`` that was passed in, unchanged
    """
    # ``has_request_context`` has to be *called*: the bare function object
    # is always truthy, so the previous guard could never trigger. Without
    # a request there is nothing to record, so hand the response back.
    if not has_request_context():
        return response

    self._fallback_fixture_names()

    try:
        app = self.auto_fixture.app

        # Create response fixture
        fixture = Fixture.from_response(response, app, self.response_name)
        self.auto_fixture.add_fixture(fixture)

        # Create request fixture
        if request.data:
            fixture = Fixture.from_request(request, app, self.request_name)
            self.auto_fixture.add_fixture(fixture)
    except TypeError:  # pragma: no cover
        warnings.warn("Could not create fixture for unsupported mime type")

    return response
def test_context_test(self):
    """Check request-context detection before and while a test context is pushed."""
    application = flask.Flask(__name__)

    # Before anything is pushed both probes are expected to be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the context pushed both probes are expected to be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        # Pop even when an assertion fails so the context does not linger.
        request_ctx.pop()
def test_context_test(self):
    """Check request-context detection before and while a test context is pushed."""
    application = flask.Flask(__name__)

    # Before anything is pushed both probes are expected to be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the context pushed both probes are expected to be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        # Pop even when an assertion fails so the context does not linger.
        request_ctx.pop()
def test_context_test(self):
    """Check request-context detection before and while a test context is pushed."""
    application = flask.Flask(__name__)

    # Before anything is pushed both probes are expected to be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the context pushed both probes are expected to be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        # Pop even when an assertion fails so the context does not linger.
        request_ctx.pop()
def test_context_test(self):
    """Check request-context detection before and while a test context is pushed."""
    application = flask.Flask(__name__)

    # Before anything is pushed both probes are expected to be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the context pushed both probes are expected to be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        # Pop even when an assertion fails so the context does not linger.
        request_ctx.pop()
def test_context_test(self):
    """Check request-context detection before and while a test context is pushed."""
    application = flask.Flask(__name__)

    # Before anything is pushed both probes are expected to be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the context pushed both probes are expected to be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        # Pop even when an assertion fails so the context does not linger.
        request_ctx.pop()
def test_context_test(self):
    """Check request-context detection before and while a test context is pushed."""
    application = flask.Flask(__name__)

    # Before anything is pushed both probes are expected to be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the context pushed both probes are expected to be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        # Pop even when an assertion fails so the context does not linger.
        request_ctx.pop()
def login_user(cls, email, password):
    """Return the user matching ``email`` and ``password``, or ``None``.

    ``None`` is returned when the lookup raises ``InvalidRequestError``
    or when the password does not reproduce the stored hash. On success
    inside a request, the client's address is recorded as
    ``last_login_ip`` and committed.
    """
    try:
        account = cls.query.filter_by(email=email).one()
    except exc.InvalidRequestError:
        return None

    # Re-hash the candidate with the stored hash as salt and compare.
    # NOTE(review): plain ``==`` is not a constant-time comparison.
    password_ok = pbkdf2.crypt(password, account.pwhash) == account.pwhash
    if not password_ok:
        return None

    if flask.has_request_context():
        account.last_login_ip = flask.request.remote_addr
        db.session.commit()
    return account
def create(cls, email, nick, password, team=None):
    """Create a user, add it to the database session and return it.

    When no user exists yet, the new one is promoted and ``team`` is
    ignored; otherwise the user is assigned to ``team``. Inside a request
    the client's address is stored as ``create_ip``. Nothing is committed
    here.
    """
    # Count before adding so the new row cannot influence the check.
    is_first = not cls.query.count()

    account = cls()
    db.session.add(account)
    account.email = email
    account.nick = nick
    account.set_password(password)

    if is_first:
        account.promote()
    else:
        account.team = team

    if flask.has_request_context():
        account.create_ip = flask.request.remote_addr
    return account