def __init__(self, value):
if not isinstance(
value,
(
ipaddress.IPv4Address,
ipaddress.IPv6Address,
ipaddress.IPv4Network,
ipaddress.IPv6Network
)
):
raise TypeError(
"value must be an instance of ipaddress.IPv4Address, "
"ipaddress.IPv6Address, ipaddress.IPv4Network, or "
"ipaddress.IPv6Network"
)
self._value = value
python类IPv4Address()的实例源码
def parse_ip4(pack):
arr = struct_ip4.unpack(pack[:20])
hdr = {
"version" : arr[0] >> 4, # should be 4 ...
"ihl" : arr[0] & 0xF,
"tos" : arr[1],
"tot_len" : arr[2],
"id" : arr[3],
"frag_off" : arr[4],
"ttl" : arr[5],
"protcol" : arr[6],
"check" : arr[7],
"saddr" : ipaddress.IPv4Address(arr[8]),
"daddr" : ipaddress.IPv4Address(arr[9])
}
hdr_len = hdr["ihl"] * 4
return hdr, pack[hdr_len:]
def clean(self):
# delete existing entry if exists
machine = MachineData.objects(ip_address=self.ip_address)
if machine:
machine.delete()
# add ip_address_decimal, aws, ec2 fields depending on the ip_address
if self.ip_address_decimal is None:
self.ip_address_decimal = int(ipaddress.IPv4Address(self.ip_address))
if self.aws is None:
self.aws = Validation.is_aws(self.ip_address)
if self.aws:
if self.status == 'OK':
self.ec2 = {
'instance_id': EC2Client.ip_instance_map.get(self.ip_address, EC2Client.get_instance_id(self.ip_address)),
'state': "running"
}
else:
self.ec2 = {
'instance_id': EC2Client.ip_instance_map.get(self.ip_address, EC2Client.get_instance_id(self.ip_address)),
'state': None
}
def build_trace_req_header(oam_type, sil, remote_ip, remote_port):
trace_req_header_values = TRACEREQHEADER()
trace_req_header_values.oam_type = oam_type
trace_req_header_values.sil = sil
trace_req_header_values.port = int(remote_port)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect((remote_ip, trace_req_header_values.port))
# print(s.getsockname()[0])
src_addr = ipaddress.ip_address(s.getsockname()[0])
if src_addr.version == 4:
trace_req_header_values.ip_1 = 0x00000000
trace_req_header_values.ip_2 = 0x00000000
trace_req_header_values.ip_3 = 0x0000FFFF
trace_req_header_values.ip_4 = int(ipaddress.IPv4Address(src_addr))
elif src_addr.version == 6:
int_addr6 = int(ipaddress.IPv6Address(src_addr))
trace_req_header_values.ip_1 = int_addr6 >> 96
trace_req_header_values.ip_2 = (int_addr6 >> 64) & 0x0FFFFFFFF
trace_req_header_values.ip_3 = (int_addr6 >> 32) & 0x0FFFFFFFF
trace_req_header_values.ip_4 = int_addr6 & 0x0FFFFFFFF
return trace_req_header_values
def process_context_headers(ctx1=0, ctx2=0, ctx3=0, ctx4=0):
"""
Encode context header values in NSH header. The function
is smart enough that is one of the values is an IP address
it will properly convert to a integer and encode it properly
:param ctx1: NSH context header 1
:param ctx2: NSH context header 2
:param ctx3: NSH context header 3
:param ctx4: NSH context header 4
:return: Array of encoded headers
"""
context_headers = []
for ctx in [ctx1, ctx2, ctx3, ctx4]:
try:
ipaddr = ipaddress.IPv4Address(ctx)
context_headers.append(int(ipaddr))
except ValueError:
try:
context_headers.append((int(ctx) & 0xFFFFFFFF))
except ValueError:
logger.error("Context header %d can not be represented as an integer", ctx)
return context_headers
def get_addresses(hostname) -> List[Union[IPv4Address, IPv6Address]]:
# Get DNS info
try:
a_records = subprocess.check_output(args=['dig', '+short', 'a', hostname],
stderr=subprocess.DEVNULL)
aaaa_records = subprocess.check_output(args=['dig', '+short', 'aaaa', hostname],
stderr=subprocess.DEVNULL)
dns_records = a_records + aaaa_records
dns_results = []
for line in dns_records.decode('utf-8').strip().split():
try:
dns_results.append(str(ip_address(line)))
except ValueError:
pass
except subprocess.CalledProcessError:
dns_results = []
return dns_results
def __init__(self, value):
if not isinstance(
value,
(
ipaddress.IPv4Address,
ipaddress.IPv6Address,
ipaddress.IPv4Network,
ipaddress.IPv6Network
)
):
raise TypeError(
"value must be an instance of ipaddress.IPv4Address, "
"ipaddress.IPv6Address, ipaddress.IPv4Network, or "
"ipaddress.IPv6Network"
)
self._value = value
def __init__(self, value):
if not isinstance(
value,
(
ipaddress.IPv4Address,
ipaddress.IPv6Address,
ipaddress.IPv4Network,
ipaddress.IPv6Network
)
):
raise TypeError(
"value must be an instance of ipaddress.IPv4Address, "
"ipaddress.IPv6Address, ipaddress.IPv4Network, or "
"ipaddress.IPv6Network"
)
self._value = value
def _nodes(self, container_base_name: str) -> Set[Node]:
"""
Args:
container_base_name: The start of the container names.
Returns: ``Node``s corresponding to containers with names starting
with ``container_base_name``.
"""
client = docker.from_env(version='auto')
filters = {'name': container_base_name}
containers = client.containers.list(filters=filters)
return set(
Node(
ip_address=IPv4Address(
container.attrs['NetworkSettings']['IPAddress']
),
ssh_key_path=self._path / 'include' / 'ssh' / 'id_rsa',
) for container in containers
)
def is_ipv4(string):
"""
Checks if a string is a valid ip-address (v4)
:param string: String to check
:type string: str
:return: True if an ip, false otherwise.
:rtype: bool
"""
try:
ipaddress.IPv4Address(string)
return True
except ipaddress.AddressValueError:
return False
def getSendTo(addressType):
def sendto(s, _data, _to):
ancdata = []
if type(_to) == addressType:
addr = ipaddress.ip_address(_to.getLocalAddress()[0])
else:
addr = ipaddress.ip_address(s.getsockname()[0])
if type(addr) == ipaddress.IPv4Address:
_f = in_pktinfo()
_f.ipi_spec_dst = in_addr.from_buffer_copy(addr.packed)
ancdata = [(socket.SOL_IP, socket.IP_PKTINFO, memoryview(_f).tobytes())]
elif s.family == socket.AF_INET6 and type(addr) == ipaddress.IPv6Address:
_f = in6_pktinfo()
_f.ipi6_addr = in6_addr.from_buffer_copy(addr.packed)
ancdata = [(socket.SOL_IPV6, socket.IPV6_PKTINFO, memoryview(_f).tobytes())]
return s.sendmsg([_data], ancdata, 0, _to)
return sendto
def testTeredo(self):
# stolen from wikipedia
server = ipaddress.IPv4Address('65.54.227.120')
client = ipaddress.IPv4Address('192.0.2.45')
teredo_addr = '2001:0000:4136:e378:8000:63bf:3fff:fdd2'
self.assertEqual((server, client),
ipaddress.ip_address(teredo_addr).teredo)
bad_addr = '2000::4136:e378:8000:63bf:3fff:fdd2'
self.assertFalse(ipaddress.ip_address(bad_addr).teredo)
bad_addr = '2001:0001:4136:e378:8000:63bf:3fff:fdd2'
self.assertFalse(ipaddress.ip_address(bad_addr).teredo)
# i77
teredo_addr = ipaddress.IPv6Address('2001:0:5ef5:79fd:0:59d:a0e5:ba1')
self.assertEqual((ipaddress.IPv4Address('94.245.121.253'),
ipaddress.IPv4Address('95.26.244.94')),
teredo_addr.teredo)
def get_less_used_network(num_of_requested_ip, global_variable, config_obj):
"""
Get the network with more IP available.
:return: Network
"""
# identify less used network
while global_variable.free_ip_lock is True:
time.sleep(1)
free_ip = global_variable.free_ips
less_used_network = sorted(free_ip, key=lambda k: len(free_ip[k]), reverse=True)[0]
# network must have enough IPs
if len(global_variable.free_ips[less_used_network]) < num_of_requested_ip + int(
config_section_map(config_obj, 'vcloud')['min_spare_ip']):
return None
ip_list = global_variable.free_ips[less_used_network][:num_of_requested_ip]
pvdc_external_networks = global_variable.vcs.get_network()
from ipaddress import IPv4Address, IPv4Network
external_network = filter(lambda x: IPv4Address(x.gateway) in IPv4Network(less_used_network), pvdc_external_networks)
return {'object': external_network[0],
'iplist': ip_list}
def is_ipv4_address(self, ipaddr):
'''
@summary: Check address is valid IPv4 address.
@param ipaddr IP address to check
@return Boolean
'''
is_valid_ipv4 = True
try :
# building ipaddress fails for some of addresses unless unicode(ipaddr) is specified for both ipv4/ipv6
# Example - 192.168.156.129, it is valid IPV4 address, send_packet works with it.
ip = ipaddress.IPv4Address(unicode(ipaddr))
except Exception, e :
is_valid_ipv4 = False
return is_valid_ipv4
#---------------------------------------------------------------------
def __init__(self, value):
if not isinstance(
value,
(
ipaddress.IPv4Address,
ipaddress.IPv6Address,
ipaddress.IPv4Network,
ipaddress.IPv6Network
)
):
raise TypeError(
"value must be an instance of ipaddress.IPv4Address, "
"ipaddress.IPv6Address, ipaddress.IPv4Network, or "
"ipaddress.IPv6Network"
)
self._value = value
def __init__(self, value):
"""Initialise new IPPrefix instance."""
try:
obj = ipaddress.ip_address(value)
except Exception:
try:
obj = ipaddress.ip_network(value, strict=False)
except Exception:
raise
if type(obj) in [ipaddress.IPv4Address, ipaddress.IPv6Address]:
self.prefix = None
self.type = self.HOST
elif type(obj) in [ipaddress.IPv4Network, ipaddress.IPv6Network]:
self.prefix = obj.network_address
self.type = self.PREFIX
self.version = obj.version
self.txt = obj.compressed
def testTeredo(self):
# stolen from wikipedia
server = ipaddress.IPv4Address('65.54.227.120')
client = ipaddress.IPv4Address('192.0.2.45')
teredo_addr = '2001:0000:4136:e378:8000:63bf:3fff:fdd2'
self.assertEqual((server, client),
ipaddress.ip_address(teredo_addr).teredo)
bad_addr = '2000::4136:e378:8000:63bf:3fff:fdd2'
self.assertFalse(ipaddress.ip_address(bad_addr).teredo)
bad_addr = '2001:0001:4136:e378:8000:63bf:3fff:fdd2'
self.assertFalse(ipaddress.ip_address(bad_addr).teredo)
# i77
teredo_addr = ipaddress.IPv6Address('2001:0:5ef5:79fd:0:59d:a0e5:ba1')
self.assertEqual((ipaddress.IPv4Address('94.245.121.253'),
ipaddress.IPv4Address('95.26.244.94')),
teredo_addr.teredo)
def __init__(self, value):
if not isinstance(
value,
(
ipaddress.IPv4Address,
ipaddress.IPv6Address,
ipaddress.IPv4Network,
ipaddress.IPv6Network
)
):
raise TypeError(
"value must be an instance of ipaddress.IPv4Address, "
"ipaddress.IPv6Address, ipaddress.IPv4Network, or "
"ipaddress.IPv6Network"
)
self._value = value
def __init__(self, value):
if not isinstance(
value,
(
ipaddress.IPv4Address,
ipaddress.IPv6Address,
ipaddress.IPv4Network,
ipaddress.IPv6Network
)
):
raise TypeError(
"value must be an instance of ipaddress.IPv4Address, "
"ipaddress.IPv6Address, ipaddress.IPv4Network, or "
"ipaddress.IPv6Network"
)
self._value = value
def validate_ip(addr):
"""Pass-through function for validating an IPv4 address.
Args:
ip: (str) IP address
Returns:
unicode string with same address
Raises:
Error: if IPv4 address is not valid
"""
try:
return ipaddress.IPv4Address(unicode(addr)).compressed
except ipaddress.AddressValueError as exc:
raise Error('Invalid IPv4 address "%s"; %s' % (addr, exc))