def exeute(self, method, value):
    """Call ``method`` on a ``self.typeclass`` instance and render the outcome.

    The instance is built from ``value`` and ``self.strict_level``.

    :param method: name of the zero-argument method to invoke
    :param value: value handed to ``self.typeclass``
    :return: ``"NOP [#f1]_"`` for ``"validate"``, a quoted literal for text
        and IP address results, ``"E [#f2]_"`` on ``TypeError``,
        ``"E [#f3]_"`` on ``typepy.TypeConversionError``, otherwise the raw
        result of the call
    """
    quoted_types = (
        six.text_type, ipaddress.IPv4Address, ipaddress.IPv6Address)

    try:
        checker = self.typeclass(value, self.strict_level)
        outcome = getattr(checker, method)()

        if method == "validate":
            return "NOP [#f1]_"
        if isinstance(outcome, quoted_types):
            return '``"{}"``'.format(outcome)
        return outcome
    except TypeError:
        return "E [#f2]_"
    except typepy.TypeConversionError:
        return "E [#f3]_"
python类IPv4Address()的实例源码
def __init__(self, value):
    """Store an ``ipaddress`` address or network object in ``self._value``.

    :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
        ``IPv6Network`` instance
    :raises TypeError: if ``value`` is of any other type
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """Store an ``ipaddress`` address or network object in ``self._value``.

    :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
        ``IPv6Network`` instance
    :raises TypeError: if ``value`` is of any other type
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def isipv4(value):
    """
    Tell whether the given value is an IP version 4 address.

    ``True`` is returned when ``ipaddress.IPv4Address`` accepts the value,
    ``False`` when it raises ``ipaddress.AddressValueError``.

    Examples::

        >>> isipv4('127.0.0.1')
        True

        >>> isipv4('::1')
        False

    :param value: string to validate IP version 4
    """
    try:
        parsed = ipaddress.IPv4Address(value)
    except ipaddress.AddressValueError:
        is_v4 = False
    else:
        is_v4 = parsed.version == 4
    return is_v4
def ip(cls, value, raise_exception=False):
    """ IP address validation

    :param value: candidate value; values accepted by ``cls.network`` are
        normalised through ``NetTest.convert.string.cidr`` first
    :param raise_exception: raise instead of returning ``False`` for a
        wrongly typed or unparsable value
    :return: ``True`` if ``ipaddress.IPv4Address`` accepts the value
    """
    # NOTE(review): nesting was reconstructed from a flattened source; the
    # "return False" is assumed to belong to the '/32' check - confirm.
    if cls.network(value):
        cidr_text = NetTest.convert.string.cidr(value)
        if not cidr_text.endswith('/32'):
            # A network wider than a single host is not an IP address.
            return False
        value = cidr_text.split('/')[0]

    if isinstance(value, basestring):
        value = unicode(value)
    elif raise_exception:
        raise TypeError('Invalid type \'{}\''.format(type(value)))
    else:
        return False

    try:
        ipaddress.IPv4Address(value)
    except (ValueError, TypeError):
        if raise_exception:
            raise
        return False
    return True
def __init__(self, value):
    """Store an ``ipaddress`` address or network object in ``self._value``.

    :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
        ``IPv6Network`` instance
    :raises TypeError: if ``value`` is of any other type
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def test_covering_cidr(ips):
    """
    covering_cidr() must return the minimal CIDR covering the given IPs.

    The result has to be a string describing an IPv4 network with a prefix
    of at most 24 bits that contains every IP; when the prefix is shorter
    than 24 bits, none of its subnets may contain all of the IPs.
    """
    cidr_text = telepresence.vpn.covering_cidr(ips)
    assert isinstance(cidr_text, str)

    network = ipaddress.IPv4Network(cidr_text)
    assert network.prefixlen <= 24

    # Every IP has to be inside the returned CIDR.
    addresses = [ipaddress.IPv4Address(raw) for raw in ips]
    assert all(address in network for address in addresses)

    # Below the 24 bit minimum no subnet may still cover all IPs.
    if network.prefixlen < 24:
        for subnet in network.subnets():
            assert not all(address in subnet for address in addresses)
def reinit_data(self):
    """
    Rebuild ``self.loips`` from the loopback keys of ``mibs.APPL_DB``.

    Keys matching ``INTF_TABLE:lo:*`` are read through ``self.db_conn``; the
    text after the ``INTF_TABLE:lo:`` prefix is parsed as an IP address and
    only IPv4 addresses are kept, keyed by that text.
    """
    key_prefix = "INTF_TABLE:lo:"

    self.loips = {}
    self.db_conn.connect(mibs.APPL_DB)
    loopback_keys = self.db_conn.keys(mibs.APPL_DB, "INTF_TABLE:lo:*")
    if not loopback_keys:
        return

    for raw_key in loopback_keys:
        address_text = raw_key.decode()[len(key_prefix):]
        address = ipaddress.ip_address(address_text)
        # IPv6 loopbacks are skipped on purpose.
        if isinstance(address, ipaddress.IPv4Address):
            self.loips[address_text] = address
def __init__(self, value):
    """Store an ``ipaddress`` address or network object in ``self._value``.

    :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
        ``IPv6Network`` instance
    :raises TypeError: if ``value`` is of any other type
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """Store an ``ipaddress`` address or network object in ``self._value``.

    :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
        ``IPv6Network`` instance
    :raises TypeError: if ``value`` is of any other type
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """Store an ``ipaddress`` address or network object in ``self._value``.

    :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
        ``IPv6Network`` instance
    :raises TypeError: if ``value`` is of any other type
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """Store an ``ipaddress`` address or network object in ``self._value``.

    :param value: an ``IPv4Address``, ``IPv6Address``, ``IPv4Network`` or
        ``IPv6Network`` instance
    :raises TypeError: if ``value`` is of any other type
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def get_free_ip_list(self, net):
    # type: (oca.VirtualNetworkPool) -> list[str]
    """
    Collect the unleased IPv4 addresses of the given network.

    Each address range contributes ``size`` consecutive addresses starting
    at its ``ip``; addresses found in the range leases are removed.

    :param net: oca.VirtualNetworkPool
    :return: a randomly ordered list of IPv4 address strings
    """
    all_ips = set()
    leased_ips = set()

    for address_range in net.address_ranges:
        first = int(IPv4Address(unicode(address_range.ip)))
        last_exclusive = first + address_range.size
        all_ips.update(
            str(IPv4Address(number))
            for number in range(first, last_exclusive))
        leased_ips.update(lease.ip for lease in address_range.leases)

    available = list(all_ips.difference(leased_ips))
    # Shuffled so that callers do not always pick the same address first.
    random.shuffle(available)
    return available
def parse_autoblock(data):
    # type: (str) -> set
    """Expand a comma-separated exclusion list into a set of IP strings.

    Every item is either a single IPv4 address or an inclusive range
    written as ``ip1-ip2``.

    :param data: comma-separated items
    :return: union of the addresses described by all items
    :raises APIError: if an item is neither an address nor a valid range
    """
    def _expand(item):
        # A single address is kept as it was written.
        try:
            ipaddress.IPv4Address(item)
        except ipaddress.AddressValueError:
            pass
        else:
            return {item}

        # Otherwise the item has to be a range: ip1-ip2
        try:
            low, high = [utils.ip2int(part) for part in item.split('-')]
            return {utils.int2ip(number) for number in range(low, high + 1)}
        except ValueError:
            raise APIError(
                'Exclude IP\'s are expected to be in the form of '
                '10.0.0.1,10.0.0.4 or 10.1.0.10-10.1.1.54 or both '
                'comma-separated')

    expanded_items = (_expand(unicode(chunk)) for chunk in data.split(','))
    return reduce(operator.or_, expanded_items)
inputtype.py 文件源码
项目:archive-Holmes-Totem-Service-Library
作者: HolmesProcessing
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def ValidateIP(ip):
    """
    Tell whether an IP is public.

    An IP counts as public when it is none of:
        - private
        - loopback
        - broadcast / multicast
        - link-local

    The check itself is delegated to ``__validateIP``.

    Returns True on success and False on failure.
    Raises a ValueError exception if the input is not of type
    ipaddress.IPv4Address or ipaddress.IPv6Address.
    """
    is_public = __validateIP(ip)
    return is_public
def generate_packet(src, data):
    """Build an Ethernet/IPv4/UDP broadcast frame carrying ``data``.

    The frame goes to ``ff:ff:ff:ff:ff:ff`` / ``255.255.255.255`` from UDP
    port 68 to port 67 (presumably a DHCP client message - confirm with the
    caller).

    :param src: source hardware address placed in the Ethernet header
    :param data: UDP payload
    :return: ``bytearray`` holding the headers followed by the payload
    """
    payload_size = len(data)
    headers_size = ethhdr.min_size + ip4hdr.min_size + udphdr.min_size
    packet = bytearray(payload_size + headers_size)

    # Each header is laid over the payload area of the previous one, so all
    # of them write straight into ``packet``.
    eth_fields = {
        'src': src,
        'dst': pnet.HWAddress(u'ff:ff:ff:ff:ff:ff'),
        'type': pnet.ETH_P_IP,
    }
    eth = ethhdr(eth_fields, buf=packet)

    ip4_fields = {
        'dst': ipaddress.IPv4Address(u'255.255.255.255'),
        'proto': socket.IPPROTO_UDP,
        'len': payload_size + ip4hdr.min_size + udphdr.min_size,
    }
    ip4 = ip4hdr(ip4_fields, buf=eth.payload)

    udp_fields = {
        'sport': 68,
        'dport': 67,
        'len': payload_size + udphdr.min_size,
    }
    udp = udphdr(udp_fields, buf=ip4.payload)

    # Checksums are filled in once all other header fields are set.
    ip4['csum'] = pnet.checksum(ip4.tobytes())
    udp['csum'] = pnet.ipv4_checksum(ip4, udp, data)
    udp.payload = data
    return packet
def test_socks_request_ipv4(self):
    """A CONNECT request to an IPv4 address yields a SUCCESS response.

    Generator-style test: the future returned by the layer is yielded to
    obtain the destination stream, host and port.
    """
    self.layer.socks_conn = Mock()
    self.layer.socks_conn.send = Mock(side_effect=self.collect_send_event)

    connect_request = Request(
        REQ_COMMAND["CONNECT"], ADDR_TYPE["IPV4"], u"127.0.0.1", self.port)
    pending = self.layer.handle_request_and_create_destination(
        connect_request)
    dest_stream, host, port = yield pending

    # The response sent back through the mocked connection.
    self.assertIsNotNone(self.event)
    self.assertIsInstance(self.event, Response)
    self.assertEqual(self.event.status, RESP_STATUS["SUCCESS"])
    self.assertEqual(self.event.atyp, ADDR_TYPE["IPV4"])

    # The destination that was opened for the request.
    self.assertIsNotNone(dest_stream)
    self.assertFalse(dest_stream.closed())
    self.assertEqual(host, IPv4Address(u"127.0.0.1"))
    self.assertEqual(port, self.port)

    dest_stream.close()
def test_handle_connection_timeout(self):
    """A timeout while opening the destination is reported to the client.

    ``create_dest_stream`` is replaced by a mock raising ``TimeoutError``;
    the layer must raise ``DestNotConnectedError`` and send a
    NETWORK_UNREACHABLE response echoing the requested address and port.
    """
    self.layer.socks_conn = Mock()
    self.layer.socks_conn.send = Mock(side_effect=self.collect_send_event)

    socks_request = Request(
        REQ_COMMAND["CONNECT"], ADDR_TYPE["IPV4"], u"1.2.3.4", self.port)
    failing_factory = self.create_raise_exception_function(TimeoutError)
    self.layer.create_dest_stream = Mock(side_effect=failing_factory)

    pending = self.layer.handle_request_and_create_destination(
        socks_request)
    with self.assertRaises(DestNotConnectedError):
        yield pending

    self.assertIsNotNone(self.event)
    self.assertIsInstance(self.event, Response)
    self.assertEqual(self.event.status, RESP_STATUS["NETWORK_UNREACHABLE"])
    self.assertEqual(self.event.atyp, ADDR_TYPE["IPV4"])
    self.assertEqual(self.event.addr, IPv4Address(u"1.2.3.4"))
    self.assertEqual(self.event.port, self.port)
def create_endpoint(ipv4, port, service_name):
    """Creates an endpoint object

    :param ipv4: ipv4 address of the endpoint; an empty or unparsable value
        is recorded as ``0`` (the integer form of ``0.0.0.0``)
    :param port: port of the endpoint
    :service_name: human readable name that identifies the service of the endpoint
    :returns: zipkin endpoint object
    """
    try:
        ipv4_as_int = int(ipaddress.IPv4Address(ipv4))
    except (ValueError, TypeError):
        # Fall back to 0.0.0.0 as an integer, so the field has the same type
        # as on the success path (previously the string "0.0.0.0" leaked
        # through here).
        ipv4_as_int = 0
    # NOTE(review): addresses >= 128.0.0.0 exceed a signed 32 bit integer;
    # confirm how the Endpoint serializer expects them to be encoded.

    return zipkin_core.Endpoint(
        ipv4=ipv4_as_int, port=int(port), service_name=service_name)
def build_trace_req_header(oam_type, sil, remote_ip, remote_port):
    """Fill a ``TRACEREQHEADER`` for the given remote endpoint.

    The local source address is found by connecting a UDP socket to the
    remote endpoint and reading the socket's own name. It is stored as four
    32 bit words: IPv6 addresses are split as they are, IPv4 addresses are
    written in IPv4-mapped form (``0, 0, 0xFFFF, address``).

    :param oam_type: value stored in ``oam_type``
    :param sil: value stored in ``sil``
    :param remote_ip: address of the remote endpoint
    :param remote_port: port of the remote endpoint (converted with ``int``)
    :return: the populated ``TRACEREQHEADER``
    """
    trace_req_header_values = TRACEREQHEADER()
    trace_req_header_values.oam_type = oam_type
    trace_req_header_values.sil = sil
    trace_req_header_values.port = int(remote_port)

    # The socket is only needed to learn which local address routes to the
    # remote endpoint; close it even if connect() fails.
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        probe.connect((remote_ip, trace_req_header_values.port))
        local_ip = probe.getsockname()[0]
    finally:
        probe.close()

    src_addr = ipaddress.ip_address(local_ip)
    if src_addr.version == 4:
        trace_req_header_values.ip_1 = 0x00000000
        trace_req_header_values.ip_2 = 0x00000000
        trace_req_header_values.ip_3 = 0x0000FFFF
        trace_req_header_values.ip_4 = int(src_addr)
    elif src_addr.version == 6:
        int_addr6 = int(src_addr)
        trace_req_header_values.ip_1 = int_addr6 >> 96
        trace_req_header_values.ip_2 = (int_addr6 >> 64) & 0x0FFFFFFFF
        trace_req_header_values.ip_3 = (int_addr6 >> 32) & 0x0FFFFFFFF
        trace_req_header_values.ip_4 = int_addr6 & 0x0FFFFFFFF

    return trace_req_header_values
def process_context_headers(ctx1=0, ctx2=0, ctx3=0, ctx4=0):
    """
    Encode context header values in NSH header. A value that parses as an
    IPv4 address is converted to its integer form; any other value is
    converted with ``int`` and truncated to 32 bits.

    A value that cannot be converted at all is logged and left out, so the
    result may then hold fewer than four entries.

    :param ctx1: NSH context header 1
    :param ctx2: NSH context header 2
    :param ctx3: NSH context header 3
    :param ctx4: NSH context header 4
    :return: Array of encoded headers
    """
    context_headers = []

    for ctx in (ctx1, ctx2, ctx3, ctx4):
        try:
            context_headers.append(int(ipaddress.IPv4Address(ctx)))
            continue
        except (TypeError, ValueError):
            pass

        try:
            context_headers.append(int(ctx) & 0xFFFFFFFF)
        except (TypeError, ValueError):
            # %r instead of %d: the value is by definition not an integer
            # here, so %d would make the log call itself fail.
            logger.error("Context header %r can not be represented as an integer", ctx)

    return context_headers
def getRecvFrom(addressType):
    """Build a ``recvfrom`` function that also reports the local address.

    The returned function reads one message with ``recvmsg`` and scans the
    ancillary data for ``IP_PKTINFO`` / ``IPV6_PKTINFO`` entries to learn
    which local address the message was sent to.

    :param addressType: called with the sender address; the result must
        offer ``setLocalAddress``
    :return: function ``recvfrom(s, sz)`` returning ``(data, address)``
    """
    def recvfrom(s, sz):
        local = None
        data, ancdata, msg_flags, sender = s.recvmsg(sz, socket.CMSG_LEN(sz))

        for level, kind, payload in ancdata:
            if level == socket.SOL_IP and kind == socket.IP_PKTINFO:
                info = in_pktinfo.from_buffer_copy(payload)
                dest = ipaddress.IPv4Address(
                    memoryview(info.ipi_addr).tobytes())
            elif level == socket.SOL_IPV6 and kind == socket.IPV6_PKTINFO:
                info = in6_pktinfo.from_buffer_copy(payload)
                dest = ipaddress.ip_address(
                    memoryview(info.ipi6_addr).tobytes())
            else:
                continue
            # The port is taken from the socket name.
            local = (str(dest), s.getsockname()[1])

        return data, addressType(sender).setLocalAddress(local)

    return recvfrom
def getSendTo(addressType):
    """Build a ``sendto`` function that pins the source address.

    The source address is the local address of ``_to`` when it is exactly an
    ``addressType`` instance, otherwise the socket's own name. It is passed
    to ``sendmsg`` as ``IP_PKTINFO`` / ``IPV6_PKTINFO`` ancillary data.

    :param addressType: address class whose instances offer
        ``getLocalAddress``
    :return: function ``sendto(s, _data, _to)`` returning the ``sendmsg``
        result
    """
    def sendto(s, _data, _to):
        if type(_to) == addressType:
            source_text = _to.getLocalAddress()[0]
        else:
            source_text = s.getsockname()[0]
        addr = ipaddress.ip_address(source_text)

        ancdata = []
        if type(addr) == ipaddress.IPv4Address:
            info = in_pktinfo()
            info.ipi_spec_dst = in_addr.from_buffer_copy(addr.packed)
            ancdata = [
                (socket.SOL_IP, socket.IP_PKTINFO, memoryview(info).tobytes())]
        elif s.family == socket.AF_INET6 and type(addr) == ipaddress.IPv6Address:
            info = in6_pktinfo()
            info.ipi6_addr = in6_addr.from_buffer_copy(addr.packed)
            ancdata = [
                (socket.SOL_IPV6, socket.IPV6_PKTINFO, memoryview(info).tobytes())]

        return s.sendmsg([_data], ancdata, 0, _to)

    return sendto
def serverList(self):
    """Assemble the server list packet and hand it to ``self.sendPacket``.

    The packet describes a single game server taken from the ``GATEWAY_*``
    module constants.
    """
    fields = [
        b'\x04',  # packet ID
        bytes([AUTH_NUMGATEWAYS]),  # total num game servers available
        b'\x01',  # last game server used
        # the following is repeated for each server:
        bytes([GATEWAY_ID]),  # ID of each server (starting at 1)
        # gameserver IP, packed in big-endian order
        ipaddress.IPv4Address(GATEWAY_IP).packed,
        # gameserver port, little-endian order
        struct.pack("<I", GATEWAY_PORT),
        bytes([GATEWAY_AGELIMIT]),  # unsure what this is used for
        bytes([GATEWAY_PVP]),  # 1 if GATEWAY_PVP server, otherwise 0
        struct.pack("<H", GATEWAY_NUMPLAYERS),  # current # of players
        struct.pack("<H", GATEWAY_MAXPLAYERS),  # max # of players
        bytes([GATEWAY_ONLINE]),  # 1 if server should be listed, otherwise 0
    ]

    if GATEWAY_ONLINE == 1:
        fields.append(b'\x04\x00\x00\x00\x00')
    else:
        # TODO doesn't list server if it isn't a test server
        fields.append(b'\x00\x00\x00\x00\x00')

    self.sendPacket(b''.join(fields))
def test_recv_in_response(self):
    """A client connection in the "response" state parses a SOCKS5 reply."""
    conn = Connection(our_role="client")
    inner = conn._conn
    inner.machine.set_state("response")
    inner._version = 5
    inner._addr_type = ADDR_TYPE["IPV4"]
    inner._addr = ipaddress.IPv4Address("127.0.0.1")
    inner._port = 8080

    # version 5, status 0, one pad byte, address type 1, 127.0.0.1, port 8080
    reply_bytes = struct.pack(
        "!BBxB4BH", 0x5, 0x0, 0x1, 127, 0, 0, 1, 8080)
    event = conn.recv(reply_bytes)

    self.assertEqual(conn._conn.state, "end")
    self.assertEqual(event, "Response")
    self.assertEqual(event.status, 0)
    self.assertEqual(event.atyp, 1)
    self.assertEqual(event.addr, ipaddress.IPv4Address("127.0.0.1"))
    self.assertEqual(event.port, 8080)
def test_greeting_request_socks4(self):
    """``read_greeting_request`` decodes SOCKS4 requests.

    Two packets are checked: one with a plain address and user name, and
    one with address 0.0.0.1 that carries a trailing domain name.
    """
    user_name = "Johnny".encode("ascii")

    plain_packet = struct.pack(
        "!BBH4B6sB", 0x4, 0x1, 5580, 127, 0, 0, 1, user_name, 0)
    request = read_greeting_request(plain_packet)

    self.assertIsInstance(request, Socks4Request)
    self.assertEqual(request.cmd, 1)
    self.assertEqual(request.port, 5580)
    self.assertEqual(request.addr, ipaddress.IPv4Address("127.0.0.1"))
    self.assertEqual(request.name, "Johnny")

    domain_packet = struct.pack(
        "!BBH4B6sB14sB", 0x4, 0x1, 5580, 0, 0, 0, 1, user_name, 0,
        "www.google.com".encode("idna"), 0)
    request = read_greeting_request(domain_packet)

    self.assertIsInstance(request, Socks4Request)
    self.assertEqual(request.cmd, 1)
    self.assertEqual(request.port, 5580)
    self.assertEqual(request.addr, ipaddress.IPv4Address("0.0.0.1"))
    self.assertEqual(request.name, "Johnny")
    self.assertEqual(request.domainname, "www.google.com")
def __init__(self, cmd, atyp, addr, port):
    """Validate and store the fields of a request.

    :param cmd: one of the ``REQ_COMMAND`` values
    :param atyp: one of the ``ADDR_TYPE`` values
    :param addr: address; converted to an ``ipaddress`` object for the IPv4
        and IPv6 address types, required to be a ``string_func`` instance
        for the domain name type
    :param port: stored as given
    :raises ValueError: for an unsupported command or address type, or an
        address that does not fit the address type
    """
    if cmd not in REQ_COMMAND.values():
        raise ValueError("Unsupported request command {}".format(cmd))
    if atyp not in ADDR_TYPE.values():
        raise ValueError("Unsupported address type {}".format(atyp))

    # Pick the converter matching the address type, if there is one.
    if atyp == ADDR_TYPE["IPV4"]:
        converter = ipaddress.IPv4Address
        error_text = "Invalid ipaddress format for IPv4"
    elif atyp == ADDR_TYPE["IPV6"]:
        converter = ipaddress.IPv6Address
        error_text = "Invalid ipaddress format for IPv6"
    else:
        converter = None
        if atyp == ADDR_TYPE["DOMAINNAME"] and not isinstance(addr, string_func):
            raise ValueError("Domain name expect to be unicode string")

    if converter is not None:
        try:
            addr = converter(addr)
        except ipaddress.AddressValueError:
            raise ValueError(error_text)

    self.cmd = cmd
    self.atyp = atyp
    self.addr = addr
    self.port = port
def __init__(self, status, atyp, addr, port):
    """Validate and store the fields of a response.

    :param status: one of the ``RESP_STATUS`` values
    :param atyp: one of the ``ADDR_TYPE`` values
    :param addr: address; converted to an ``ipaddress`` object for the IPv4
        and IPv6 address types, required to be a ``string_func`` instance
        for the domain name type
    :param port: stored as given
    :raises ValueError: for an unsupported status or address type, or an
        address that does not fit the address type
    """
    if status not in RESP_STATUS.values():
        raise ValueError("Unsupported status code {}".format(status))
    if atyp not in ADDR_TYPE.values():
        raise ValueError("Unsupported address type {}".format(atyp))

    # Pick the converter matching the address type, if there is one.
    if atyp == ADDR_TYPE["IPV4"]:
        converter = ipaddress.IPv4Address
        error_text = "Invalid ipaddress format for IPv4"
    elif atyp == ADDR_TYPE["IPV6"]:
        converter = ipaddress.IPv6Address
        error_text = "Invalid ipaddress format for IPv6"
    else:
        converter = None
        if atyp == ADDR_TYPE["DOMAINNAME"] and not isinstance(addr, string_func):
            raise ValueError("Domain name expect to be unicode string")

    if converter is not None:
        try:
            addr = converter(addr)
        except ipaddress.AddressValueError:
            raise ValueError(error_text)

    self.status = status
    self.atyp = atyp
    self.addr = addr
    self.port = port
def validate_ip_address(self, address, af=6):
    """Check that ``address`` is valid for the given address family.

    :param address: value handed to ``ipaddress.IPv4Address`` or
        ``ipaddress.IPv6Address``
    :param af: address family, ``4`` or ``6``
    :return: ``True`` if the address parses; ``False`` (after logging the
        error) if it does not
    :raises Exception: if ``af`` is neither ``4`` nor ``6``
    """
    if af == 4:
        parser = ipaddress.IPv4Address
    elif af == 6:
        parser = ipaddress.IPv6Address
    else:
        raise Exception("Invalid AF: {}".format(af))

    try:
        parser(address)
    except ipaddress.AddressValueError as ex:
        logging.error(ex)
        return False
    return True
def testTeredo(self):
# stolen from wikipedia
server = ipaddress.IPv4Address('65.54.227.120')
client = ipaddress.IPv4Address('192.0.2.45')
teredo_addr = '2001:0000:4136:e378:8000:63bf:3fff:fdd2'
self.assertEqual((server, client),
ipaddress.ip_address(teredo_addr).teredo)
bad_addr = '2000::4136:e378:8000:63bf:3fff:fdd2'
self.assertFalse(ipaddress.ip_address(bad_addr).teredo)
bad_addr = '2001:0001:4136:e378:8000:63bf:3fff:fdd2'
self.assertFalse(ipaddress.ip_address(bad_addr).teredo)
# i77
teredo_addr = ipaddress.IPv6Address('2001:0:5ef5:79fd:0:59d:a0e5:ba1')
self.assertEqual((ipaddress.IPv4Address('94.245.121.253'),
ipaddress.IPv4Address('95.26.244.94')),
teredo_addr.teredo)