python类IP的实例源码

arp.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _load_existing_mappings(self):
        """Fetch the open ARP records for this box from the database.

        Records are selected for this netbox's id with an end_time at or
        beyond ``datetime.max``; the query is run through
        ``db.run_in_thread``.

        NOTE(review): this is a generator that ends with
        ``defer.returnValue``; presumably it is wrapped by an
        inlineCallbacks-style decorator outside this view -- confirm.

        Returns:

          A deferred whose result is a dictionary: { (ip, mac): arpid }
        """
        self._logger.debug("Loading open arp records from database")
        queryset = manage.Arp.objects.filter(
            netbox__id=self.netbox.id,
            end_time__gte=datetime.max,
        ).values('id', 'ip', 'mac')
        records = yield db.run_in_thread(
            storage.shadowify_queryset_and_commit, queryset)
        self._logger.debug("Loaded %d open records from arp", len(records))

        # Key each record id by its (IP object, mac) pair.
        mappings = {
            (IP(record['ip']), record['mac']): record['id']
            for record in records
        }
        defer.returnValue(mappings)
statistics_mib.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_ipv4_multicast_groups_per_port(self):
        """
        Collects IGMP snooping information from ports.

        Each row index of the ``hpIgmpStatsPortAccess2`` column is split as
        (vlan, four group-address octets, ifindex).

        :returns: A Deferred whose result is a list of MulticastStat tuples
        """
        column = "hpIgmpStatsPortAccess2"
        deferred = self.retrieve_columns([column])
        deferred = deferred.addCallback(self.translate_result)
        deferred = deferred.addCallback(reduce_index)
        ports = yield deferred

        def _to_stat(index, columns):
            # index[1:5] holds the octets of the multicast group address
            group_address = IP('.'.join(str(octet) for octet in index[1:5]))
            return MulticastStat(group_address, index[5], index[0],
                                 columns[column])

        defer.returnValue([_to_stat(index, columns)
                           for index, columns in iteritems(ports)])
oidparsers.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def oid_to_ipv4(oid):
    """Converts a sequence of 4 numbers to an IPv4 object in the fastest
    known way.

    :param oid: Any list or tuple of 4 integers.
    :returns: The result of ``IP(addr, ipversion=4)``, where addr is the
              32-bit integer formed from the four octets in network order.
    :raises ValueError: If oid does not contain exactly 4 elements, or if
                        an element does not fit in an unsigned byte.

    """
    if len(oid) != 4:
        raise ValueError("IPv4 address must be 4 octets, not %d" % len(oid))
    try:
        octets = array.array("B", oid)
    except OverflowError as error:
        raise ValueError(error)
    # array.tostring() was removed in Python 3.9; tobytes() replaces it but
    # does not exist on Python 2 arrays, so fall back when it is missing.
    to_bytes = getattr(octets, "tobytes", None) or octets.tostring
    addr, = unpack("!I", to_bytes())
    return IP(addr, ipversion=4)

##############################################################################
# Various OID consumer functions, which can be fed to the consume() function #
##############################################################################
# pylint: disable=invalid-name
pynetsnmp.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, host, community="public", version="1", port=161,
                 retries=3, timeout=1):
        """Makes a new Snmp-object and opens its session handle.

        :param host: hostname or IP address
        :param community: community (password), defaults to "public"
        :param version: SNMP version; "1" and "2c" are accepted, and "2" is
                        treated as "2c". Defaults to "1"
        :param port: udp port number, defaults to "161"
        :param retries: stored as-is on the object, defaults to 3
        :param timeout: stored as-is on the object, defaults to 1
                        (NOTE(review): presumably seconds -- confirm)
        :raises UnsupportedSnmpVersionError: for any other version value

        """
        self.host = host
        self.community = str(community)

        requested_version = str(version)
        # "2" is accepted as shorthand for "2c"
        self.version = '2c' if requested_version == '2' else requested_version
        if self.version not in ('1', '2c'):
            raise UnsupportedSnmpVersionError(self.version)

        self.port = int(port)
        self.retries = retries
        self.timeout = timeout

        session = _MySnmpSession(self._build_cmdline())
        self.handle = session
        self.handle.open()
pynetsnmp.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _build_cmdline(self):
        """Builds the argument tuple describing this session.

        The host is used verbatim unless it parses as an IPv6 address, in
        which case it is wrapped as ``udp6:[host]``.

        :returns: A tuple of strings: version, community, retries and
                  timeout options followed by ``host:port``.
        """
        target = self.host
        try:
            parsed = IP(self.host)
        except ValueError:
            # Not an IP address literal (e.g. a hostname): use it unchanged
            pass
        else:
            if parsed.version() == 6:
                target = 'udp6:[%s]' % self.host

        options = (
            '-v' + self.version,
            '-c', self.community,
            '-r', str(self.retries),
            '-t', str(self.timeout),
        )
        return options + ('%s:%s' % (target, self.port),)
util.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def is_valid_cidr(cidr):
    """Verifies that a string is valid IPv4 or IPv6 CIDR specification.

    Only strings that contain a '/' and are not purely digits are
    considered; anything else is rejected without being parsed.

    Returns True if the IPy library accepts the string, otherwise False.
    """
    if not isinstance(cidr, six.string_types):
        return False
    if cidr.isdigit() or '/' not in cidr:
        return False

    try:
        parsed = IPy.IP(cidr)
    except (ValueError, TypeError):
        return False
    return parsed is not None
ip_calc_utils.py 文件源码 项目:minihydra 作者: VillanCh 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def is_ipv4(ip):
    """Check whether [ip] is accepted by the IP class.

    NOTE(review): despite the name, nothing in this function checks the
    address version -- confirm whether IPv6 input should be rejected.

    Params:
        ip: :str: General IP format.

    Returns:
        :type: bool
        :desc: True if IP(ip) can be constructed, False if it raises
            ValueError."""
    try:
        IP(ip)
    except ValueError:
        return False
    return True

#----------------------------------------------------------------------
verifyip.py 文件源码 项目:N4xD0rk 作者: n4xh4ck5 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def VerifyIp(ip):
    """Classify an address as 'Public' or 'Private'.

    Returns 'Public' when ``IP(ip).iptype()`` equals "PUBLIC", 'Private'
    for every other reported type, and None when the address cannot be
    classified (constructing or querying the IP object raised).
    """
    try:
        ip_type = IP(ip).iptype()
    except Exception:
        # Deliberate best-effort: unclassifiable input yields None. Unlike a
        # `return` inside `finally`, this lets KeyboardInterrupt/SystemExit
        # propagate.
        return None

    # Compare by value; `is not` on a string literal tests object identity
    # and only works by accident of interning.
    return 'Public' if ip_type == "PUBLIC" else 'Private'
ExpResCriteriaCommon.py 文件源码 项目:ripe-atlas-monitor 作者: pierky 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __str__(self):
        """Describe the destination IP criterion in words.

        The wording depends on whether every entry in ``self.dst_ip`` has a
        prefix length other than 32 or 128, and otherwise on whether more
        than one entry is configured.
        """
        only_subnets = all(
            ip.prefixlen() not in [32, 128] for ip in self.dst_ip
        )

        if only_subnets:
            tpl = "Destination IP must fall into {}"
        elif len(self.dst_ip) > 1:
            tpl = "Destination IP must be in {}"
        else:
            tpl = "Destination IP must be {}"

        return tpl.format(self._str_list())
ExpResCriteriaDNSRecords.py 文件源码 项目:ripe-atlas-monitor 作者: pierky 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, cfg):
        """Parse and validate the "address" entries of this criterion.

        Every configured address is converted with ``IPy.IP`` and collected
        in ``self.address``.

        Raises:
            ConfigError: if an address cannot be parsed, or if its IP
                version differs from ``self.IP_VER``.
        """
        ExpResCriterion_DNSRecord.__init__(self, cfg)

        self.address = []
        for address in self._enforce_list("address", str):
            try:
                ip = IPy.IP(address)
            except Exception:
                # A bare `except:` would also turn KeyboardInterrupt and
                # SystemExit into a ConfigError.
                raise ConfigError(
                    "Invalid IP for {} record: {}".format(
                        self.RECORD_TYPE, address
                    )
                )
            if ip.version() != self.IP_VER:
                raise ConfigError(
                    "Invalid IP version ({}) for record type {}.".format(
                        ip.version(), self.RECORD_TYPE
                    )
                )
            self.address.append(ip)
validate_ip_address.py 文件源码 项目:LinuxBashShellScriptForOps 作者: DingGuodong 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def is_valid_ipv4(ip, version=4):
    """Return True if ``ip`` is accepted by IPy for the given IP version.

    :param ip: the address to check
    :param version: IP version passed to IPy as ``ipversion`` (default 4)
    :returns: True if ``IP(ip, ipversion=version)`` can be constructed,
              False if it raises ValueError.

    If IPy is not importable, an installation is attempted with pip or
    easy_install; the process exits with status 1 when that fails.
    """
    import os
    import sys

    try:
        from IPy import IP
    except ImportError:
        # NOTE(review): installing a package as a side effect of a
        # validation call is surprising; kept because callers may rely on it.
        command_to_execute = "pip install IPy || easy_install IPy"
        # os.system reports failure through its return status rather than by
        # raising OSError, so the status must be checked explicitly.
        if os.system(command_to_execute) != 0:
            print("Can NOT install 'IPy', Aborted!")
            sys.exit(1)
        from IPy import IP
    try:
        IP(ip, ipversion=version)
    except ValueError:
        return False
    return True
kvm.py 文件源码 项目:ops 作者: xiaomatech 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def network_size(net, dhcp=None):
        """
        Func return gateway, mask and dhcp pool.

        :param net: network specification accepted by ``IP``
        :param dhcp: when truthy, the DHCP pool is computed and returned;
                     otherwise None is returned in its place
        :returns: tuple ``(gateway, mask, dhcp_pool)`` where gateway is the
                  address at index 1 of the network, mask is the netmask
                  string, and dhcp_pool is ``[index 2, index len - 2]`` or
                  None
        """
        # Parse the network once and reuse it for every derived value.
        addr = IP(net)
        mask = addr.strNetmask()
        gateway = addr[1].strNormal()
        if not dhcp:
            return gateway, mask, None
        # Only index into the pool range when it was actually requested, so
        # networks too small for a pool still work without DHCP.
        dhcp_pool = [addr[2].strNormal(), addr[addr.len() - 2].strNormal()]
        return gateway, mask, dhcp_pool


# Read write lock
# ---------------
kvm.py 文件源码 项目:ops 作者: xiaomatech 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_ipv4_network(self):
        """Return the network described by the <ip> element of the XML.

        Reads ``/network/ip`` from ``self._XMLDesc(0)``. A ``prefix``
        attribute, when present, takes precedence over ``netmask`` and is
        expanded to a 32-bit mask.

        :returns: None if there is no ``/network/ip`` element; otherwise an
                  IP object for "network/netmask", or for the bare address
                  when neither a prefix nor a netmask is given.
        """
        desc = self._XMLDesc(0)
        if util.get_xml_path(desc, "/network/ip") is None:
            return None

        address_text = util.get_xml_path(desc, "/network/ip/@address")
        netmask_text = util.get_xml_path(desc, "/network/ip/@netmask")
        prefix_text = util.get_xml_path(desc, "/network/ip/@prefix")

        if prefix_text:
            # Turn the prefix length into a dotted netmask via a bit string
            bits = int(prefix_text)
            mask_bits = "1" * bits + "0" * (32 - bits)
            netmask_text = str(IP(int(mask_bits, base=2)))

        if not netmask_text:
            return IP(str(address_text))

        mask = IP(netmask_text)
        gateway = IP(address_text)
        base = IP(gateway.int() & mask.int())
        return IP(str(base) + "/" + netmask_text)
envs.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def update_hosts(self, hosts_info):  # pylint: disable=no-self-use
        """Validate a host-name to IP mapping and write it to /etc/hosts.

        Lines of /etc/hosts that carry the Functest marker comment are
        dropped, the remaining lines are written back, and one marked line
        per entry of ``hosts_info`` is appended.

        :param hosts_info: dict mapping host names to IP addresses
        :returns: the value of ``api_utils.result_handler`` with status 0 on
                  success, or status 1 and a message on invalid input or
                  on any error while updating the file
        """
        if not isinstance(hosts_info, dict):
            return api_utils.result_handler(
                status=1, data='Error, args should be a dict')

        for host_name, address in hosts_info.items():
            if not host_name:
                return api_utils.result_handler(
                    status=1, data='Domain name is absent')
            try:
                IPy.IP(address)
            except Exception:  # pylint: disable=broad-except
                return api_utils.result_handler(
                    status=1, data='The IP %s is invalid' % address)

        marker = "# SUT hosts info for Functest"
        try:
            new_entries = ('\n{} {} {}'.format(address, host_name, marker)
                           for host_name, address in hosts_info.items())

            with open("/etc/hosts", 'r') as current:
                kept_lines = [line for line in current if marker not in line]

            with open("/etc/hosts", 'w') as updated:
                updated.writelines(kept_lines)
                updated.write(marker)
                updated.writelines(new_entries)
        except Exception:  # pylint: disable=broad-except
            return api_utils.result_handler(
                status=1, data='Error when updating hosts info')

        return api_utils.result_handler(
            status=0, data='Update hosts info successfully')
server.py 文件源码 项目:PRCDNS 作者: lbp0200 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def data_received(self, data):
        """Handle an incoming DNS request and schedule the upstream lookup.

        The payload is extracted with ``self.get_data``; when that yields
        None the transport is closed. Otherwise the request is parsed, a
        Google DNS-over-HTTPS URL is built for the queried name, and the
        query is scheduled on ``self.loop`` with ``self.send_resp`` as its
        completion callback.

        When the peer address is of type 'PRIVATE', ``self.args.myip`` is
        sent as the client subnet instead.
        NOTE(review): ``myip`` defaults to None in get_arg(), which would
        produce "None/24" in the URL -- confirm the intended handling.
        """
        data = self.get_data(data)
        if data is None:
            self.transport.close()
            return
        self.request = DNSRecord.parse(data)
        if self.args.debug:
            print('Data received: {!r}'.format(self.request))

        from IPy import IP
        peer_address = self.peername[0]
        client_ip = peer_address
        if IP(peer_address).iptype() == 'PRIVATE':
            client_ip = self.args.myip

        url = 'https://dns.google.com/resolve?name={}&edns_client_subnet={}/24'.format(
            str(self.request.q.qname), client_ip)

        # `async` is a reserved word since Python 3.7, so the legacy name
        # can only be looked up dynamically; importing it is a SyntaxError.
        schedule = (getattr(asyncio, 'ensure_future', None)
                    or getattr(asyncio, 'async'))
        future = schedule(ProxyClient.query_domain(url, self.args.proxy),
                          loop=self.loop)
        future.add_done_callback(self.send_resp)
server.py 文件源码 项目:PRCDNS 作者: lbp0200 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_arg():
    """Parse the command-line arguments of the proxy.

    :returns: the argparse namespace with ``debug``, ``listen``, ``port``,
              ``proxy`` and ``myip`` attributes. ``port`` is always an int,
              whether it comes from the default or the command line.
    """
    parser = argparse.ArgumentParser(prog='prcdns', description='google dns proxy.')
    parser.add_argument('--debug', help='debug model,default NO', default=False)
    parser.add_argument('-l', '--listen', help='listening IP,default 0.0.0.0', default='0.0.0.0')
    # type=int keeps a user-supplied port consistent with the int default
    parser.add_argument('-p', '--port', help='listening Port,default 3535', default=3535, type=int)
    parser.add_argument('-r', '--proxy', help='Used For Query Google DNS,default direct', default=None)
    parser.add_argument('-ip', '--myip', help='IP location', default=None)

    return parser.parse_args()
socketcall_call.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def IP(ip):
        """Return ``ip`` unchanged.

        NOTE(review): presumably a stand-in for a real IP class when none is
        available -- confirm against the surrounding module.
        """
        return ip
socketcall_call.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def formatStructValue(self, struct, name, value):
        """Return a display value for a socket-address struct field.

        - any ``sockaddr*`` struct, field ending in "family": looked up in
          SOCKET_FAMILY, falling back to the raw value
        - ``sockaddr_in.sin_port``: converted with ntoh_ushort
        - ``sockaddr_in.sin_addr``: ``s_addr`` converted with ntoh_uint and
          passed to IP

        Returns None for every other struct/field combination.
        """
        if struct.startswith("sockaddr") and name.endswith("family"):
            return SOCKET_FAMILY.get(value, value)
        if struct != "sockaddr_in":
            return None
        if name == "sin_port":
            return ntoh_ushort(value)
        if name == "sin_addr":
            host_order_address = ntoh_uint(value.s_addr)
            return IP(host_order_address)
        return None
utilities.py 文件源码 项目:maltego_tds_transforms 作者: passivetotal 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def value_type(value):
    """Value type simply identifies if the passed value
    is a domain or IP address.

    @param  string  text to test as an IP
    @returns    'ip' if IP(value) can be constructed, otherwise 'domain'
    """
    try:
        IP(value)
    except Exception:
        # Anything IP() rejects is treated as a domain. A bare `except:`
        # would also swallow KeyboardInterrupt and SystemExit.
        return 'domain'
    return 'ip'
hostcheck.py 文件源码 项目:shodan-scripts 作者: heywoodlh 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def gen_info():
        """Format the IP, organization and operating system of ``host``.

        ``host`` is read from the enclosing scope; 'org' and 'os' fall back
        to 'n/a' when missing, while 'ip_str' must be present.
        """
        fields = (
            host['ip_str'],
            host.get('org', 'n/a'),
            host.get('os', 'n/a'),
        )
        return "IP: %s \n Organization: %s \n Operating System: %s" % fields


问题


面经


文章

微信
公众号

扫码关注公众号