tasks.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:sensu_drive 作者: ilavender 项目源码 文件源码
def user_rules(message):
    """Rebuild the cached per-status regex rules for a single user.

    Loads every ``Rule`` whose owner matches ``message['user_id']``, groups
    the rules' ``regex_string`` values by integer status, and for each status
    stores one combined, case-insensitive compiled pattern (pickled) under the
    key ``regexrule_<user_id>_<status>`` via ``r``.  For statuses 1 and 2 that
    have no rules, the corresponding key is deleted.

    Args:
        message: mapping that must contain the key ``'user_id'``.

    Raises:
        Exception: whatever the rule query or grouping raises is logged and
            re-raised unchanged.  ``re.error`` propagates if the combined
            pattern does not compile.
    """
    users_rules = {}
    try:
        for obj in Rule.objects.filter(owner=message['user_id']):
            by_status = users_rules.setdefault(obj.owner.id, {})
            for status in obj.status:
                by_status.setdefault(int(status), []).append(obj.regex_string)
    except Exception:
        # logger.exception keeps the traceback alongside the original message.
        logger.exception("user_rules failed")
        raise

    if not users_rules:
        # The user owns no rules at all: still run the cleanup pass below so
        # that keys cached by an earlier call do not linger.
        # NOTE(review): assumes message['user_id'] formats the same way as
        # Rule.owner.id in the key string -- confirm against the caller.
        users_rules[message['user_id']] = {}

    for user_id, by_status in users_rules.items():
        for status, patterns in by_status.items():
            uni_regex = '|'.join(patterns)
            logger.debug("user_rules build rule for user: %s status: %s", user_id, status)
            r.set(
                'regexrule_%s_%s' % (user_id, status),
                pickle.dumps(re.compile(r'(?:%s)' % uni_regex, re.IGNORECASE)),
            )
        # Only statuses 1 and 2 are ever cleaned up here.
        for status in (1, 2):
            if status not in by_status:
                logger.debug("user_rules clean rule for user: %s status: %s", user_id, status)
                r.delete('regexrule_%s_%s' % (user_id, status))
    return
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号