def exeute(self, method, value):
    """
    Invoke ``method`` on a freshly built type checker and render the outcome.

    The checker is created as ``self.typeclass(value, self.strict_level)``
    and the attribute named by ``method`` is called without arguments.

    :param method: Name of the checker method to call.
    :param value: Value handed to the checker constructor.
    :return: ``"NOP [#f1]_"`` when ``method`` is ``"validate"``; the result
        quoted and wrapped in double backticks when it is a text or IP
        address object; ``"E [#f2]_"`` when a ``TypeError`` is raised;
        ``"E [#f3]_"`` when a ``typepy.TypeConversionError`` is raised;
        otherwise the unmodified result.
    """
    quoted_types = (
        six.text_type,
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
    )

    try:
        checker = self.typeclass(value, self.strict_level)
        outcome = getattr(checker, method)()

        if method == "validate":
            return "NOP [#f1]_"
        if isinstance(outcome, quoted_types):
            return '``"{}"``'.format(outcome)
        return outcome
    except TypeError:
        return "E [#f2]_"
    except typepy.TypeConversionError:
        return "E [#f3]_"
Python 类 IPv6Address() 的实例源码
def __init__(self, value):
    """
    Store an ``ipaddress`` address or network object on the instance.

    :param value: An ``ipaddress.IPv4Address``, ``ipaddress.IPv6Address``,
        ``ipaddress.IPv4Network`` or ``ipaddress.IPv6Network`` instance.
    :raises TypeError: If ``value`` is not an instance of one of those types.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """
    Store an ``ipaddress`` address or network object on the instance.

    :param value: An ``ipaddress.IPv4Address``, ``ipaddress.IPv6Address``,
        ``ipaddress.IPv4Network`` or ``ipaddress.IPv6Network`` instance.
    :raises TypeError: If ``value`` is not an instance of one of those types.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def isipv6(value):
    """
    Tell whether the given value is an IP version 6 address.

    Returns ``True`` when ``ipaddress.IPv6Address`` accepts the value,
    ``False`` when it rejects it with ``AddressValueError``.

    Examples::

        >>> isipv6('2001:41d0:2:a141::1')
        True
        >>> isipv6('127.0.0.1')
        False

    :param value: string to validate as an IP version 6 address
    """
    try:
        parsed = ipaddress.IPv6Address(value)
    except ipaddress.AddressValueError:
        is_v6 = False
    else:
        is_v6 = parsed.version == 6
    return is_v6
def __init__(self, value):
    """
    Store an ``ipaddress`` address or network object on the instance.

    :param value: An ``ipaddress.IPv4Address``, ``ipaddress.IPv6Address``,
        ``ipaddress.IPv4Network`` or ``ipaddress.IPv6Network`` instance.
    :raises TypeError: If ``value`` is not an instance of one of those types.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def setUp(self):
    """
    Prepare the IPv6 fixtures used by the packet tests.

    Stores the packet class under test, the raw bytes of an IPv6 packet
    followed by its UDP payload data, and the message object built from the
    matching field values, then calls ``self.parse_packet()``.
    """
    udp_data = b'Demo UDP packet!'
    header_hex = (
        '6efbcdef001811fd200109e0000000000000000000032002200109e0000400320212004500320222'
        '123456780018a695'
    )

    self.packet_class = IPv6
    self.packet_fixture = bytes.fromhex(header_hex) + udp_data

    source = IPv6Address('2001:9e0::3:2002')
    destination = IPv6Address('2001:9e0:4:32:212:45:32:0222')
    datagram = UDP(
        source_port=4660,
        destination_port=22136,
        checksum=42645,
        payload=udp_data,
    )
    self.message_fixture = IPv6(
        traffic_class=239,
        flow_label=773615,
        next_header=17,
        hop_limit=253,
        source=source,
        destination=destination,
        payload=datagram,
    )

    self.parse_packet()
def test_l4_payload_type(self):
    """
    Round-trip an IPv6 packet whose payload is an ``UnknownLayer4Protocol``.

    Parsing the raw bytes must consume the whole packet and yield the
    expected message, and saving that message must reproduce the bytes.
    """
    raw_packet = bytes.fromhex(
        '6000000000040000000000000000000000000000'
        '0000000000000000000000000000000000000000'
        '31323334'
    )
    expected = IPv6(
        traffic_class=0,
        flow_label=0,
        next_header=0,
        hop_limit=0,
        source=IPv6Address('::'),
        destination=IPv6Address('::'),
        payload=UnknownLayer4Protocol(b'1234'),
    )

    consumed, parsed = IPv6.parse(raw_packet)

    self.assertEqual(consumed, len(raw_packet))
    self.assertEqual(parsed, expected)
    self.assertEqual(expected.save(), raw_packet)
def find_link_address(link_address: Optional[IPv6Address],
                      interface_addresses: Iterable[IPv6Address]) -> IPv6Address:
    """
    Choose the address to reply from.

    A configured link-address always wins. Without one, the first interface
    address accepted by ``is_global_unicast`` is used, and the unspecified
    address ``::`` is the last resort.

    :param link_address: The link-address address specified in the configuration, if any
    :param interface_addresses: The list of addresses on the interface
    :return: The reply-from address to use
    """
    if link_address:
        return link_address

    # No configured address: fall back to the first global address, if any
    candidates = [candidate for candidate in interface_addresses if is_global_unicast(candidate)]
    return candidates[0] if candidates else IPv6Address('::')
def __init__(self, value):
    """
    Store an ``ipaddress`` address or network object on the instance.

    :param value: An ``ipaddress.IPv4Address``, ``ipaddress.IPv6Address``,
        ``ipaddress.IPv4Network`` or ``ipaddress.IPv6Network`` instance.
    :raises TypeError: If ``value`` is not an instance of one of those types.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """
    Store an ``ipaddress`` address or network object on the instance.

    :param value: An ``ipaddress.IPv4Address``, ``ipaddress.IPv6Address``,
        ``ipaddress.IPv4Network`` or ``ipaddress.IPv6Network`` instance.
    :raises TypeError: If ``value`` is not an instance of one of those types.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """
    Store an ``ipaddress`` address or network object on the instance.

    :param value: An ``ipaddress.IPv4Address``, ``ipaddress.IPv6Address``,
        ``ipaddress.IPv4Network`` or ``ipaddress.IPv6Network`` instance.
    :raises TypeError: If ``value`` is not an instance of one of those types.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """
    Store an ``ipaddress`` address or network object on the instance.

    :param value: An ``ipaddress.IPv4Address``, ``ipaddress.IPv6Address``,
        ``ipaddress.IPv4Network`` or ``ipaddress.IPv6Network`` instance.
    :raises TypeError: If ``value`` is not an instance of one of those types.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
inputtype.py 文件源码
项目:archive-Holmes-Totem-Service-Library
作者: HolmesProcessing
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def ValidateIP(ip):
    """
    Report whether an IP address is public.

    An address counts as public when it is none of the following:
        - private
        - loopback
        - broadcast / multicast
        - link-local

    The check itself is delegated to ``__validateIP``.

    Returns True on success and False on failure.
    Raises a ValueError exception if the input is not of type
    ipaddress.IPv4Address or ipaddress.IPv6Address.
    """
    is_public = __validateIP(ip)
    return is_public
def build_trace_req_header(oam_type, sil, remote_ip, remote_port):
    """
    Build a ``TRACEREQHEADER`` for a trace request towards a remote peer.

    The local source address is discovered by connecting a UDP socket to
    ``(remote_ip, remote_port)`` and reading the socket's own name. It is
    stored as four 32-bit words ``ip_1`` .. ``ip_4``, most significant word
    first; an IPv4 source is written as ``0, 0, 0x0000FFFF, <IPv4 as int>``.

    :param oam_type: Value stored in the header's ``oam_type`` field.
    :param sil: Value stored in the header's ``sil`` field.
    :param remote_ip: Remote host the probe socket connects to.
    :param remote_port: Remote port; converted with ``int()``.
    :return: The populated ``TRACEREQHEADER`` instance.
    """
    header = TRACEREQHEADER()
    header.oam_type = oam_type
    header.sil = sil
    header.port = int(remote_port)

    # The socket is only needed to learn which local address the OS picks
    # for this destination, so release it as soon as that is known.
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        probe.connect((remote_ip, header.port))
        src_addr = ipaddress.ip_address(probe.getsockname()[0])
    finally:
        probe.close()

    if src_addr.version == 4:
        header.ip_1 = 0x00000000
        header.ip_2 = 0x00000000
        header.ip_3 = 0x0000FFFF
        header.ip_4 = int(src_addr)
    elif src_addr.version == 6:
        # NOTE(review): the probe socket is AF_INET, so this branch looks
        # unreachable as written - confirm whether IPv6 peers are expected.
        int_addr6 = int(src_addr)
        header.ip_1 = int_addr6 >> 96
        header.ip_2 = (int_addr6 >> 64) & 0x0FFFFFFFF
        header.ip_3 = (int_addr6 >> 32) & 0x0FFFFFFFF
        header.ip_4 = int_addr6 & 0x0FFFFFFFF

    return header
def getSendTo(addressType):
    """
    Build a ``sendto`` function that attaches packet-info ancillary data.

    :param addressType: Destination class whose instances supply the local
        address through ``getLocalAddress()``; the comparison is on the
        exact type, so subclasses are not matched.
    :return: A function ``sendto(s, _data, _to)``.
    """
    def sendto(s, _data, _to):
        """
        Send ``_data`` to ``_to`` over socket ``s`` with ``sendmsg``.

        The local address comes from ``_to.getLocalAddress()`` when ``_to``
        is exactly of type ``addressType``, otherwise from
        ``s.getsockname()``. For an IPv4 local address an ``IP_PKTINFO``
        item is attached; for an IPv6 local address on an ``AF_INET6``
        socket an ``IPV6_PKTINFO`` item is attached; otherwise no ancillary
        data is sent.
        """
        if type(_to) == addressType:
            local_ip = _to.getLocalAddress()[0]
        else:
            local_ip = s.getsockname()[0]
        addr = ipaddress.ip_address(local_ip)

        ancdata = []
        if type(addr) == ipaddress.IPv4Address:
            pktinfo = in_pktinfo()
            pktinfo.ipi_spec_dst = in_addr.from_buffer_copy(addr.packed)
            ancdata = [
                (socket.SOL_IP, socket.IP_PKTINFO, memoryview(pktinfo).tobytes()),
            ]
        elif s.family == socket.AF_INET6 and type(addr) == ipaddress.IPv6Address:
            pktinfo = in6_pktinfo()
            pktinfo.ipi6_addr = in6_addr.from_buffer_copy(addr.packed)
            ancdata = [
                (socket.SOL_IPV6, socket.IPV6_PKTINFO, memoryview(pktinfo).tobytes()),
            ]

        return s.sendmsg([_data], ancdata, 0, _to)

    return sendto
def __init__(self, cmd, atyp, addr, port):
    """
    Validate and store the fields of a request.

    :param cmd: Request command; must be one of ``REQ_COMMAND``'s values.
    :param atyp: Address type; must be one of ``ADDR_TYPE``'s values.
    :param addr: Address. Converted to ``ipaddress.IPv4Address`` /
        ``ipaddress.IPv6Address`` for the IPv4 / IPv6 address types; must be
        a ``string_func`` instance for the domain-name type.
    :param port: Port, stored as given.
    :raises ValueError: On an unsupported command or address type, an
        unparsable IP address, or a domain name of the wrong type.
    """
    def parse_ip(address_cls, raw, error_text):
        # Convert ``raw`` with ``address_cls``, reporting failures as ValueError.
        try:
            return address_cls(raw)
        except ipaddress.AddressValueError:
            raise ValueError(error_text)

    if cmd not in REQ_COMMAND.values():
        raise ValueError("Unsupported request command {}".format(cmd))
    if atyp not in ADDR_TYPE.values():
        raise ValueError("Unsupported address type {}".format(atyp))

    if atyp == ADDR_TYPE["IPV4"]:
        addr = parse_ip(ipaddress.IPv4Address, addr,
                        "Invalid ipaddress format for IPv4")
    elif atyp == ADDR_TYPE["IPV6"]:
        addr = parse_ip(ipaddress.IPv6Address, addr,
                        "Invalid ipaddress format for IPv6")
    elif atyp == ADDR_TYPE["DOMAINNAME"] and not isinstance(addr, string_func):
        raise ValueError("Domain name expect to be unicode string")

    self.cmd = cmd
    self.atyp = atyp
    self.addr = addr
    self.port = port
def __init__(self, status, atyp, addr, port):
    """
    Validate and store the fields of a response.

    :param status: Status code; must be one of ``RESP_STATUS``'s values.
    :param atyp: Address type; must be one of ``ADDR_TYPE``'s values.
    :param addr: Address. Converted to ``ipaddress.IPv4Address`` /
        ``ipaddress.IPv6Address`` for the IPv4 / IPv6 address types; must be
        a ``string_func`` instance for the domain-name type.
    :param port: Port, stored as given.
    :raises ValueError: On an unsupported status code or address type, an
        unparsable IP address, or a domain name of the wrong type.
    """
    def parse_ip(address_cls, raw, error_text):
        # Convert ``raw`` with ``address_cls``, reporting failures as ValueError.
        try:
            return address_cls(raw)
        except ipaddress.AddressValueError:
            raise ValueError(error_text)

    if status not in RESP_STATUS.values():
        raise ValueError("Unsupported status code {}".format(status))
    if atyp not in ADDR_TYPE.values():
        raise ValueError("Unsupported address type {}".format(atyp))

    if atyp == ADDR_TYPE["IPV4"]:
        addr = parse_ip(ipaddress.IPv4Address, addr,
                        "Invalid ipaddress format for IPv4")
    elif atyp == ADDR_TYPE["IPV6"]:
        addr = parse_ip(ipaddress.IPv6Address, addr,
                        "Invalid ipaddress format for IPv6")
    elif atyp == ADDR_TYPE["DOMAINNAME"] and not isinstance(addr, string_func):
        raise ValueError("Domain name expect to be unicode string")

    self.status = status
    self.atyp = atyp
    self.addr = addr
    self.port = port
def validate_ip_address(self, address, af=6):
    """
    Check that ``address`` is valid for the given address family.

    :param address: Address to check.
    :param af: Address family, ``4`` or ``6`` (default ``6``).
    :return: ``True`` if the address parses; ``False`` if it is rejected
        with ``AddressValueError``, in which case the error is logged.
    :raises Exception: If ``af`` is neither ``4`` nor ``6``.
    """
    if af == 4:
        address_cls = ipaddress.IPv4Address
    elif af == 6:
        address_cls = ipaddress.IPv6Address
    else:
        raise Exception("Invalid AF: {}".format(af))

    try:
        address_cls(address)
    except ipaddress.AddressValueError as ex:
        logging.error(ex)
        return False
    return True
def __init__(self, value):
    """
    Store an ``ipaddress`` address or network object on the instance.

    :param value: An ``ipaddress.IPv4Address``, ``ipaddress.IPv6Address``,
        ``ipaddress.IPv4Network`` or ``ipaddress.IPv6Network`` instance.
    :raises TypeError: If ``value`` is not an instance of one of those types.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def build_trace_req_header(oam_type, sil, remote_ip, remote_port):
    """
    Build a ``TRACEREQHEADER`` for a trace request towards a remote peer.

    The local source address is discovered by connecting a UDP socket to
    ``(remote_ip, remote_port)`` and reading the socket's own name. It is
    stored as four 32-bit words ``ip_1`` .. ``ip_4``, most significant word
    first; an IPv4 source is written as ``0, 0, 0x0000FFFF, <IPv4 as int>``.

    :param oam_type: Value stored in the header's ``oam_type`` field.
    :param sil: Value stored in the header's ``sil`` field.
    :param remote_ip: Remote host the probe socket connects to.
    :param remote_port: Remote port; converted with ``int()``.
    :return: The populated ``TRACEREQHEADER`` instance.
    """
    header = TRACEREQHEADER()
    header.oam_type = oam_type
    header.sil = sil
    header.port = int(remote_port)

    # The socket is only needed to learn which local address the OS picks
    # for this destination, so release it as soon as that is known.
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        probe.connect((remote_ip, header.port))
        src_addr = ipaddress.ip_address(probe.getsockname()[0])
    finally:
        probe.close()

    if src_addr.version == 4:
        header.ip_1 = 0x00000000
        header.ip_2 = 0x00000000
        header.ip_3 = 0x0000FFFF
        header.ip_4 = int(src_addr)
    elif src_addr.version == 6:
        # NOTE(review): the probe socket is AF_INET, so this branch looks
        # unreachable as written - confirm whether IPv6 peers are expected.
        int_addr6 = int(src_addr)
        header.ip_1 = int_addr6 >> 96
        header.ip_2 = (int_addr6 >> 64) & 0x0FFFFFFFF
        header.ip_3 = (int_addr6 >> 32) & 0x0FFFFFFFF
        header.ip_4 = int_addr6 & 0x0FFFFFFFF

    return header
def get_addresses(hostname) -> List[str]:
    """
    Resolve ``hostname`` to its IPv4 and IPv6 addresses using ``dig``.

    Runs ``dig +short a <hostname>`` and ``dig +short aaaa <hostname>`` and
    keeps every whitespace-separated output token that parses as an IP
    address; other tokens are skipped.

    :param hostname: Name passed to ``dig`` as the query.
    :return: The addresses as strings (the ``str()`` form of the parsed
        address), A results before AAAA results. An empty list when either
        ``dig`` call exits with a non-zero status.
    """
    # Get DNS info
    try:
        a_records = subprocess.check_output(args=['dig', '+short', 'a', hostname],
                                            stderr=subprocess.DEVNULL)
        aaaa_records = subprocess.check_output(args=['dig', '+short', 'aaaa', hostname],
                                               stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        return []

    dns_results = []
    for token in (a_records + aaaa_records).decode('utf-8').split():
        try:
            dns_results.append(str(ip_address(token)))
        except ValueError:
            # Not an IP address: ignore this token
            pass

    return dns_results
def __init__(self, value):
    """
    Store an ``ipaddress`` address or network object on the instance.

    :param value: An ``ipaddress.IPv4Address``, ``ipaddress.IPv6Address``,
        ``ipaddress.IPv4Network`` or ``ipaddress.IPv6Network`` instance.
    :raises TypeError: If ``value`` is not an instance of one of those types.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """
    Store an ``ipaddress`` address or network object on the instance.

    :param value: An ``ipaddress.IPv4Address``, ``ipaddress.IPv6Address``,
        ``ipaddress.IPv4Network`` or ``ipaddress.IPv6Network`` instance.
    :raises TypeError: If ``value`` is not an instance of one of those types.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def is_ipv6(string):
    """
    Tell whether a string is a valid IPv6 address.

    :param string: String to check
    :type string: str
    :return: True if ``ipaddress.IPv6Address`` accepts the string, False if
        it rejects it with ``AddressValueError``.
    :rtype: bool
    """
    try:
        ipaddress.IPv6Address(string)
    except ipaddress.AddressValueError:
        accepted = False
    else:
        accepted = True
    return accepted
def getSendTo(addressType):
    """
    Build a ``sendto`` function that attaches packet-info ancillary data.

    :param addressType: Destination class whose instances supply the local
        address through ``getLocalAddress()``; the comparison is on the
        exact type, so subclasses are not matched.
    :return: A function ``sendto(s, _data, _to)``.
    """
    def sendto(s, _data, _to):
        """
        Send ``_data`` to ``_to`` over socket ``s`` with ``sendmsg``.

        The local address comes from ``_to.getLocalAddress()`` when ``_to``
        is exactly of type ``addressType``, otherwise from
        ``s.getsockname()``. For an IPv4 local address an ``IP_PKTINFO``
        item is attached; for an IPv6 local address on an ``AF_INET6``
        socket an ``IPV6_PKTINFO`` item is attached; otherwise no ancillary
        data is sent.
        """
        if type(_to) == addressType:
            local_ip = _to.getLocalAddress()[0]
        else:
            local_ip = s.getsockname()[0]
        addr = ipaddress.ip_address(local_ip)

        ancdata = []
        if type(addr) == ipaddress.IPv4Address:
            pktinfo = in_pktinfo()
            pktinfo.ipi_spec_dst = in_addr.from_buffer_copy(addr.packed)
            ancdata = [
                (socket.SOL_IP, socket.IP_PKTINFO, memoryview(pktinfo).tobytes()),
            ]
        elif s.family == socket.AF_INET6 and type(addr) == ipaddress.IPv6Address:
            pktinfo = in6_pktinfo()
            pktinfo.ipi6_addr = in6_addr.from_buffer_copy(addr.packed)
            ancdata = [
                (socket.SOL_IPV6, socket.IPV6_PKTINFO, memoryview(pktinfo).tobytes()),
            ]

        return s.sendmsg([_data], ancdata, 0, _to)

    return sendto
def test_bad_address_split_v6_repeated_double_colon(self):
    """
    Addresses containing more than one ``::`` must be rejected.

    Each case is expected to fail with the "At most one '::' permitted"
    message, as checked through ``self.assertAddressError``.
    """
    expected_msg = "At most one '::' permitted in %r"
    malformed = (
        "3ffe::1::1",
        "1::2::3::4:5",
        "2001::db:::1",
        "3ffe::1::",
        "::3ffe::1",
        ":3ffe::1::1",
        "3ffe::1::1:",
        ":3ffe::1::1:",
        ":::",
        '2001:db8:::1',
    )

    for candidate in malformed:
        with self.assertAddressError(expected_msg, candidate):
            ipaddress.IPv6Address(candidate)
def testTeredo(self):
    """Check the ``teredo`` property on Teredo and non-Teredo addresses."""
    # stolen from wikipedia
    expected_pair = (
        ipaddress.IPv4Address('65.54.227.120'),  # server
        ipaddress.IPv4Address('192.0.2.45'),     # client
    )
    wikipedia_addr = ipaddress.ip_address(
        '2001:0000:4136:e378:8000:63bf:3fff:fdd2')
    self.assertEqual(expected_pair, wikipedia_addr.teredo)

    # Addresses for which ``teredo`` must be falsy
    for non_teredo in ('2000::4136:e378:8000:63bf:3fff:fdd2',
                       '2001:0001:4136:e378:8000:63bf:3fff:fdd2'):
        self.assertFalse(ipaddress.ip_address(non_teredo).teredo)

    # i77
    second_addr = ipaddress.IPv6Address('2001:0:5ef5:79fd:0:59d:a0e5:ba1')
    second_pair = (
        ipaddress.IPv4Address('94.245.121.253'),
        ipaddress.IPv4Address('95.26.244.94'),
    )
    self.assertEqual(second_pair, second_addr.teredo)
def __init__(self, value):
    """
    Store an ``ipaddress`` address or network object on the instance.

    :param value: An ``ipaddress.IPv4Address``, ``ipaddress.IPv6Address``,
        ``ipaddress.IPv4Network`` or ``ipaddress.IPv6Network`` instance.
    :raises TypeError: If ``value`` is not an instance of one of those types.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def handle_ipaddr_insert(self, prefix, prefix_len, _stable, flags, _is_local):
    """
    Add an ip address for each prefix on prefix change.

    The address text is the string form of ``prefix`` (parsed as an IPv6
    address) followed by the node id. The resulting interface address is
    recorded in ``self.autoAddresses`` and inserted asynchronously into the
    ``SPINEL.PROP_IPV6_ADDRESS_TABLE`` property.

    The ``flags`` argument is not used: the inserted entry always carries
    ``valid = 1``, ``preferred = 1`` and ``flags = 0``.
    """
    prefix_text = str(ipaddress.IPv6Address(prefix))
    ipaddr_str = prefix_text + str(self.wpan_api.nodeid)
    if CONFIG.DEBUG_LOG_PROP:
        print("\n>>>> new PREFIX add ipaddr: " + ipaddr_str)

    valid, preferred, entry_flags = 1, 1, 0

    ipaddr = ipaddress.IPv6Interface(unicode(ipaddr_str))
    self.autoAddresses.add(ipaddr)

    payload = self.encode_fields(
        '6CLLC',
        ipaddr.ip.packed,
        prefix_len,
        valid,
        preferred,
        entry_flags,
    )
    payload_format = '{}s'.format(len(payload))

    self.wpan_api.prop_insert_async(
        SPINEL.PROP_IPV6_ADDRESS_TABLE,
        payload,
        payload_format,
        SPINEL.HEADER_EVENT_HANDLER,
    )
def handle_property(self, line, prop_id, mixed_format='B', output=True):
    """
    Helper to set property when line argument passed, get otherwise.

    :param line: Argument forwarded to ``self.prop_get_or_set_value``; when
        it is ``None`` or empty the fetched value is printed.
    :param prop_id: Property identifier forwarded unchanged.
    :param mixed_format: Format code; selects how the value is printed
        (``'6'`` as an IPv6 address, ``'D'``/``'E'`` hexified, ``'H'`` as
        four hex digits, anything else via ``str``).
    :param output: When false, return the value without printing anything.
    :return: The value, or ``None`` after printing ``Error`` when the value
        is ``None`` or an empty string.
    """
    value = self.prop_get_or_set_value(prop_id, line, mixed_format)
    if not output:
        return value

    if value is None or value == "":
        print("Error")
        return None

    is_get = line is None or line == ""
    if is_get:
        # Only print value on PROP_VALUE_GET
        if mixed_format == '6':
            rendered = str(ipaddress.IPv6Address(value))
        elif mixed_format in ('D', 'E'):
            rendered = util.hexify_str(value, '')
        elif mixed_format == 'H':
            rendered = "%04x" % value
        else:
            rendered = str(value)
        print(rendered)

    print("Done")
    return value