def save_session(self, app, session, response):
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
if not session:
if session.modified:
self.cache.delete(self.key_prefix + session.sid)
response.delete_cookie(app.session_cookie_name,
domain=domain, path=path)
return
httponly = self.get_cookie_httponly(app)
secure = self.get_cookie_secure(app)
expires = self.get_expiration_time(app, session)
data = dict(session)
self.cache.set(self.key_prefix + session.sid, data,
total_seconds(app.permanent_session_lifetime))
if self.use_signer:
session_id = self._get_signer(app).sign(want_bytes(session.sid))
else:
session_id = session.sid
response.set_cookie(app.session_cookie_name, session_id,
expires=expires, httponly=httponly,
domain=domain, path=path, secure=secure)
python类want_bytes()的实例源码
def open_session(self, app, request):
sid = request.cookies.get(app.session_cookie_name)
# create a new session if the request does not include a token
if not sid:
return self.session_class(sid=self._generate_sid())
# attempt to retrieve the session associated with the given token
store_id = self.key_prefix + sid
saved_session = self.sql_session_model.query.filter_by(session_id=store_id).first()
# create a new session if the token doesn't represent a valid session
if not saved_session:
return self.session_class(sid=self._generate_sid())
# create a new session if the session has expired
elif saved_session.expiry <= datetime.utcnow():
# purge the expired session
self.db.session.delete(saved_session)
self.db.session.commit()
return self.session_class(sid=self._generate_sid())
# handle valid sessions
else:
try:
val = saved_session.data
data = self.serializer.loads(want_bytes(val))
return self.session_class(data, sid=sid)
except:
return self.session_class(sid=sid)
def file_upload():
"""
POST /file
Uploads the file to the server.
"""
try:
mimetype = request.form["mimetype"]
except KeyError:
return JsonResponse({"error": "no mimetype"}, 400)
try:
file = request.files["file"]
except KeyError:
return JsonResponse({"error": "no file"}, 400)
filename = werkzeug.utils.secure_filename(file.filename)
with tempfile.NamedTemporaryFile(
dir=app.config['MEDIA_DIR'], delete=False) as tf:
file.save(tf)
file_record = models.File(
title=filename,
mimetype=mimetype,
path=tf.name
)
with start_session() as session:
session.add(file_record)
session.commit()
file_id = file_record.id
return JsonResponse({
"id": file_id,
"signedId":
signer.sign(itsdangerous.want_bytes(file_id)).decode('utf-8'),
"title": filename,
"mimetype": mimetype
}, status=203)
def create_signed_key(user):
"""Generate a valid auth token for the user, and sign it."""
key = core.create_session_key(user)
signer = get_signer()
return signer.sign(want_bytes(key)).decode('ascii')
def save_session(self, app, session, response):
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
if not session:
if session.modified:
self.redis.delete(self.key_prefix + session.sid)
response.delete_cookie(app.session_cookie_name,
domain=domain, path=path)
return
# Modification case. There are upsides and downsides to
# emitting a set-cookie header each request. The behavior
# is controlled by the :meth:`should_set_cookie` method
# which performs a quick check to figure out if the cookie
# should be set or not. This is controlled by the
# SESSION_REFRESH_EACH_REQUEST config flag as well as
# the permanent flag on the session itself.
# if not self.should_set_cookie(app, session):
# return
httponly = self.get_cookie_httponly(app)
secure = self.get_cookie_secure(app)
expires = self.get_expiration_time(app, session)
val = self.serializer.dumps(dict(session))
self.redis.setex(name=self.key_prefix + session.sid, value=val,
time=total_seconds(app.permanent_session_lifetime))
if self.use_signer:
session_id = self._get_signer(app).sign(want_bytes(session.sid))
else:
session_id = session.sid
response.set_cookie(app.session_cookie_name, session_id,
expires=expires, httponly=httponly,
domain=domain, path=path, secure=secure)
def open_session(self, app, request):
sid = request.cookies.get(app.session_cookie_name)
if not sid:
sid = self._generate_sid()
return self.session_class(sid=sid, permanent=self.permanent)
if self.use_signer:
signer = self._get_signer(app)
if signer is None:
return None
try:
sid_as_bytes = signer.unsign(sid)
sid = sid_as_bytes.decode()
except BadSignature:
sid = self._generate_sid()
return self.session_class(sid=sid, permanent=self.permanent)
full_session_key = self._encode_key(self.key_prefix) + self._encode_key(sid)
val = self.client.get(full_session_key)
if val is not None:
try:
if not PY2:
val = want_bytes(val)
data = self.serializer.loads(val)
return self.session_class(data, sid=sid)
except:
return self.session_class(sid=sid, permanent=self.permanent)
return self.session_class(sid=sid, permanent=self.permanent)
def save_session(self, app, session, response):
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
full_session_key = self._encode_key(self.key_prefix) + self._encode_key(session.sid)
if not session:
if session.modified:
self.client.delete(full_session_key)
response.delete_cookie(app.session_cookie_name,
domain=domain, path=path)
return
httponly = self.get_cookie_httponly(app)
secure = self.get_cookie_secure(app)
expires = self.get_expiration_time(app, session)
if not PY2:
val = self.serializer.dumps(dict(session))
else:
val = self.serializer.dumps(dict(session))
self.client.set(full_session_key, val, self._get_memcache_timeout(
total_seconds(app.permanent_session_lifetime)))
if self.use_signer:
session_id = self._get_signer(app).sign(want_bytes(session.sid))
else:
session_id = session.sid
response.set_cookie(app.session_cookie_name, session_id,
expires=expires, httponly=httponly,
domain=domain, path=path, secure=secure)
def open_session(self, app, request):
sid = request.cookies.get(app.session_cookie_name)
if not sid:
sid = self._generate_sid()
return self.session_class(sid=sid, permanent=self.permanent)
if self.use_signer:
signer = self._get_signer(app)
if signer is None:
return None
try:
sid_as_bytes = signer.unsign(sid)
sid = sid_as_bytes.decode()
except BadSignature:
sid = self._generate_sid()
return self.session_class(sid=sid, permanent=self.permanent)
store_id = self.key_prefix + sid
document = self.store.find_one({'id': store_id})
if document:
expiration = document.get('expiration')
if expiration and expiration <= datetime.utcnow():
# Delete expired session
self.store.remove({'id': store_id})
document = None
if document is not None:
try:
val = document['val']
data = self.serializer.loads(want_bytes(val))
return self.session_class(data, sid=sid)
except:
return self.session_class(sid=sid, permanent=self.permanent)
return self.session_class(sid=sid, permanent=self.permanent)
def save_session(self, app, session, response):
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
store_id = self.key_prefix + session.sid
saved_session = self.sql_session_model.query.filter_by(
session_id=store_id).first()
if not session:
if session.modified:
if saved_session:
self.db.session.delete(saved_session)
self.db.session.commit()
response.delete_cookie(app.session_cookie_name,
domain=domain, path=path)
return
httponly = self.get_cookie_httponly(app)
secure = self.get_cookie_secure(app)
expires = self.get_expiration_time(app, session)
val = self.serializer.dumps(dict(session))
if saved_session:
saved_session.data = val
saved_session.expiry = expires
self.db.session.commit()
else:
new_session = self.sql_session_model(store_id, val, expires)
self.db.session.add(new_session)
self.db.session.commit()
if self.use_signer:
session_id = self._get_signer(app).sign(want_bytes(session.sid))
else:
session_id = session.sid
response.set_cookie(app.session_cookie_name, session_id,
expires=expires, httponly=httponly,
domain=domain, path=path, secure=secure)
def save_session(self, app, session, response):
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
if not session:
if session.modified:
self.client.delete_item(
TableName=self.table_name,
Key={
'SessionId': {
'S': self.key_prefix + session.sid
}
}
)
response.delete_cookie(app.session_cookie_name,
domain=domain, path=path)
return
httponly = self.get_cookie_httponly(app)
secure = self.get_cookie_secure(app)
expires = self.get_expiration_time(app, session)
val = self.serializer.dumps(dict(session))
self.client.put_item(
TableName=self.table_name,
Item={
'SessionId': {
'S': self.key_prefix + session.sid
},
'Session': {
'S': val
}
}
)
if self.use_signer:
session_id = self._get_signer(app).sign(want_bytes(session.sid))
else:
session_id = session.sid
response.set_cookie(app.session_cookie_name, session_id,
expires=expires, httponly=httponly,
domain=domain, path=path, secure=secure)
def open_session(self, app, request):
sid = request.cookies.get(app.session_cookie_name)
if not sid:
sid = self._generate_sid()
return self.session_class(sid=sid, permanent=True)
signer = self._get_signer(app)
if signer is None:
return None
try:
sid_as_bytes = signer.unsign(sid)
sid = sid_as_bytes.decode()
except BadSignature:
sid = self._generate_sid()
return self.session_class(sid=sid, permanent=True)
db_session = SessionManager.Session()
saved_session = db_session.query(ServerSession).filter(ServerSession.session_id == sid).first()
if saved_session and not saved_session.expiry and saved_session.expiry <= datetime.utcnow():
# delete expired session
db_session.delete(saved_session)
db_session.commit()
saved_session = None
if saved_session:
try:
val = saved_session.data
data = self.serializer.loads(want_bytes(val))
return self.session_class(data, sid=sid)
except:
return self.session_class(sid=sid, permanent=True)
return self.session_class(sid=sid, permanent=True)
def save_session(self, app, session, response):
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
sid = session.sid
db_session = SessionManager.Session()
saved_session = db_session.query(ServerSession).filter(ServerSession.session_id == sid).first()
if not session:
if session.modified:
if saved_session:
db_session.delete(saved_session)
db_session.commit()
response.delete_cookie(app.session_cookie.name, domain=domain, path=path)
return
httponly = self.get_cookie_httponly(app)
secure = self.get_cookie_secure(app)
expires = self.get_expiration_time(app, session)
# expires = datetime.utcnow() + timedelta(minutes=1)
# if expires is None:
# expires = datetime.utcnow() + timedelta(days=1)
val = self.serializer.dumps(dict(session))
if saved_session:
saved_session.data = val
saved_session.expiry = expires
db_session.commit()
else:
new_session = ServerSession(session_id=sid, data=val, expiry=expires)
db_session.add(new_session)
db_session.commit()
session_id = self._get_signer(app).sign(want_bytes(session.sid))
response.set_cookie(app.session_cookie_name,
session_id,
expires=expires,
httponly=httponly,
domain=domain,
path=path,
secure=secure)