core.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:ussd_airflow 作者: mwaaas 项目源码 文件源码
def get_or_create_session_id(self, user_id):
        session_mapping = get_object_or_None(SessionLookup, user_id=user_id)

        # if its missing create a new one.
        if session_mapping is None:
            session_mapping = SessionLookup.objects.create(
                user_id=user_id,
                session_id=generate_session_id()
            )
        else:
            session = ussd_session(session_mapping.session_id)

            # get last time session was updated
            if session.get(ussd_airflow_variables.last_update):
                last_updated = utilities.string_to_datetime(
                    session[ussd_airflow_variables.last_update])
            else:
                last_updated = timezone.make_naive(session_mapping.updated_at)

            # check inactivity or if session has been closed
            inactivity_duration = (datetime.now() - last_updated).total_seconds()
            if inactivity_duration > self.expiry or \
                    session.get(ussd_airflow_variables.expiry):

                # update session_mapping with the new session_id
                session_mapping.session_id = generate_session_id()
                session_mapping.save()

        return session_mapping.session_id
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号