Junos.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:assimilator 作者: videlanicolas 项目源码 文件源码
def get(self,args):
        """Fetch the firewall policies from the device and return them as rules.

        Calls the ``get_firewall_policies`` RPC on ``self.dev``, parses the XML
        reply and flattens every policy of every zone context (except the
        ``default-policy`` element) into a dictionary.

        Args:
            args: Request arguments. Currently only logged; filtering of the
                result by ``args`` is disabled (see the note before the
                return statement).

        Returns:
            On success, a dict ``{'len': <rule count>, 'rules': [<rule>, ...]}``
            where each rule has the keys ``enabled``, ``id``, ``action``,
            ``destination``, ``from``, ``logging``, ``name``, ``application``,
            ``source`` and ``to``.
            On failure, a tuple ``({'error': <message>}, <status code>)`` with
            status 504 when the device is not connected and 500 when the RPC
            call or the parsing of its reply raises.
        """
        logger.debug("class rules(JUNOS).get({0})".format(str(args)))
        if not self.dev.connected:
            logger.error("{0}: Firewall timed out or incorrect device credentials.".format(self.firewall_config['name']))
            return {'error' : 'Could not connect to device.'}, 504
        logger.info("{0}: Connected successfully.".format(self.firewall_config['name']))
        try:
            soup = BS(str(etree.tostring(self.dev.rpc.get_firewall_policies(), encoding='unicode')),'xml')
            logger.debug("soup: " + str(soup))
        except Exception as e:
            logger.error("Error parsing soup: {0}".format(str(e)))
            return {'error' : 'Error parsing soup.'}, 500
        finally:
            # The device connection is released whether or not the RPC worked.
            logger.debug("Closing device...")
            self.dev.close()
        entries = list()
        policies = soup.find("security-policies")
        if policies is None:
            # A reply without <security-policies> carries no rules; report an
            # empty result instead of failing on a None dereference.
            logger.warning("No security-policies element found in device reply.")
            return {'len' : len(entries), 'rules' : entries}
        for context in policies.children:
            # Skip whitespace/text nodes and the default-policy element.
            if not isinstance(context, Tag) or context.name == "default-policy":
                continue
            logger.debug("context: {0}".format(str(context)))
            context_info = context.find("context-information")
            src_zone = context_info.find("source-zone-name").text
            dst_zone = context_info.find("destination-zone-name").text
            logger.debug("src_zone: {0}\ndst_zone: {1}\n".format(src_zone,dst_zone))
            for rule in context.children:
                logger.debug("Rule: {0}".format(str(rule)))
                # Everything in the context except context-information is
                # treated as a policy; text nodes are ignored.
                if not isinstance(rule, Tag) or rule.name == "context-information":
                    continue
                policy_info = rule.find('policy-information')
                policy_action = policy_info.find('policy-action')
                aux = {
                    "enabled": rule.find('policy-state').text == 'enabled',
                    "id": int(rule.find('policy-identifier').text),
                    "action": policy_action.find('action-type').text,
                    "destination": [
                        addr.find('address-name').text
                        for addr in rule.find('destination-addresses').children
                        if isinstance(addr, Tag)
                    ],
                    "from": src_zone,
                    # True when the policy action contains a <log> element.
                    # The original expression was inverted and could only
                    # yield False or None.
                    "logging": policy_action.find('log') is not None,
                    "name": policy_info.find('policy-name').text,
                    "application": [
                        app.find('application-name').text
                        for app in rule.find('applications').children
                        if isinstance(app, Tag)
                    ],
                    "source": [
                        addr.find('address-name').text
                        for addr in rule.find('source-addresses').children
                        if isinstance(addr, Tag)
                    ],
                    "to": dst_zone,
                }
                entries.append(aux)
        # NOTE: filtering by args is currently disabled:
        #entries = self.filter(args,entries)
        return {'len' : len(entries), 'rules' : entries}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号