python类ip_address()的实例源码

make_result_matrix.py 文件源码 项目:typepy 作者: thombashi 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self):
        self.typeclass = None
        self.strict_level = None
        self.header_list = None
        self.value_list = None

        self.__table_writer = ptw.RstSimpleTableWriter()
        self.__table_writer._dp_extractor.type_value_mapping = {
            NullString(None).typecode: '``""``',
            NoneType(None).typecode: "``None``",
            Infinity(None).typecode: '``Decimal("inf")``',
            Nan(None).typecode: '``Decimal("nan")``',
        }
        self.__table_writer._dp_extractor.const_value_mapping = {
            True: "``True``",
            False: "``False``",
            '``"127.0.0.1"``': '``ip_address("127.0.0.1")``',
        }
generate_fake_data.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def make_hosts(region, cell):
    # no of hosts need to match ip_address available
    no_of_hosts = 2
    cab1 = region + "." + cell + "." + "C-1"
    cab2 = region + "." + cell + "." + "C-2"

    hosts = []
    for host in range(no_of_hosts):
        hostname = "host%s.%s.example1.com" % (host, cab1)
        hosts.append(hostname)

    for host in range(no_of_hosts):
        hostname = "host%s.%s.example2.com" % (host, cab2)
        hosts.append(hostname)

    return hosts
generate_fake_data.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def create_netdevice(self, name, device_type):
        network_devices_url = self.url + "/network-devices"
        payload = {"name": name,
                   "model_name": "model-x",
                   "os_version": "version-1",
                   "device_type": device_type,
                   "ip_address": "10.10.1.1",
                   "active": True,
                   "cloud_id": self.cloud.get("id"),
                   "region_id": self.region.get("id"),
                   "cell_id": self.cell.get("id")}

        resp = requests.post(network_devices_url, headers=self.headers,
                             data=json.dumps(payload), verify=False)
        if resp.status_code != 201:
            raise Exception(resp.text)

        return resp.json()
match.py 文件源码 项目:privcount 作者: privcount 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_random_load_nonentry():
    '''
    Return a random item that probably isn't in match_func_result['load'].
    '''
    match_type = sys.argv[1]
    if match_type == 'ipasn':
        # Yes, we could do IPv6 here. But the type of the list doesn't matter:
        # a random IPv4 might not be in an IPv4 list, and it won't be in an
        # IPv6 list
        random_32_bit = random.randint(0, 2**32 - 1)
        ip = ipaddress.ip_address(random_32_bit)
        return ip
    else:
        char_list = list(get_random_load_entry())
        random.shuffle(char_list)
        return "".join(char_list)

# try to make sure that other processes don't warp the results too much
config.py 文件源码 项目:privcount 作者: privcount 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def validate_ip_network_address_prefix(address, prefix, strict=True):
    '''
    If address/prefix is a valid IP network when parsed according to strict,
    return it as an ipnetwork object.
    Otherwise, return None.
    '''
    ip_address = validate_ip_address(address)
    if ip_address is None or ip_address.version not in [4,6]:
        return None
    try:
        if ip_address.version == 4:
            return ipaddress.IPv4Network((ip_address, prefix), strict=strict)
        else:
            return ipaddress.IPv6Network((ip_address, prefix), strict=strict)
    except ValueError:
        return None
endpoints.py 文件源码 项目:talisker 作者: canonical-ols 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def private(f):
    """Only allow approved source addresses."""

    @functools.wraps(f)
    def wrapper(self, request):
        if not request.access_route:
            # this means something probably bugged in werkzeug, but let's fail
            # gracefully
            return Response('no client ip provided', status='403')
        ip_str = request.access_route[-1]
        if isinstance(ip_str, bytes):
            ip_str = ip_str.decode('utf8')
        ip = ip_address(ip_str)
        if ip.is_loopback or any(ip in network for network in get_networks()):
            return f(self, request)
        else:
            msg = PRIVATE_BODY_RESPONSE_TEMPLATE.format(
                ip_str,
                force_unicode(request.remote_addr),
                request.headers.get('x-forwarded-for'))
            return Response(msg, status='403')
    return wrapper
interfaces.py 文件源码 项目:smc-python 作者: gabstopper 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def set_unset(self, interface_id, attribute, address=None):
        """
        Set attribute to True and unset the same attribute for all other
        interfaces. This is used for interface options that can only be
        set on one engine interface.
        """
        for interface in self:
            for sub_interface in interface.sub_interfaces():
                # Skip VLAN only interfaces (no addresses)
                if not isinstance(sub_interface, PhysicalVlanInterface):
                    if getattr(sub_interface, attribute) is not None:
                        if sub_interface.nicid == str(interface_id):
                            if address is not None: # Find IP on Node Interface
                                if ipaddress.ip_address(bytes_to_unicode(address)) in \
                                    ipaddress.ip_network(sub_interface.network_value):
                                    setattr(sub_interface, attribute, True)
                                else:
                                    setattr(sub_interface, attribute, False)
                            else:
                                setattr(sub_interface, attribute, True)
                        else: #unset
                            setattr(sub_interface, attribute, False)
zeroconf_listener.py 文件源码 项目:SupercomputerInABriefcase 作者: SupercomputerInABriefcase 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def add_service(self, zeroconf, type, name):
        """This gets called for services discovered.

        Example:

        Service SupercomputerInABriefcase on tom-xps._sciabc._tcp.local. added,
        service info:
            ServiceInfo(
                type='_sciabc._tcp.local.',
                name='SupercomputerInABriefcase on tom-xps._sciabc._tcp.local.',
                address=b'\n\xb7\xcd\xb6',
                port=34343,
                weight=0,
                priority=0,
                server='tom-xps.local.',
                properties={}
            )
        """
        info = zeroconf.get_service_info(type, name)
        print("Service %s added, service info: %s" % (name, info))
        ip = ipaddress.ip_address(info.address)
        append_ip_to_nodes_file(str(ip))
ceil_cli.py 文件源码 项目:python-miio 作者: rytilahti 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def validate_ip(ctx, param, value):
    try:
        ipaddress.ip_address(value)
        return value
    except ValueError as ex:
        raise click.BadParameter("Invalid IP: %s" % ex)
server.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def add_ban(self, ip, reason, duration, name=None):
        """
        Ban an ip with an optional reason and duration in minutes. If duration
        is None, ban is permanent.
        """
        network = ip_network(text_type(ip), strict=False)
        for connection in list(self.connections.values()):
            if ip_address(connection.address[0]) in network:
                name = connection.name
                connection.kick(silent=True)
        if duration:
            duration = reactor.seconds() + duration * 60
        else:
            duration = None
        self.bans[ip] = (name or '(unknown)', reason, duration)
        self.save_bans()
ioc_common.py 文件源码 项目:iocage 作者: iocage 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def sort_ip(ip, version="4"):
    """Sort the list by IP address."""
    list_length = len(ip)

    # Length 10 is list -l, 5 is list

    if list_length == 10:
        try:
            ip = ip[7] if version == "4" else ip[8]
            _ip = str(ipaddress.ip_address(ip.rsplit("|")[1]))
        except (ValueError, IndexError):
            # Lame hack to have "-" last.
            _ip = "Z"
    elif list_length == 6:
        try:
            _ip = str(ipaddress.ip_address(ip[5]))
        except ValueError:
            # Lame hack to have "-" last.
            _ip = "Z"
    else:
        _ip = ip

    return _ip
test_nexthop.py 文件源码 项目:sonic-snmpagent 作者: Azure 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_getnextpdu(self):
        get_pdu = GetNextPDU(
            header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0),
            oids=(
                ObjectIdentifier(10, 0, 0, 0, (1, 3, 6, 1, 2, 1, 4, 21, 1, 7)),
            )
        )

        encoded = get_pdu.encode()
        response = get_pdu.make_response(self.lut)
        print(response)

        n = len(response.values)
        value0 = response.values[0]
        self.assertEqual(value0.type_, ValueType.IP_ADDRESS)
        self.assertEqual(str(value0.data), ipaddress.ip_address("10.0.0.1").packed.decode())
test_nexthop.py 文件源码 项目:sonic-snmpagent 作者: Azure 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_getnextpdu_exactmatch(self):
        oid = ObjectIdentifier(14, 0, 1, 0, (1, 3, 6, 1, 2, 1, 4, 21, 1, 7, 0, 0, 0, 0))
        get_pdu = GetNextPDU(
            header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0),
            oids=[oid]
        )

        encoded = get_pdu.encode()
        response = get_pdu.make_response(self.lut)
        print(response)

        n = len(response.values)
        value0 = response.values[0]
        self.assertEqual(value0.type_, ValueType.IP_ADDRESS)
        print("test_getnextpdu_exactmatch: ", str(oid))
        self.assertEqual(str(value0.name), str(oid))
        self.assertEqual(str(value0.data), ipaddress.ip_address("10.0.0.1").packed.decode())
test_forward.py 文件源码 项目:sonic-snmpagent 作者: Azure 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_getnextpdu(self):
        get_pdu = GetNextPDU(
            header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0),
            oids=(
                ObjectIdentifier(21, 0, 0, 0, (1, 3, 6, 1, 2, 1, 4, 24, 4, 1, 1, 0)),
            )
        )

        encoded = get_pdu.encode()
        response = get_pdu.make_response(self.lut)
        print(response)

        n = len(response.values)
        value0 = response.values[0]
        self.assertEqual(value0.type_, ValueType.IP_ADDRESS)
        self.assertEqual(str(value0.data), ipaddress.ip_address("0.0.0.0").packed.decode())
test_forward.py 文件源码 项目:sonic-snmpagent 作者: Azure 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_getnextpdu_exactmatch(self):
        oid = ObjectIdentifier(24, 0, 1, 0, (1, 3, 6, 1, 2, 1, 4, 24, 4, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 0, 17))
        get_pdu = GetNextPDU(
            header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0),
            oids=[oid]
        )

        encoded = get_pdu.encode()
        response = get_pdu.make_response(self.lut)
        print(response)

        n = len(response.values)
        value0 = response.values[0]
        self.assertEqual(value0.type_, ValueType.IP_ADDRESS)
        print("test_getnextpdu_exactmatch: ", str(oid))
        self.assertEqual(str(value0.name), str(oid))
        self.assertEqual(str(value0.data), ipaddress.ip_address("0.0.0.0").packed.decode())
rfc4292.py 文件源码 项目:sonic-snmpagent 作者: Azure 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def reinit_data(self):
        """
        Subclass update loopback information
        """
        self.loips = {}

        self.db_conn.connect(mibs.APPL_DB)
        loopbacks = self.db_conn.keys(mibs.APPL_DB, "INTF_TABLE:lo:*")
        if not loopbacks:
            return

        # collect only ipv4 interfaces
        for loopback in loopbacks:
            lostr = loopback.decode()
            loip = lostr[len("INTF_TABLE:lo:"):]
            ipa = ipaddress.ip_address(loip)
            if isinstance(ipa, ipaddress.IPv4Address):
                self.loips[loip] = ipa
views.py 文件源码 项目:desec-stack 作者: desec-io 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def perform_create(self, serializer, remote_ip):
        captcha = (
                ipaddress.ip_address(remote_ip) not in ipaddress.IPv6Network(os.environ['DESECSTACK_IPV6_SUBNET'])
                and (
                    User.objects.filter(
                        created__gte=timezone.now()-timedelta(hours=settings.ABUSE_BY_REMOTE_IP_PERIOD_HRS),
                        registration_remote_ip=remote_ip
                    ).count() >= settings.ABUSE_BY_REMOTE_IP_LIMIT
                    or
                    User.objects.filter(
                        created__gte=timezone.now() - timedelta(hours=settings.ABUSE_BY_EMAIL_HOSTNAME_PERIOD_HRS),
                        email__endswith=serializer.validated_data['email'].split('@')[-1]
                    ).count() >= settings.ABUSE_BY_EMAIL_HOSTNAME_LIMIT
                )
            )

        user = serializer.save(registration_remote_ip=remote_ip, captcha_required=captcha)
        if user.captcha_required:
            send_account_lock_email(self.request, user)
        elif not user.dyn:
            context = {'token': user.get_token()}
            send_token_email(context, user)
        signals.user_registered.send(sender=self.__class__, user=user, request=self.request)
locations.py 文件源码 项目:toshi-reputation-service 作者: toshiapp 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_location_from_geolite2(pool, ip_addr):

    try:
        ip_addr = ipaddress.ip_address(ip_addr)
        # fix issues with asyncpg not handling ip address netmask defaults
        # correctly
        ip_addr = "{}/{}".format(ip_addr, 32 if ip_addr.version == 4 else 128)
        async with pool.acquire() as con:
            row = await con.fetchrow(
                "SELECT cs.country_iso_code FROM geolite2_ip_addresses ips "
                "JOIN geolite2_countries cs ON ips.geoname_id = cs.geoname_id "
                "WHERE ips.network >> $1", ip_addr)
        return row['country_iso_code'] if row else None
    except ValueError:
        return None
    except asyncpg.exceptions.UndefinedTableError:
        log.warning("Missing GeoLite2 database tables")
        return None
test_nmap.py 文件源码 项目:ndr 作者: SecuredByTHEM 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_scandata_parsing(self):
        '''Tests parsing of the basic surface scan'''
        scan = ndr.NmapScan()

        scan.parse_nmap_xml(load_nmap_xml_data(TEST_SURFACE_SCAN_DATA))
        self.assertEqual(5, len(scan))

        # Got three hosts, find a host by MAC address
        host = scan.find_by_mac('84:39:BE:64:3F:E5')[0]

        # SHould only find one IP address
        self.assertNotEqual(None, host)

        self.assertTrue(host.addr, ipaddress.ip_address("192.168.2.100"))
        self.assertEqual(host.reason, ndr.NmapReasons.ARP_RESPONSE)

        # Confirm that we're not finding phantom hosts
        self.assertEqual(scan.find_by_mac('NOT_A_REAL_MAC'), None)
test_nmap.py 文件源码 项目:ndr 作者: SecuredByTHEM 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_ipv6_linklocal_parsing(self):
        '''Tests the results of the IPv6 link-local scan'''

        scan = ndr.NmapScan()
        scan.parse_nmap_xml(load_nmap_xml_data(TEST_V6_LINK_LOCAL_SCAN_DATA))

        self.assertEqual(3, len(scan))

        # Got three hosts, find a host by MAC address
        host_list = scan.find_by_mac('84:39:BE:64:3F:E5')

        # SHould only find one IP address
        self.assertEqual(len(host_list), 1)

        host = host_list[0]
        self.assertTrue(host.reason, ndr.NmapReasons.ND_RESPONSE)
        self.assertEqual(host.addr, ipaddress.ip_address("fe80::8639:beff:fe64:3fe5"))
nmap_runner.py 文件源码 项目:ndr 作者: SecuredByTHEM 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def from_dict(self, config_dict):
        '''Load settings from dictionary'''
        # Load the easy objects first
        if config_dict.get('version', None) != 1:
            raise ValueError("Unknown NDR NMAP config file version!")

        # Clean out the IP lists
        self.mac_address_config = {}
        self.ip_address_config = {}

        machine_ips = config_dict.get('machine_ips', dict())
        machine_macs = config_dict.get('machine_macs', dict())

        # Load in the machine IP addresses
        for ip_addr, value in machine_ips.items():
            self.ip_address_config[ipaddress.ip_address(ip_addr)] = NmapScanMode(value)

        # Now do it again with the MAC addresses. When we load in MACs, make them all
        # upper case to be consistent with NmapHosts

        for mac_addr, value in machine_macs.items():
            self.mac_address_config[mac_addr.upper()] = NmapScanMode(value)
nmap_runner.py 文件源码 项目:ndr 作者: SecuredByTHEM 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def build_nmap_commandline(self, base_flags, address, interface=None):
        '''Builds common NMAP option command lines'''
        ipaddr = ipaddress.ip_address(address)


        # Several bits of magic are required here
        # 1. If we're v6 address or range, we need -6
        # 2. If we're link-local, we need to specify the interface

        options = base_flags
        if ipaddr.version == 6:
            options = "-6 " + options
        if ipaddr.is_link_local:
            options = "-e " + interface + " " + options

        return options
snort_traffic.py 文件源码 项目:ndr 作者: SecuredByTHEM 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def from_dict(self, msg_dict):
        '''Converts the entry back to dict format'''
        self.timestamp = msg_dict['timestamp']
        self.proto = ndr.PortProtocols(msg_dict['proto'])
        self.src = ipaddress.ip_address(msg_dict['src'])

        if msg_dict['srcport'] is not None:
            self.srcport = int(msg_dict['srcport'])

        self.dst = ipaddress.ip_address(msg_dict['dst'])

        if msg_dict['dstport'] is not None:
            self.dstport = int(msg_dict['dstport'])

        self.ethsrc = msg_dict['ethsrc']
        self.ethdst = msg_dict['ethdst']
        self.ethlen = int(msg_dict['ethlen'])
        self.tcpflags = msg_dict['tcpflags']

        if 'tcpseq' is not None:
            self.tcpseq = msg_dict['tcpseq']
        else:
            self.tcpseq = None
upnpsock.py 文件源码 项目:raiden 作者: raiden-network 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def valid_mappable_ipv4(address):
    try:
        address_uni = unicode(address, errors='ignore')
    except TypeError:
        address_uni = address

    if address_uni in NON_MAPPABLE:
        return False

    try:
        parsed = ipaddress.ip_address(address_uni)
    except ValueError:
        log.debug('invalid IPv4 address', input=address)
        return False
    if parsed is not None and parsed.version == 4:
        return True
    else:
        return False
test_ippool.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setUp(self):
        network = u'192.168.1.0/30'
        self.node = self.fixtures.node()
        IpAddrPool().create(
            {'network': network, 'autoblock': '192.168.1.1', 'node':
                self.node.id})
        self.ippool = IPPool.query.get(network)
        self.pod = self.fixtures.pod(owner_id=self.user.id)
        self.pod_ip = PodIP(
            pod_id=self.pod.id,
            network=self.ippool.network,
            ip_address=int(ipaddress.ip_address(u'192.168.1.2')),
        )
        self.db.session.add(self.pod_ip)
        self.db.session.add(self.node)
        self.db.session.commit()

        self.stubs = K8SAPIStubs()
        self.stubs.node_info_in_k8s_api(self.node.hostname)
        self.stubs.node_info_update_in_k8s_api(self.node.hostname)
test_podcollection.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setUp(self):
        from kubedock.pods.models import PodIP, IPPool
        self.ip = ipaddress.ip_address(u'192.168.43.4')
        self.with_ip_conf = {
            'public_ip': unicode(self.ip),
            'containers': [
                {'ports': [{'isPublic': True}, {}, {'isPublic': False}]},
                {'ports': [{'isPublic': True}, {}, {'isPublic': False}]},
            ],
        }
        self.without_ip_conf = {
            'public_ip_before_freed': unicode(self.ip),
            'containers': [
                {'ports': [{'isPublic_before_freed': True},
                           {'isPublic_before_freed': None},
                           {'isPublic_before_freed': False}]},
                {'ports': [{'isPublic_before_freed': True},
                           {'isPublic_before_freed': None},
                           {'isPublic_before_freed': False}]},
            ],
        }
        self.pod = self.fixtures.pod(config=json.dumps(self.with_ip_conf))
        self.ippool = IPPool(network='192.168.43.0/29').save()
        self.podip = PodIP(pod_id=self.pod.id, network=self.ippool.network,
                           ip_address=int(self.ip)).save()
models.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def free_hosts_and_busy(self, as_int=None, page=None):
        pods = PodIP.filter_by(network=self.network)
        allocated_ips = {int(pod): pod.get_pod() for pod in pods}
        blocked_ips = self.get_blocked_set(as_int=True)
        hosts = self.hosts(as_int=True, page=page)

        def get_ip_state(ip, pod):
            if pod:
                return 'busy'
            if ip in blocked_ips:
                return 'blocked'
            return 'free'

        def get_ip_info(ip):
            pod = allocated_ips.get(ip)
            state = get_ip_state(ip, pod)
            return ip if as_int else str(ipaddress.ip_address(ip)), pod, state

        return [get_ip_info(ip) for ip in hosts]
models.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_free_host(cls, as_int=None, node=None, ip=None):
        """Return free host if available.
        :param as_int: return ip as long int
        :param node: if set, get free host only for this node
        :param ip: if set, first try to check if this ip available, if not,
        then return any other available ip
        :return: ip address, as str or int, depends on  as_int value

        """
        if ip:
            network = cls.get_network_by_ip(ip)
            if network and network.is_ip_available(ip, node):
                return int(ipaddress.ip_address(ip)) if as_int else ip
        if node is None:
            networks = cls.all()
        else:
            networks = cls.filter(cls.node.has(hostname=node))
        for n in networks:
            free_host = n.get_first_free_host(as_int=as_int)
            if free_host is not None:
                return free_host
        raise NoFreeIPs()
node_network_plugin.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def init():
    config = read_config_ini('/run/flannel/subnet.env')
    config_network = config['flannel_network']
    config_subnet = config['flannel_subnet']
    _update_ipset('kuberdock_flannel', [config_network], set_type='hash:net')
    _update_ipset('kuberdock_nodes', [])
    _update_ipset('kuberdock_cluster',
                  ['kuberdock_flannel', 'kuberdock_nodes'],
                  set_type='list:set')
    network = ipaddress.ip_network(unicode(config_network))
    subnet = ipaddress.ip_network(unicode(config_subnet), strict=False)
    etcd = ETCD()
    for user in etcd.users():
        for pod in etcd.pods(user):
            pod_ip = ipaddress.ip_address(pod)
            if pod_ip not in network or pod_ip in subnet:
                etcd.delete_user(user, pod)
    update_ipset()
volume.py 文件源码 项目:charm-glusterfs 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_local_bricks(volume: str) -> Result:
    """
        Return all bricks that are being served locally in the volume
        volume: Name of the volume to get local bricks for
    """
    try:
        vol_info = volume_info(volume)
        local_ip = get_local_ip()
        if local_ip.is_err():
            return Err(local_ip.value)
        local_brick_list = []
        for vol in vol_info:
            for brick in vol.bricks:
                if ip_address(brick.peer.hostname) == local_ip.value:
                    local_brick_list.append(brick)
        return Ok(local_brick_list)
    except GlusterCmdOutputParseError:
        raise


问题


面经


文章

微信
公众号

扫码关注公众号