def __init__(self, value):
"""Initialise new IPPrefix instance."""
try:
obj = ipaddress.ip_address(value)
except Exception:
try:
obj = ipaddress.ip_network(value, strict=False)
except Exception:
raise
if type(obj) in [ipaddress.IPv4Address, ipaddress.IPv6Address]:
self.prefix = None
self.type = self.HOST
elif type(obj) in [ipaddress.IPv4Network, ipaddress.IPv6Network]:
self.prefix = obj.network_address
self.type = self.PREFIX
self.version = obj.version
self.txt = obj.compressed
python类IPv6Address()的实例源码
def __init__(self, value):
if not isinstance(
value,
(
ipaddress.IPv4Address,
ipaddress.IPv6Address,
ipaddress.IPv4Network,
ipaddress.IPv6Network
)
):
raise TypeError(
"value must be an instance of ipaddress.IPv4Address, "
"ipaddress.IPv6Address, ipaddress.IPv4Network, or "
"ipaddress.IPv6Network"
)
self._value = value
def __init__(self, value):
if not isinstance(
value,
(
ipaddress.IPv4Address,
ipaddress.IPv6Address,
ipaddress.IPv4Network,
ipaddress.IPv6Network
)
):
raise TypeError(
"value must be an instance of ipaddress.IPv4Address, "
"ipaddress.IPv6Address, ipaddress.IPv4Network, or "
"ipaddress.IPv6Network"
)
self._value = value
def __init__(self, value):
if not isinstance(
value,
(
ipaddress.IPv4Address,
ipaddress.IPv6Address,
ipaddress.IPv4Network,
ipaddress.IPv6Network
)
):
raise TypeError(
"value must be an instance of ipaddress.IPv4Address, "
"ipaddress.IPv6Address, ipaddress.IPv4Network, or "
"ipaddress.IPv6Network"
)
self._value = value
def __init__(self, value):
if not isinstance(
value,
(
ipaddress.IPv4Address,
ipaddress.IPv6Address,
ipaddress.IPv4Network,
ipaddress.IPv6Network
)
):
raise TypeError(
"value must be an instance of ipaddress.IPv4Address, "
"ipaddress.IPv6Address, ipaddress.IPv4Network, or "
"ipaddress.IPv6Network"
)
self._value = value
def getSendTo(addressType):
def sendto(s, _data, _to):
ancdata = []
if type(_to) == addressType:
addr = ipaddress.ip_address(_to.getLocalAddress()[0])
else:
addr = ipaddress.ip_address(s.getsockname()[0])
if type(addr) == ipaddress.IPv4Address:
_f = in_pktinfo()
_f.ipi_spec_dst = in_addr.from_buffer_copy(addr.packed)
ancdata = [(socket.SOL_IP, socket.IP_PKTINFO, memoryview(_f).tobytes())]
elif s.family == socket.AF_INET6 and type(addr) == ipaddress.IPv6Address:
_f = in6_pktinfo()
_f.ipi6_addr = in6_addr.from_buffer_copy(addr.packed)
ancdata = [(socket.SOL_IPV6, socket.IPV6_PKTINFO, memoryview(_f).tobytes())]
return s.sendmsg([_data], ancdata, 0, _to)
return sendto
def _is_ipaddress(value):
import ipaddress
return isinstance(
value, (ipaddress.IPv4Address, ipaddress.IPv6Address))
def is_ipv6(ip_addr):
try:
ipaddress.IPv6Address(ip_addr)
return True
except ipaddress.AddressValueError:
return False
def unpack_ip_address_bytes(data_bytes, addr_type):
'''
Return a tuple containing the unpacked addr_type IP address string, and the
remainder of data_bytes.
addr_type must be either IPV4_ADDRESS_TYPE or IPV6_ADDRESS_TYPE.
data_bytes must be at least IPV4_ADDRESS_LEN or IPV6_ADDRESS_LEN long.
'''
addr_len = get_addr_type_len(addr_type)
assert len(data_bytes) >= addr_len
if addr_type == IPV4_ADDRESS_TYPE:
assert addr_len == IPV4_ADDRESS_LEN
(addr_bytes, remaining_bytes) = split_field(addr_len, data_bytes)
# Some ipaddress variants demand bytearray, others demand bytes
try:
addr_value = ipaddress.IPv4Address(bytearray(addr_bytes))
return (str(addr_value), remaining_bytes)
except ipaddress.AddressValueError:
pass
except UnicodeDecodeError:
pass
addr_value = ipaddress.IPv4Address(bytes(addr_bytes))
elif addr_type == IPV6_ADDRESS_TYPE:
assert addr_len == IPV6_ADDRESS_LEN
(addr_bytes, remaining_bytes) = split_field(addr_len, data_bytes)
try:
addr_value = ipaddress.IPv6Address(bytearray(addr_bytes))
return (str(addr_value), remaining_bytes)
except ipaddress.AddressValueError:
pass
except UnicodeDecodeError:
pass
addr_value = ipaddress.IPv6Address(bytes(addr_bytes))
else:
raise ValueError('Unexpected address type: {}'.format(addr_type))
return (str(addr_value), remaining_bytes)
def get_bgp_table(host):
"""Return the contents of the cbgpPeer2Table table."""
obj = ObjectIdentity('CISCO-BGP4-MIB', 'cbgpPeer2Table')
target = UdpTransportTarget((host, SNMP['port']))
results = defaultdict(dict)
for (errorIndication, errorStatus,
errorIndex, varBinds) in bulkCmd(snmp, usm, target, context, 0, 25,
ObjectType(obj),
lexicographicMode=False,
lookupMib=True):
if errorIndication:
raise RuntimeError(errorIndication)
elif errorStatus:
raise RuntimeError('%s at %s'
% (errorStatus.prettyPrint(),
errorIndex
and varBinds[-1][int(errorIndex)-1] or '?'))
else:
for (key, val) in varBinds:
if not isinstance(val, EndOfMibView):
(mib, name, index) = \
key.loadMibs('BGP4-MIB').getMibSymbol()
a = index[1].prettyPrint()
results[a]['cbgpPeer2Type'] = index[0]
results[a][name] = val
for address in results:
if results[address]['cbgpPeer2Type'] == 1:
results[address]['cbgpPeer2RemoteAddr'] = \
ipaddress.IPv4Address(int(address, 16))
elif results[address]['cbgpPeer2Type'] == 2:
results[address]['cbgpPeer2RemoteAddr'] = \
ipaddress.IPv6Address(int(address, 16))
return results
def test_checksum_calculation(self):
dummy_ipv6 = IPv6(
source=IPv6Address('2001:9e0::3:2002'),
destination=IPv6Address('2001:9e0:4:32:212:45:32:0222'),
)
self.assertEqual(self.message_fixture.checksum, self.message.calculate_checksum(dummy_ipv6))
def test_save_with_checksum_calculation(self):
dummy_ipv6 = IPv6(
source=IPv6Address('2001:9e0::3:2002'),
destination=IPv6Address('2001:9e0:4:32:212:45:32:0222'),
)
self.message.checksum = -1
saved = self.message.save(recalculate_checksum_for=dummy_ipv6)
self.assertEqual(saved, self.packet_fixture)
def test_validate_source(self):
self.message.source = bytes.fromhex('20010db8000000000000000000000001')
with self.assertRaisesRegex(ValueError, 'Source .* IPv6 address'):
self.message.validate()
self.message.source = IPv6Address('ff02::1')
with self.assertRaisesRegex(ValueError, 'Source .* non-multicast IPv6 address'):
self.message.validate()
def setUp(self):
self.packet_class = Ethernet
self.packet_fixture = bytes.fromhex(
'0123456789abcdef0123456786dd'
'6efbcdef001811fd200109e0000000000000000000032002200109e0000400320212004500320222'
'123456780018a695') + b'Demo UDP packet!'
self.message_fixture = Ethernet(
destination=bytes.fromhex('0123456789ab'),
source=bytes.fromhex('cdef01234567'),
ethertype=int('86dd', 16),
payload=IPv6(
traffic_class=239,
flow_label=773615,
next_header=17,
hop_limit=253,
source=IPv6Address('2001:9e0::3:2002'),
destination=IPv6Address('2001:9e0:4:32:212:45:32:0222'),
payload=UDP(
source_port=4660,
destination_port=22136,
checksum=42645,
payload=b'Demo UDP packet!'
)
)
)
self.parse_packet()
def __init__(self, traffic_class: int = 0, flow_label: int = 0, next_header: int = 0, hop_limit: int = 0,
source: IPv6Address = None, destination: IPv6Address = None, payload: ProtocolElement = None):
super().__init__()
self.traffic_class = traffic_class
self.flow_label = flow_label
self.next_header = next_header
self.hop_limit = hop_limit
self.source = source
self.destination = destination
self.payload = payload
def validate(self):
"""
Validate that the contents of this object conform to protocol specs.
"""
# Check if the traffic class fits in 8 bits
if not isinstance(self.traffic_class, int) or not (0 <= self.traffic_class < 2 ** 8):
raise ValueError("Traffic class must be an unsigned 8 bit integer")
# Check if the flow label fits in 20 bits
if not isinstance(self.flow_label, int) or not (0 <= self.flow_label < 2 ** 20):
raise ValueError("Flow label must be an unsigned 20 bit integer")
# Check if the next header fits in 8 bits
if not isinstance(self.next_header, int) or not (0 <= self.next_header < 2 ** 8):
raise ValueError("Next header type must be an unsigned 8 bit integer")
# Check if the hop limit fits in 8 bits
if not isinstance(self.hop_limit, int) or not (0 <= self.hop_limit < 2 ** 8):
raise ValueError("Hop limit must be an unsigned 8 bit integer")
# Check if the source and destination are IPv6 addresses
if not isinstance(self.source, IPv6Address):
raise ValueError("Source must be an IPv6 address")
if self.source.is_multicast:
raise ValueError("Source must be a non-multicast IPv6 address")
if not isinstance(self.destination, IPv6Address):
raise ValueError("Destination must be an IPv6 address")
# Check if the payload is a protocol element
if not isinstance(self.payload, ProtocolElement):
raise ValueError("Payload must be a protocol element")
# Check if all options are allowed
self.validate_contains([self.payload])
self.payload.validate()
def __init__(self, interface_index: int,
interface_mac_address: bytes, client_mac_address: bytes,
reply_from: IPv6Address, reply_socket: socket.socket):
self.interface_index = interface_index
self.interface_mac_address = interface_mac_address
self.client_mac_address = client_mac_address
self.reply_from = reply_from
self.reply_socket = reply_socket
def find_reply_from(reply_from: Optional[IPv6Address], interface_name: str,
interface_addresses: Iterable[IPv6Address]) -> IPv6Address:
"""
Find the appropriate reply-from address
:param reply_from: The reply-from address specified in the configuration, if any
:param interface_name: The name of the interface for logging purposes
:param interface_addresses: The list of addresses on the interface
:return: The reply-from address to use
"""
if reply_from:
# Check if this address exists
if reply_from not in interface_addresses:
raise ValueError("Reply-from address {addr} does not exist on {intf}".format(
addr=reply_from,
intf=interface_name
))
return reply_from
else:
# Pick the first link-local address
ll_addresses = [address for address in interface_addresses if address.is_link_local]
if not ll_addresses:
raise ValueError("No link-local address found on {intf}".format(
intf=interface_name
))
return ll_addresses[0]
def set_domain_name_for_ipv6_subnet(sender, instance, **kwargs):
if len(instance.domain_name) > 0:
return
network = ipaddress.IPv6Address("%s::" % instance.network)
instance.domain_name = ".".join(network.exploded.replace(":", "")[:16])[::-1] + ".ip6.arpa"
inputtypesTest.py 文件源码
项目:archive-Holmes-Totem-Service-Library
作者: HolmesProcessing
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def makeIP(i, size):
if size == 4:
return ipaddress.IPv4Address(i)
else:
return ipaddress.IPv6Address(i)