python类ip_network()的实例源码

set.py 文件源码 项目:bgpfu 作者: bgpfu 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def parse_prefix_range(expr=None):
        pattern = r'^(?P<prefix>((\d+(\.\d+){3})|([0-9a-fA-F:]{2,40}))/\d+)' \
                  r'(\^(?P<ge>\d+)-(?P<le>\d+))?$'
        match = re.match(pattern, expr)
        if match:
            prefix = ipaddress.ip_network(unicode(match.group("prefix")))
            afi = "ipv%d" % prefix.version
            entry = {"prefix": prefix}
            try:
                entry["greater-equal"] = int(match.group("ge"))
                entry["less-equal"] = int(match.group("le"))
            except (IndexError, TypeError):
                pass
        else:
            raise ValueError("no match found")
        return {afi: [entry]}
netmon.py 文件源码 项目:netmon 作者: bullerian 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _ipnet(net_str):
        """
        This function is used for argparse module to verify validness of
        entered network. Valid networks are IPv4 networks in form
        A.B.C.D/prefix
        :param net_str: string that represents IPv4 net. Example A.B.C.D/prefix.
        :return: ipaddress.ip_network object or raises exception on fail
        """
        try:
            the_net = ipaddress.ip_network(net_str)
        except ValueError:
            msg = '{} is not a valid IP network address'.format(net_str)
            raise argparse.ArgumentTypeError(msg)

        if the_net.version != Utilities.IPV4:
            msg = 'Only IPv4 addresses are supported'
            raise argparse.ArgumentTypeError(msg)

        return the_net
test_ping.py 文件源码 项目:netmon 作者: bullerian 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_arp_ping(self):
        """test ARP ping - compare to arping utilite"""

        arp_ping = Ping(IFACE, ARP_NAME, TIMEOUT, False)

        for ip in list(ipaddress.ip_network(TEST_NETWORK).hosts())[:5]:
            try:
                # need arping installed
                with os.popen('arping -c {} -t {} {}'.format(COUNT,
                                                             TIMEOUT,
                                                             str(ip)), 'r'):
                    # get exit code
                    ec = os.wait()[1] & 0xFF00
                res = arp_ping.ping_host(str(ip))
            except PermissionException:
                print('Need root previlegies')

            if res[STATUS_INDEX] == ONLINE:
                self.assertTrue(ec == 0)
            else:
                self.assertFalse(ec == 0)
test_ping.py 文件源码 项目:netmon 作者: bullerian 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_icmp_ping(self):
        """test icmp ping - compare to icmping utilite"""

        icmp_ping = Ping(IFACE, ICMP_NAME,  TIMEOUT, False)

        for ip in list(ipaddress.ip_network(TEST_NETWORK).hosts())[:5]:
            try:
                # need arping installed
                with os.popen('ping -c {} -t {} {}'.format(COUNT,
                                                           TIMEOUT,
                                                           str(ip)), 'r'):
                    # get exit code
                    ec = os.wait()[1] & 0xFF00
                res = icmp_ping.ping_host(str(ip))
            except PermissionException:
                print('Need root previlegies')
            if res[STATUS_INDEX] == ONLINE:
                self.assertTrue(ec == 0)
            else:
                self.assertFalse(ec == 0)
views.py 文件源码 项目:serveradmin 作者: innogames 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def edit(request):
    if 'object_id' in request.GET:
        server = Query({'object_id': request.GET['object_id']}).get()
    else:
        servertype = Servertype.objects.get(pk=request.POST['attr_servertype'])
        project = Project.objects.get(pk=request.POST['attr_project'])
        hostname = request.POST['attr_hostname']
        if servertype.ip_addr_type == 'null':
            intern_ip = None
        elif servertype.ip_addr_type == 'network':
            intern_ip = ip_network(request.POST['attr_intern_ip'])
        else:
            intern_ip = ip_interface(request.POST['attr_intern_ip'])
        server = ServerObject.new(servertype, project, hostname, intern_ip)

    return _edit(request, server, True)
NetworkMapper.py 文件源码 项目:oecluster 作者: OECFHTW 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def scan_network(self):
        logger.info('scanning Network')
        network = self._config_reader.get_config_section("Networking")['network']
        netmask = self._config_reader.get_config_section("Networking")['netmask']
        my_net = ipaddress.ip_network(network+'/'+netmask)
        host_list = dict()
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.settimeout(2.0)

        for ip in my_net:
            try:
                # print(ip)
                host = self._generate_host(ip)
                if host is not None:
                    host_list[ip] = host
            except socket.herror as ex:
                pass
        return host_list
test_connect.py 文件源码 项目:asyncpg 作者: MagicStack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def setUp(self):
        super().setUp()

        self.cluster.reset_hba()

        create_script = []
        create_script.append('CREATE ROLE ssl_user WITH LOGIN;')

        self.cluster.add_hba_entry(
            type='hostssl', address=ipaddress.ip_network('127.0.0.0/24'),
            database='postgres', user='ssl_user',
            auth_method='trust')

        self.cluster.add_hba_entry(
            type='hostssl', address=ipaddress.ip_network('::1/128'),
            database='postgres', user='ssl_user',
            auth_method='trust')

        # Put hba changes into effect
        self.cluster.reload()

        create_script = '\n'.join(create_script)
        self.loop.run_until_complete(self.con.execute(create_script))
types.py 文件源码 项目:djangolg 作者: wolcomm 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, value):
        """Initialise new IPPrefix instance."""
        try:
            obj = ipaddress.ip_address(value)
        except Exception:
            try:
                obj = ipaddress.ip_network(value, strict=False)
            except Exception:
                raise
        if type(obj) in [ipaddress.IPv4Address, ipaddress.IPv6Address]:
            self.prefix = None
            self.type = self.HOST
        elif type(obj) in [ipaddress.IPv4Network, ipaddress.IPv6Network]:
            self.prefix = obj.network_address
            self.type = self.PREFIX
        self.version = obj.version
        self.txt = obj.compressed
test_ipaddress.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_incompatible_versions(self):
        # These should always raise TypeError
        v4addr = ipaddress.ip_address('1.1.1.1')
        v4net = ipaddress.ip_network('1.1.1.1')
        v6addr = ipaddress.ip_address('::1')
        v6net = ipaddress.ip_address('::1')

        self.assertRaises(TypeError, v4addr.__lt__, v6addr)
        self.assertRaises(TypeError, v4addr.__gt__, v6addr)
        self.assertRaises(TypeError, v4net.__lt__, v6net)
        self.assertRaises(TypeError, v4net.__gt__, v6net)

        self.assertRaises(TypeError, v6addr.__lt__, v4addr)
        self.assertRaises(TypeError, v6addr.__gt__, v4addr)
        self.assertRaises(TypeError, v6net.__lt__, v4net)
        self.assertRaises(TypeError, v6net.__gt__, v4net)
test_ipaddress.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testIpFromInt(self):
        self.assertEqual(self.ipv4_interface._ip,
                         ipaddress.IPv4Interface(16909060)._ip)

        ipv4 = ipaddress.ip_network('1.2.3.4')
        ipv6 = ipaddress.ip_network('2001:658:22a:cafe:200:0:0:1')
        self.assertEqual(ipv4, ipaddress.ip_network(int(ipv4.network_address)))
        self.assertEqual(ipv6, ipaddress.ip_network(int(ipv6.network_address)))

        v6_int = 42540616829182469433547762482097946625
        self.assertEqual(self.ipv6_interface._ip,
                         ipaddress.IPv6Interface(v6_int)._ip)

        self.assertEqual(ipaddress.ip_network(self.ipv4_address._ip).version,
                         4)
        self.assertEqual(ipaddress.ip_network(self.ipv6_address._ip).version,
                         6)
test_ipaddress.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testHash(self):
        self.assertEqual(hash(ipaddress.ip_interface('10.1.1.0/24')),
                         hash(ipaddress.ip_interface('10.1.1.0/24')))
        self.assertEqual(hash(ipaddress.ip_network('10.1.1.0/24')),
                         hash(ipaddress.ip_network('10.1.1.0/24')))
        self.assertEqual(hash(ipaddress.ip_address('10.1.1.0')),
                         hash(ipaddress.ip_address('10.1.1.0')))
        # i70
        self.assertEqual(hash(ipaddress.ip_address('1.2.3.4')),
                         hash(ipaddress.ip_address(
                    int(ipaddress.ip_address('1.2.3.4')._ip))))
        ip1 = ipaddress.ip_address('10.1.1.0')
        ip2 = ipaddress.ip_address('1::')
        dummy = {}
        dummy[self.ipv4_address] = None
        dummy[self.ipv6_address] = None
        dummy[ip1] = None
        dummy[ip2] = None
        self.assertIn(self.ipv4_address, dummy)
        self.assertIn(ip2, dummy)
plugin.py 文件源码 项目:Sigyn 作者: freenode 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _ip_ranges (self, h):
        if '/' in h:
            if h.startswith('gateway/web/freenode/ip.'):
               h = h.replace('gateway/web/freenode/ip.','')
               return [ipaddress.ip_network(u'%s/27' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/26' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/25' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/24' % h, strict=False).with_prefixlen.encode('utf-8')]
            return [h]
        if utils.net.isIPV4(h):
            return [ipaddress.ip_network(u'%s/27' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/26' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/25' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/24' % h, strict=False).with_prefixlen.encode('utf-8')]
        if utils.net.bruteIsIPV6(h):
            r = []
            r.append(ipaddress.ip_network(u'%s/120' % h, False).with_prefixlen.encode('utf-8'))
            r.append(ipaddress.ip_network(u'%s/118' % h, False).with_prefixlen.encode('utf-8'))
            r.append(ipaddress.ip_network(u'%s/116' % h, False).with_prefixlen.encode('utf-8'))
            r.append(ipaddress.ip_network(u'%s/114' % h, False).with_prefixlen.encode('utf-8'))
            r.append(ipaddress.ip_network(u'%s/112' % h, False).with_prefixlen.encode('utf-8'))
            r.append(ipaddress.ip_network(u'%s/110' % h, False).with_prefixlen.encode('utf-8'))
            r.append(ipaddress.ip_network(u'%s/64' % h, False).with_prefixlen.encode('utf-8'))
            return r
        return [h]
auth.py 文件源码 项目:django-heartbeat 作者: pbs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def is_authorized(ip, authorized_ips):
    ip = ip_address(ip)

    for item in authorized_ips:
        try:
            if ip == ip_address(item):
                return True
        except ValueError:
            try:
                if ip in ip_network(item):
                    return True
            except ValueError:
                logger.warn('The "authorized_ip" list (settings.HEARTBEAT)'
                            'contains an item that is neither an ip address '
                            'nor an ip network: {}'.format(item))
configurator.py 文件源码 项目:Home-Assistant 作者: jmart518 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def check_access(clientip):
    global BANNED_IPS
    if clientip in BANNED_IPS:
        return False
    if not ALLOWED_NETWORKS:
        return True
    for net in ALLOWED_NETWORKS:
        ipobject = ipaddress.ip_address(clientip)
        if ipobject in ipaddress.ip_network(net):
            return True
    BANNED_IPS.append(clientip)
    return False
validate.py 文件源码 项目:DeepSea 作者: SUSE 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def public_interface(self):
        """
        Check that all minions have an address on the public network
        """
        for node in self.data.keys():
            if ('roles' in self.data[node] and
                'master' in self.data[node]['roles']):
                continue
            found = False
            public_network = self.data[node].get("public_network", "")
            net_list = Util.parse_list_from_string(public_network)
            for address in self.grains[node]['ipv4']:
                try:
                    for network in net_list:
                        addr = ipaddress.ip_address(u'{}'.format(address))
                        net = ipaddress.ip_network(u'{}'.format(network))
                        if addr in net:
                            found = True
                except ValueError:
                    # Don't care about reporting a ValueError here if
                    # public_network is malformed, because the
                    # previous validation in public_network() will do that.
                    pass
            if not found:
                msg = "minion {} missing address on public network {}".format(node, public_network)
                self.errors.setdefault('public_interface', []).append(msg)

        self._set_pass_status('public_network')
validate.py 文件源码 项目:DeepSea 作者: SUSE 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def cluster_interface(self):
        """
        Check that storage nodes have an interface on the cluster network
        """
        # pylint: disable=too-many-nested-blocks
        for node in self.data.keys():
            if ('roles' in self.data[node] and
                'storage' in self.data[node]['roles']):
                found = False
                cluster_network = self.data[node].get("cluster_network", "")
                net_list = Util.parse_list_from_string(cluster_network)
                for address in self.grains[node]['ipv4']:
                    try:
                        for network in net_list:
                            addr = ipaddress.ip_address(u'{}'.format(address))
                            net = ipaddress.ip_network(u'{}'.format(network))
                            if addr in net:
                                found = True
                    except ValueError:
                        # Don't care about reporting a ValueError here if
                        # cluster_network is malformed, because the
                        # previous validation in cluster_network() will do that.
                        pass
                if not found:
                    msg = "minion {}".format(node)
                    msg += " missing address on cluster network {}".format(cluster_network)
                    self.errors.setdefault('cluster_interface', []).append(msg)

        self._set_pass_status('cluster_interface')
public.py 文件源码 项目:DeepSea 作者: SUSE 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def address():
    """
    Find the public address for a minion
    """

    if 'public_network' not in __pillar__:
        return ""

    log.debug("pillar: {}".format(type(__pillar__['public_network'])))
    if isinstance(__pillar__['public_network'], str):
        networks = re.split(', *', __pillar__['public_network'])
    else:
        networks = __pillar__['public_network']

    log.debug("networks: {}".format(pprint.pformat(networks)))
    for public_network in networks:
        log.info('public_network: {}'.format(public_network))
        interfaces = __salt__['network.interfaces']()

        log.debug("interfaces: {}".format(pprint.pformat(interfaces)))
        for interface in interfaces:
            log.info("interface: {}".format(pprint.pformat(interface)))
            if 'inet' in interfaces[interface]:
                for entry in interfaces[interface]['inet']:
                    _address = entry['address']
                    log.info("Checking address {}".format(_address))
                    if (ipaddress.ip_address(u'{}'.format(_address)) in
                        ipaddress.ip_network(u'{}'.format(public_network))):
                        return _address
    return ""
config.py 文件源码 项目:privcount 作者: privcount 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def validate_ip_network(network, strict=True):
    '''
    If network is a valid IP network when parsed according to strict,
    return it as an ipnetwork object.
    Otherwise, return None.
    '''
    try:
        return ipaddress.ip_network(unicode(address), strict=strict)
    except ValueError:
        return None
endpoints.py 文件源码 项目:talisker 作者: canonical-ols 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_networks():
    network_tokens = os.environ.get('TALISKER_NETWORKS', '').split()
    networks = [ip_network(force_unicode(n)) for n in network_tokens]
    if networks:
        logger = logging.getLogger(__name__)
        logger.info('configured TALISKER_NETWORKS',
                    extra={'networks': [str(n) for n in networks]})
    return networks
_ipaddress.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def cast_network(s, cur=None):
    if s is None:
        return None
    return ipaddress.ip_network(str(s))


问题


面经


文章

微信
公众号

扫码关注公众号