def get_or_create_session_id(self, user_id):
session_mapping = get_object_or_None(SessionLookup, user_id=user_id)
# if its missing create a new one.
if session_mapping is None:
session_mapping = SessionLookup.objects.create(
user_id=user_id,
session_id=generate_session_id()
)
else:
session = ussd_session(session_mapping.session_id)
# get last time session was updated
if session.get(ussd_airflow_variables.last_update):
last_updated = utilities.string_to_datetime(
session[ussd_airflow_variables.last_update])
else:
last_updated = timezone.make_naive(session_mapping.updated_at)
# check inactivity or if session has been closed
inactivity_duration = (datetime.now() - last_updated).total_seconds()
if inactivity_duration > self.expiry or \
session.get(ussd_airflow_variables.expiry):
# update session_mapping with the new session_id
session_mapping.session_id = generate_session_id()
session_mapping.save()
return session_mapping.session_id
评论列表
文章目录