python类IPNetwork()的实例源码

dos_xml_parser.py 文件源码 项目:Docker-XX-Net 作者: kuanghy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _ValidateEntry(self, entry):
    """Validate the subnet of a single entry.

    Returns an error message when the subnet is missing, cannot be parsed
    by ``ipaddr.IPNetwork``, or carries a non-numeric prefix length after
    the slash. Returns None when no problem is found.
    """
    subnet = entry.subnet
    if not subnet:
        return MISSING_SUBNET

    try:
        ipaddr.IPNetwork(subnet)
    except ValueError:
        return BAD_IPV_SUBNET % subnet

    # Only an "address/prefix" form (exactly one slash) gets the extra
    # digits-only check on the prefix part.
    pieces = subnet.split('/')
    if len(pieces) != 2:
        return None
    if re.match('^[0-9]+$', pieces[1]):
        return None
    return BAD_PREFIX_LENGTH % subnet
ipaddresses.py 文件源码 项目:arouteserver 作者: pierky 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, ip):
    """Wrap ``ip`` in a network object from whichever IP library is active.

    The module-level ``ip_library`` value selects between ``ipaddr`` and
    ``ipaddress``. The wrapped object is kept in ``self.obj`` and its
    address, prefix length, maximum prefix length and IP version are
    copied onto the instance.
    """
    if ip_library != "ipaddr":
        self.obj = network = ipaddress.ip_network(ip)
        address = network.network_address
    else:
        self.obj = network = ipaddr.IPNetwork(ip)
        address = network.ip
    self.ip = str(address)

    self.prefixlen = network.prefixlen
    self.max_prefixlen = network.max_prefixlen
    self.version = network.version
http.py 文件源码 项目:Sentry 作者: NetEaseGame 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def is_valid_url(url):
    """
    Return False when ``url`` has no hostname, does not resolve, or
    resolves into one of the DISALLOWED_IPS ranges; True otherwise.

    A hostname (or resolved address) equal to the server's own hostname
    is always accepted.
    """
    hostname = urlparse(url).hostname
    if not hostname:
        return False

    server_hostname = get_server_hostname()
    if hostname == server_hostname:
        return True

    try:
        resolved = socket.gethostbyname(hostname)
    except socket.gaierror:
        return False

    if resolved == server_hostname:
        return True

    candidate = IPNetwork(resolved)
    return not any(candidate in blocked for blocked in DISALLOWED_IPS)
vlan.py 文件源码 项目:noc 作者: onfsdn 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, vid, conf=None):
    """Initialise VLAN state for ``vid`` from an optional config dict.

    Keys missing from ``conf`` are filled in place with their defaults
    (via ``setdefault``). Controller IPs and routes, when present, are
    converted to ``ipaddr`` objects; routes are split into IPv4 and IPv6
    tables keyed by destination network.
    """
    conf = {} if conf is None else conf

    self.vid = vid
    self.tagged = []
    self.untagged = []
    self.name = conf.setdefault('name', str(vid))
    self.description = conf.setdefault('description', self.name)

    configured_ips = conf.setdefault('controller_ips', [])
    self.controller_ips = configured_ips
    if configured_ips:
        self.controller_ips = list(map(ipaddr.IPNetwork, configured_ips))

    self.unicast_flood = conf.setdefault('unicast_flood', True)

    configured_routes = conf.setdefault('routes', {})
    self.routes = configured_routes
    self.ipv4_routes = {}
    self.ipv6_routes = {}
    if configured_routes:
        # Each configured item wraps the actual route under a 'route' key.
        self.routes = [item['route'] for item in configured_routes]
        for route in self.routes:
            gateway = ipaddr.IPAddress(route['ip_gw'])
            destination = ipaddr.IPNetwork(route['ip_dst'])
            # Gateway and destination must be the same address family.
            assert gateway.version == destination.version
            if gateway.version == 4:
                table = self.ipv4_routes
            else:
                table = self.ipv6_routes
            table[destination] = gateway

    self.arp_cache = {}
    self.nd_cache = {}
    self.max_hosts = conf.setdefault('max_hosts', None)
    self.host_cache = {}
utils.py 文件源码 项目:rest_api 作者: opentargets 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get(self):
    """Record an access event for the current request and echo it back.

    The caller's address is matched against the networks in the
    ``IP_RESOLVER`` config to pick an organisation (falling back to its
    ``'default'`` entry). The event record is stored through the
    ``mp_access_store`` extension and returned in the response with the
    timestamp converted to a string.
    """
    extensions = current_app.extensions
    esstore = extensions['es_access_store']  # looked up but currently unused
    mpstore = extensions['mp_access_store']

    # Event names are truncated to 120 characters.
    event = self.parser.parse_args()['event'][:120]
    auth_token = request.headers.get('Auth-Token')
    ip_resolver = current_app.config['IP_RESOLVER']
    ip = RateLimiter.get_remote_addr()
    ip_net = IPNetwork(ip)

    fallback_org = ip_resolver['default']
    network_types = (IPv4Network, IPv6Network)
    # First network entry overlapping the caller's address wins.
    resolved_org = next(
        (org for net, org in ip_resolver.items()
         if isinstance(net, network_types) and net.overlaps(ip_net)),
        fallback_org)

    data = {
        'ip': ip,
        'org': resolved_org,
        'host': request.host,
        'timestamp': datetime.now(),
        'event': event,
    }
    if auth_token:
        token_payload = TokenAuthentication.get_payload_from_token(auth_token)
        data['app_name'] = token_payload['app_name']

    mpstore.store_event(data)
    # Stringify the timestamp only after storing, for the response body.
    data['timestamp'] = str(data['timestamp'])
    return CTTVResponse.OK(SimpleResult(None, data=data))
conn_graph_facts.py 文件源码 项目:sonic-mgmt 作者: Azure 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def parse_graph(self):
    """
    Parse the xml graph file.

    Reads devices and interface links from the element named by
    ``self.pngtag`` and L2/L3 device info from the element named by
    ``self.dpgtag``. Fills ``self.links`` per device and stores the
    collected results in ``self.devices`` and ``self.vlanport``.
    """
    device_facts = {}
    device_nodes = self.root.find(self.pngtag).find('Devices').findall('Device')
    for node in device_nodes or ():
        attrs = node.attrib
        hostname = attrs['Hostname']
        if hostname is None:
            continue
        device_facts[hostname] = {
            'HwSku': attrs['HwSku'],
            'Type': attrs['Type'],
        }
        self.links[hostname] = {}

    vlan_facts = {}
    dpg_root = self.root.find(self.dpgtag)
    l3_nodes = dpg_root.findall('DevicesL3Info')
    l2_nodes = dpg_root.findall('DevicesL2Info')

    for node in l2_nodes or ():
        hostname = node.attrib['Hostname']
        if hostname is None:
            continue
        ports = vlan_facts[hostname] = {}
        for vlan_node in node.findall('InterfaceVlan'):
            vlan_attrs = vlan_node.attrib
            port = vlan_attrs['portname']
            mode = vlan_attrs['mode']
            vlan_ids = vlan_attrs['vlanids']
            ports[port] = {
                'mode': mode,
                'vlanids': vlan_ids,
                'vlanlist': self.port_vlanlist(vlan_ids),
            }

    for node in l3_nodes or ():
        hostname = node.attrib['Hostname']
        if hostname is None:
            continue
        prefix = node.find('ManagementIPInterface').attrib['Prefix']
        facts = device_facts[hostname]
        facts['ManagementIp'] = prefix
        mgmt_net = ipaddress.IPNetwork(prefix)
        facts['mgmtip'] = str(mgmt_net.ip)
        # Gateway is taken as the address right after the network address.
        facts['ManagementGw'] = str(mgmt_net.network + 1)

    link_nodes = (self.root.find(self.pngtag)
                  .find('DeviceInterfaceLinks')
                  .findall('DeviceInterfaceLink'))
    for node in link_nodes or ():
        attrs = node.attrib
        start_dev = attrs['StartDevice']
        end_dev = attrs['EndDevice']
        # Each link is recorded from both ends, pointing at the peer.
        if start_dev:
            peer = {'peerdevice': end_dev, 'peerport': attrs['EndPort']}
            self.links[start_dev][attrs['StartPort']] = peer
        if end_dev:
            peer = {'peerdevice': start_dev, 'peerport': attrs['StartPort']}
            self.links[end_dev][attrs['EndPort']] = peer

    self.devices = device_facts
    self.vlanport = vlan_facts
valve.py 文件源码 项目:noc 作者: onfsdn 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def add_controller_ips(self, controller_ips, vlan):
    """Build the flow messages for each controller IP on ``vlan``.

    For every entry a host-length network (address with the maximum
    prefix length) is derived. IPv4 entries produce two flows (ARP and
    ICMP matches); other entries produce three ICMPv6 flows (neighbour
    solicit, neighbour advert, echo request). Returns the list of
    messages produced by ``valve_flowcontroller``.
    """
    flows = []
    for network in controller_ips:
        host = ipaddr.IPNetwork(
            '/'.join(str(part)
                     for part in (network.ip, network.max_prefixlen)))

        if host.version == 4:
            match_specs = (
                dict(eth_type=ether.ETH_TYPE_ARP,
                     nw_dst=host,
                     vlan=vlan),
                dict(eth_type=ether.ETH_TYPE_IP,
                     eth_dst=self.FAUCET_MAC,
                     vlan=vlan,
                     nw_proto=inet.IPPROTO_ICMP,
                     nw_src=network,
                     nw_dst=host),
            )
        else:
            match_specs = (
                dict(eth_type=ether.ETH_TYPE_IPV6,
                     vlan=vlan,
                     nw_proto=inet.IPPROTO_ICMPV6,
                     ipv6_nd_target=host,
                     icmpv6_type=icmpv6.ND_NEIGHBOR_SOLICIT),
                dict(eth_type=ether.ETH_TYPE_IPV6,
                     eth_dst=self.FAUCET_MAC,
                     vlan=vlan,
                     nw_proto=inet.IPPROTO_ICMPV6,
                     icmpv6_type=icmpv6.ND_NEIGHBOR_ADVERT),
                dict(eth_type=ether.ETH_TYPE_IPV6,
                     vlan=vlan,
                     nw_proto=inet.IPPROTO_ICMPV6,
                     nw_dst=host,
                     icmpv6_type=icmpv6.ICMPV6_ECHO_REQUEST),
            )

        # Every match is installed in the eth_src table at top priority.
        for spec in match_specs:
            flows.append(self.valve_flowcontroller(
                self.dp.eth_src_table,
                self.valve_in_match(**spec),
                priority=self.dp.highest_priority))
    return flows


问题


面经


文章

微信
公众号

扫码关注公众号