def __init__(self, node, platform='', cpus=0, memory=0, disk=0):
    """Record a node (or node pattern) and the resources requested on it.

    ``node`` is a host name, an IP address, or a pattern containing ``*``.
    A value without ``*`` is resolved with ``socket.getaddrinfo`` and
    replaced by the first address returned.  The result is stored in
    ``self.ip_rex`` as a regular-expression string: dots are escaped and
    ``*`` becomes ``.*``.  When resolution fails, ``self.ip_rex`` is set to
    ``''`` and a warning naming the rejected value is logged.

    ``platform`` is stored lower-cased; ``cpus``, ``memory`` and ``disk``
    are stored unchanged.
    """
    # Keep the caller's value: ``node`` is overwritten below, and the
    # warning must report what was actually passed in.
    requested = node
    if node.find('*') < 0:
        try:
            info = socket.getaddrinfo(node, None)[0]
            ip_addr = info[4][0]
            if info[0] == socket.AF_INET6:
                # Strip zeros at the start and after every colon, then
                # collapse any run of colons into a single '::'.
                ip_addr = re.sub(r'^0*', '', ip_addr)
                ip_addr = re.sub(r':0*', ':', ip_addr)
                ip_addr = re.sub(r'::+', '::', ip_addr)
            node = ip_addr
        except Exception:
            # Best effort: any resolution problem marks the node invalid,
            # but KeyboardInterrupt/SystemExit are allowed to propagate.
            node = ''
    if node:
        self.ip_rex = node.replace('.', '\\.').replace('*', '.*')
    else:
        logger.warning('node "%s" is invalid', requested)
        self.ip_rex = ''
    self.platform = platform.lower()
    self.cpus = cpus
    self.memory = memory
    self.disk = disk
python类AF_INET6的实例源码
def __init__(self, address):
    """Prepare the server for ``address`` and install signal handlers.

    The address family is chosen from the host part of ``address``: a colon
    selects ``AF_INET6``, anything else ``AF_INET``.  The parent initializer
    is called with ``bind_and_activate=False``; configuration loading and
    socket setup are then done by ``load_config()`` and ``_setup_socket()``.
    """
    self.log = logging.getLogger(self.server_logger)
    self.socket = None

    host_has_colon = ":" in address[0]
    self.address_family = (
        socket.AF_INET6 if host_has_colon else socket.AF_INET)

    self.log.debug("Listening on %s", address)
    super(_SpoonMixIn, self).__init__(
        address, self.handler_klass, bind_and_activate=False)
    self.load_config()
    self._setup_socket()

    # Signal handlers are installed last, once everything else is set up.
    reload_signal = self.signal_reload
    if reload_signal is not None:
        signal.signal(reload_signal, self.reload_handler)
    shutdown_signal = self.signal_shutdown
    if shutdown_signal is not None:
        signal.signal(shutdown_signal, self.shutdown_handler)
def api_has_bwctl(host, timeout=5, bind=None):
    """
    Determine if a host is running the BWCTL daemon

    A TCP connection to port 4823 on ``host`` is attempted over IPv4 and
    then over IPv6.  Returns True as soon as one of them connects and False
    if neither does.  ``timeout`` is applied to each attempt.  ``bind``, if
    given (or found in the PSCHEDULER_LEAD_BIND_HACK environment variable),
    is the local address each socket is bound to before connecting.
    """
    # Null implies localhost
    if host is None:
        host = "localhost"

    # HACK: BWTCLBC
    # If the environment says to bind to a certain address, do it.
    if bind is None:
        bind = os.environ.get('PSCHEDULER_LEAD_BIND_HACK', None)

    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            with closing(socket.socket(family, socket.SOCK_STREAM)) as sock:
                if bind is not None:
                    sock.bind((bind, 0))
                sock.settimeout(timeout)
                # Only a successful connect is conclusive.  A failed one
                # must fall through so the other family still gets tried.
                if sock.connect_ex((host, 4823)) == 0:
                    return True
        except socket.error:
            # Covers bind failures and name-resolution errors for this
            # family; move on to the next one.
            pass

    return False
def source_interface(address, port=80, ip_version=None):
    """Figure out what local interface is being used to
    reach an address.

    A datagram socket (IPv6 when ``ip_version`` is 6, IPv4 otherwise) is
    connected to ``(address, port)`` and its local address is looked up
    with ``address_interface``.

    Returns a tuple of (address, interface_name), or (None, None) when the
    connect fails or no interface name is found.
    """
    family = socket.AF_INET6 if ip_version == 6 else socket.AF_INET
    sock = socket.socket(family, socket.SOCK_DGRAM)
    # The socket is closed on every path, including the failed-connect
    # early return.
    try:
        try:
            sock.connect((address, port))
        except socket.error:
            return (None, None)
        interface_address = sock.getsockname()[0]
    finally:
        sock.close()

    interface_name = address_interface(interface_address,
                                       ip_version=ip_version)
    if interface_name:
        return (interface_address, interface_name)
    return (None, None)
def inet_pton(address_family, ip_string):
    """Convert a textual IP address to its packed binary form.

    ``AF_INET`` addresses are handled by ``socket.inet_aton``.  Any other
    family is parsed with ``WSAStringToAddressA``; a non-zero result from
    that call raises ``socket.error`` with the formatted system error.
    Only ``AF_INET6`` results (16 bytes) are returned; every other family
    raises ``socket.error('unknown address family')``.
    """
    if address_family == socket.AF_INET:
        return socket.inet_aton(ip_string)

    sa = sockaddr()
    sa.sa_family = address_family
    sa_len = ctypes.c_int(ctypes.sizeof(sa))
    status = WSAStringToAddressA(
        ip_string,
        address_family,
        None,
        ctypes.byref(sa),
        ctypes.byref(sa_len),
    )
    if status != 0:
        raise socket.error(ctypes.FormatError())

    if address_family != socket.AF_INET6:
        raise socket.error('unknown address family')
    return ctypes.string_at(sa.ipv6_addr, 16)
def select_ip_version(host, port):
    """Return the socket address family to use for ``host``.

    The result is ``socket.AF_INET6`` when ``host`` contains a colon and
    the ``socket`` module provides ``AF_INET6``; otherwise it is
    ``socket.AF_INET``.  ``port`` is accepted but not used.
    """
    # A getaddrinfo()-based lookup (AF_UNSPEC, SOCK_STREAM, AI_PASSIVE,
    # returning the family of the first result) used to be tried first.
    # It was disabled because of problems with IPv6 implementations on
    # various operating systems, leaving only the colon heuristic below.
    family = socket.AF_INET
    if ':' in host and hasattr(socket, 'AF_INET6'):
        family = socket.AF_INET6
    return family
def _socket_bind_addr(self, sock, af):
    """Bind ``sock`` to a local address suitable for address family ``af``.

    The address is ``self._bind`` for ``AF_INET``, ``self._bindv6`` for
    ``AF_INET6``, and otherwise the host part of ``self._accept_address``.
    Nothing is bound when the address is empty, is listed in
    ``self._ignore_bind_list``, or resolves to a family other than ``af``.
    A failing ``bind`` is logged and otherwise ignored.
    """
    if self._bind and af == socket.AF_INET:
        bind_addr = self._bind
    elif self._bindv6 and af == socket.AF_INET6:
        bind_addr = self._bindv6
    else:
        bind_addr = self._accept_address[0]

    # Remove any "::ffff:" prefix before the address is checked and used.
    bind_addr = bind_addr.replace("::ffff:", "")
    if not bind_addr or bind_addr in self._ignore_bind_list:
        return

    local_addrs = socket.getaddrinfo(bind_addr, 0, 0,
                                     socket.SOCK_STREAM, socket.SOL_TCP)
    if local_addrs[0][0] != af:
        return

    logging.debug("bind %s", bind_addr)
    try:
        sock.bind((bind_addr, 0))
    except Exception:
        # Binding is best effort.  logging.warning is used because the
        # logging.warn alias is deprecated and gone in newer Pythons.
        logging.warning("bind %s fail", bind_addr)
def _socket_bind_addr(self, sock, af):
    """Bind the datagram socket ``sock`` to the configured local address.

    The address is ``self._bind`` for ``AF_INET`` and ``self._bindv6`` for
    ``AF_INET6``; with neither configured nothing is bound.  Nothing is
    bound either when the address is listed in ``self._ignore_bind_list``
    or resolves to a family other than ``af``.  A failing ``bind`` is
    logged and otherwise ignored.
    """
    bind_addr = ''
    if self._bind and af == socket.AF_INET:
        bind_addr = self._bind
    elif self._bindv6 and af == socket.AF_INET6:
        bind_addr = self._bindv6

    # Remove any "::ffff:" prefix before the address is checked and used.
    bind_addr = bind_addr.replace("::ffff:", "")
    if not bind_addr or bind_addr in self._ignore_bind_list:
        return

    local_addrs = socket.getaddrinfo(bind_addr, 0, 0,
                                     socket.SOCK_DGRAM, socket.SOL_UDP)
    if local_addrs[0][0] != af:
        return

    logging.debug("bind %s", bind_addr)
    try:
        sock.bind((bind_addr, 0))
    except Exception:
        # Binding is best effort.  logging.warning is used because the
        # logging.warn alias is deprecated and gone in newer Pythons.
        logging.warning("bind %s fail", bind_addr)
def select_ip_version(host, port):
    """Return the socket address family to use for ``host``.

    The result is ``socket.AF_INET6`` when ``host`` contains a colon and
    the ``socket`` module provides ``AF_INET6``; otherwise it is
    ``socket.AF_INET``.  ``port`` is accepted but not used.
    """
    # A getaddrinfo()-based lookup (AF_UNSPEC, SOCK_STREAM, AI_PASSIVE,
    # returning the family of the first result) used to be tried first.
    # It was disabled because of problems with IPv6 implementations on
    # various operating systems, leaving only the colon heuristic below.
    family = socket.AF_INET
    if ':' in host and hasattr(socket, 'AF_INET6'):
        family = socket.AF_INET6
    return family
simple_httpclient_test.py 文件源码
项目:noc-orchestrator
作者: DirceuSilvaLabs
项目源码
文件源码
阅读 33
收藏 0
点赞 0
评论 0
def test_ipv6(self):
    """Fetch from a server bound to ``::1`` with IPv6 disallowed and allowed.

    With ``allow_ipv6=False`` the response code must be 599; with the
    default settings the body must be ``b"Hello world!"``.
    """
    try:
        listeners = bind_sockets(None, '::1', family=socket.AF_INET6)
        [sock] = listeners
        port = sock.getsockname()[1]
        self.http_server.add_socket(sock)
    except socket.gaierror as err:
        if err.args[0] != socket.EAI_ADDRFAMILY:
            raise
        # python supports ipv6, but it's not configured on the network
        # interface, so skip this test.
        return

    protocol = self.get_protocol()
    url = '%s://[::1]:%d/hello' % (protocol, port)

    # ipv6 is currently enabled by default but can be disabled
    self.http_client.fetch(url, self.stop, allow_ipv6=False)
    refused = self.wait()
    self.assertEqual(refused.code, 599)

    self.http_client.fetch(url, self.stop)
    served = self.wait()
    self.assertEqual(served.body, b"Hello world!")
def __init__(self, stream, address, protocol):
    """Capture the connection details of ``stream``.

    Stores ``address``, the socket's address family (``None`` when the
    stream has no socket), the remote IP and the protocol.  The remote IP
    is ``address[0]`` for ``AF_INET``/``AF_INET6`` sockets with a non-None
    address and ``'0.0.0.0'`` otherwise.  A falsy ``protocol`` is replaced
    by ``"https"`` for ``iostream.SSLIOStream`` streams and ``"http"`` for
    all others.  The initial remote IP and protocol are also saved in
    ``_orig_remote_ip`` and ``_orig_protocol``.
    """
    self.address = address

    # Save the socket's address family now so we know how to
    # interpret self.address even after the stream is closed
    # and its socket attribute replaced with None.
    sock = stream.socket
    self.address_family = None if sock is None else sock.family

    # In HTTPServerRequest we want an IP, not a full socket address.
    is_inet = self.address_family in (socket.AF_INET, socket.AF_INET6)
    if is_inet and address is not None:
        self.remote_ip = address[0]
    else:
        # Unix (or other) socket; fake the remote address.
        self.remote_ip = '0.0.0.0'

    if not protocol:
        if isinstance(stream, iostream.SSLIOStream):
            protocol = "https"
        else:
            protocol = "http"
    self.protocol = protocol

    self._orig_remote_ip = self.remote_ip
    self._orig_protocol = self.protocol
simple_httpclient_test.py 文件源码
项目:noc-orchestrator
作者: DirceuSilvaLabs
项目源码
文件源码
阅读 36
收藏 0
点赞 0
评论 0
def test_ipv6(self):
    """Fetch from a server bound to ``::1`` with IPv6 disallowed and allowed.

    With ``allow_ipv6=False`` the response code must be 599; with the
    default settings the body must be ``b"Hello world!"``.
    """
    try:
        listeners = bind_sockets(None, '::1', family=socket.AF_INET6)
        [sock] = listeners
        port = sock.getsockname()[1]
        self.http_server.add_socket(sock)
    except socket.gaierror as err:
        if err.args[0] != socket.EAI_ADDRFAMILY:
            raise
        # python supports ipv6, but it's not configured on the network
        # interface, so skip this test.
        return

    protocol = self.get_protocol()
    url = '%s://[::1]:%d/hello' % (protocol, port)

    # ipv6 is currently enabled by default but can be disabled
    self.http_client.fetch(url, self.stop, allow_ipv6=False)
    refused = self.wait()
    self.assertEqual(refused.code, 599)

    self.http_client.fetch(url, self.stop)
    served = self.wait()
    self.assertEqual(served.body, b"Hello world!")
def __init__(self, stream, address, protocol):
    """Capture the connection details of ``stream``.

    Stores ``address``, the socket's address family (``None`` when the
    stream has no socket), the remote IP and the protocol.  The remote IP
    is ``address[0]`` for ``AF_INET``/``AF_INET6`` sockets with a non-None
    address and ``'0.0.0.0'`` otherwise.  A falsy ``protocol`` is replaced
    by ``"https"`` for ``iostream.SSLIOStream`` streams and ``"http"`` for
    all others.  The initial remote IP and protocol are also saved in
    ``_orig_remote_ip`` and ``_orig_protocol``.
    """
    self.address = address

    # Save the socket's address family now so we know how to
    # interpret self.address even after the stream is closed
    # and its socket attribute replaced with None.
    sock = stream.socket
    self.address_family = None if sock is None else sock.family

    # In HTTPServerRequest we want an IP, not a full socket address.
    is_inet = self.address_family in (socket.AF_INET, socket.AF_INET6)
    if is_inet and address is not None:
        self.remote_ip = address[0]
    else:
        # Unix (or other) socket; fake the remote address.
        self.remote_ip = '0.0.0.0'

    if not protocol:
        if isinstance(stream, iostream.SSLIOStream):
            protocol = "https"
        else:
            protocol = "http"
    self.protocol = protocol

    self._orig_remote_ip = self.remote_ip
    self._orig_protocol = self.protocol
simple_httpclient_test.py 文件源码
项目:noc-orchestrator
作者: DirceuSilvaLabs
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def test_ipv6(self):
    """Fetch from a server bound to ``::1`` with IPv6 disallowed and allowed.

    With ``allow_ipv6=False`` the response code must be 599; with the
    default settings the body must be ``b"Hello world!"``.
    """
    try:
        listeners = bind_sockets(None, '::1', family=socket.AF_INET6)
        [sock] = listeners
        port = sock.getsockname()[1]
        self.http_server.add_socket(sock)
    except socket.gaierror as err:
        if err.args[0] != socket.EAI_ADDRFAMILY:
            raise
        # python supports ipv6, but it's not configured on the network
        # interface, so skip this test.
        return

    protocol = self.get_protocol()
    url = '%s://[::1]:%d/hello' % (protocol, port)

    # ipv6 is currently enabled by default but can be disabled
    self.http_client.fetch(url, self.stop, allow_ipv6=False)
    refused = self.wait()
    self.assertEqual(refused.code, 599)

    self.http_client.fetch(url, self.stop)
    served = self.wait()
    self.assertEqual(served.body, b"Hello world!")
def __init__(self, stream, address, protocol):
    """Capture the connection details of ``stream``.

    Stores ``address``, the socket's address family (``None`` when the
    stream has no socket), the remote IP and the protocol.  The remote IP
    is ``address[0]`` for ``AF_INET``/``AF_INET6`` sockets with a non-None
    address and ``'0.0.0.0'`` otherwise.  A falsy ``protocol`` is replaced
    by ``"https"`` for ``iostream.SSLIOStream`` streams and ``"http"`` for
    all others.  The initial remote IP and protocol are also saved in
    ``_orig_remote_ip`` and ``_orig_protocol``.
    """
    self.address = address

    # Save the socket's address family now so we know how to
    # interpret self.address even after the stream is closed
    # and its socket attribute replaced with None.
    sock = stream.socket
    self.address_family = None if sock is None else sock.family

    # In HTTPServerRequest we want an IP, not a full socket address.
    is_inet = self.address_family in (socket.AF_INET, socket.AF_INET6)
    if is_inet and address is not None:
        self.remote_ip = address[0]
    else:
        # Unix (or other) socket; fake the remote address.
        self.remote_ip = '0.0.0.0'

    if not protocol:
        if isinstance(stream, iostream.SSLIOStream):
            protocol = "https"
        else:
            protocol = "http"
    self.protocol = protocol

    self._orig_remote_ip = self.remote_ip
    self._orig_protocol = self.protocol
def postprocess(self, name, val):
    """Convert the result of a ``GetSettings`` call in place.

    For ``name == 'GetSettings'`` the wireless SSID, the MAC-style fields
    of every section, and the addresses, routes and DNS entries of the
    ``ipv4`` and ``ipv6`` sections are passed through the matching
    ``fixups`` converters.  For any other ``name`` nothing is changed.
    ``val`` is returned in both cases.
    """
    if name != 'GetSettings':
        return val

    wireless = val.get('802-11-wireless', {})
    if 'ssid' in wireless:
        wireless['ssid'] = fixups.ssid_to_python(wireless['ssid'])

    for section in val.values():
        for field in ('mac-address', 'cloned-mac-address', 'bssid'):
            if field in section:
                section[field] = fixups.mac_to_python(section[field])

    def convert_ip_section(section, family):
        # Rewrite one ipv4/ipv6 section in place for the given family.
        section['addresses'] = [fixups.addrconf_to_python(addr, family)
                                for addr in section['addresses']]
        section['routes'] = [fixups.route_to_python(route, family)
                             for route in section['routes']]
        section['dns'] = [fixups.addr_to_python(addr, family)
                          for addr in section['dns']]

    if 'ipv4' in val:
        convert_ip_section(val['ipv4'], socket.AF_INET)
    if 'ipv6' in val:
        convert_ip_section(val['ipv6'], socket.AF_INET6)
    return val
def request(self, check_config):
    """Resolve the configured domain and report the addresses found.

    ``check_config['domain']`` is resolved with ``socket.getaddrinfo``.
    ``check_config['ip_version']`` restricts the lookup to IPv6 (6) or
    IPv4 (4); any other value leaves the address family unrestricted.

    Returns ``(Plugin.STATUS_OK, message)`` listing the resolved
    addresses, or ``(Plugin.STATUS_CRIT, error_text)`` when the lookup
    raises.
    """
    domain = check_config.get('domain')
    ip_version = check_config.get('ip_version')

    family = 0
    if ip_version == 6:
        family = socket.AF_INET6
    elif ip_version == 4:
        family = socket.AF_INET

    try:
        answers = socket.getaddrinfo(domain, 0, family)
    except Exception as exc:
        return Plugin.STATUS_CRIT, '{}'.format(exc)

    resolved = ', '.join([entry[4][0] for entry in answers])
    message = 'Domain was resolved with {}'.format(resolved)
    return (Plugin.STATUS_OK, message)
def select_ip_version(host, port):
    """Return the socket address family to use for ``host``.

    The result is ``socket.AF_INET6`` when ``host`` contains a colon and
    the ``socket`` module provides ``AF_INET6``; otherwise it is
    ``socket.AF_INET``.  ``port`` is accepted but not used.
    """
    # A getaddrinfo()-based lookup (AF_UNSPEC, SOCK_STREAM, AI_PASSIVE,
    # returning the family of the first result) used to be tried first.
    # It was disabled because of problems with IPv6 implementations on
    # various operating systems, leaving only the colon heuristic below.
    family = socket.AF_INET
    if ':' in host and hasattr(socket, 'AF_INET6'):
        family = socket.AF_INET6
    return family
def neighsol(addr, src, iface, timeout=1, chainCC=0):
    """
    Sends an ICMPv6 Neighbor Solicitation message to get the MAC address
    of the neighbor with specified IPv6 address addr. 'src' address is
    used as source of the message. Message is sent on iface. By default,
    timeout waiting for an answer is 1 second.

    If no answer is gathered, None is returned. Else, the answer is
    returned (ethernet frame).
    """
    nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr))
    d = inet_ntop(socket.AF_INET6, nsma)
    dm = in6_getnsmac(nsma)
    p = Ether(dst=dm)/IPv6(dst=d, src=src, hlim=255)
    p /= ICMPv6ND_NS(tgt=addr)
    p /= ICMPv6NDOptSrcLLAddr(lladdr=get_if_hwaddr(iface))
    # Honour the caller's timeout; it used to be hard-coded to 1 here,
    # which made the parameter a no-op.
    res = srp1(p, type=ETH_P_IPV6, iface=iface, timeout=timeout, verbose=0,
               chainCC=chainCC)
    return res
def getfield(self, pkt, s):
    """Split ``s`` into a list of textual IPv6 addresses and a remainder.

    When ``self.length_from`` is set, only the first ``length_from(pkt)``
    bytes of ``s`` are parsed and the rest is handed back untouched.
    Otherwise, when ``self.count_from`` is set, at most ``count_from(pkt)``
    addresses are parsed.  Addresses are taken 16 bytes at a time.

    Returns ``(remaining_bytes, addresses)``.
    """
    byte_limit = count_limit = None
    if self.length_from is not None:
        byte_limit = self.length_from(pkt)
    elif self.count_from is not None:
        count_limit = self.count_from(pkt)

    if byte_limit is None:
        pending, leftover = s, ""
    else:
        pending, leftover = s[:byte_limit], s[byte_limit:]

    addresses = []
    while pending:
        if count_limit is not None:
            if count_limit <= 0:
                break
            count_limit -= 1
        chunk, pending = pending[:16], pending[16:]
        addresses.append(inet_ntop(socket.AF_INET6, chunk))
    return pending + leftover, addresses
def i2m(self, pkt, x):
    """Return the packed form of IPv6 address ``x``, sized from ``pkt.len``.

    A ``None`` address is treated as ``"::"`` (and a ``None`` length then
    as 1).  With a ``None`` length the full packed address is returned.
    Lengths 0 and 1 give an empty string, lengths 2 and 3 give the first
    ``8 * (len - 1)`` bytes, and larger lengths give the packed address
    followed by ``8 * (len - 3)`` NUL characters.
    """
    units = pkt.len
    if x is None:
        x = "::"
        if units is None:
            units = 1

    packed = inet_pton(socket.AF_INET6, x)
    if units is None:
        return packed
    if units in (0, 1):
        return ""
    if units in (2, 3):
        keep = 8 * (units - 1)
        return packed[:keep]
    padding = '\x00' * 8 * (units - 3)
    return packed + padding
def h2i(self, pkt, x):
    """Convert ``x`` to a ``(type_code, value)`` pair.

    A tuple whose first element is an int is returned unchanged.
    Otherwise ``x`` is classified by trying, in order:

    * an IPv6 address -> ``(0, x)``
    * an IPv4 address -> ``(2, x)``
    * anything else   -> ``(1, names2dnsrepr(x))``, with ``None`` treated
      as the empty string.
    """
    # isinstance() is required here: ``x is tuple`` compares against the
    # type object itself and is never true for a tuple value.
    if isinstance(x, tuple) and x and isinstance(x[0], int):
        return x

    for family, code in ((socket.AF_INET6, 0), (socket.AF_INET, 2)):
        try:
            inet_pton(family, x)
        except Exception:
            # Not an address of this family; try the next interpretation.
            continue
        return (code, x)

    # Neither address family parsed: fall back to a DNS name.
    if x is None:
        x = ""
    return (1, names2dnsrepr(x))
def _resolve_one(self, ip):
    """
    Overloaded version that runs the Whois resolution on the embedded
    IPv4 address when ``ip`` is a 6to4 or Teredo address.  Any other
    address is passed to the resolver as is.

    Returns ``(ip, asn, desc)`` with the original ``ip``.
    """
    if in6_isaddr6to4(ip):
        # 6to4: bytes 2..5 of the packed address hold the IPv4 address
        packed = inet_pton(socket.AF_INET6, ip)
        lookup_addr = inet_ntop(socket.AF_INET, packed[2:6])
    elif in6_isaddrTeredo(ip):
        # Teredo: use the mapped address
        lookup_addr = teredoAddrExtractInfo(ip)[2]
    else:
        lookup_addr = ip

    resolved = AS_resolver_riswhois._resolve_one(self, lookup_addr)
    _ignored, asn, desc = resolved
    return ip, asn, desc
def ifchange(self, iff, addr):
    """Update the routes of interface ``iff`` for its new address ``addr``.

    ``addr`` is ``"address"`` or ``"address/plen"``; a missing prefix
    length defaults to 128.  Every route on ``iff`` gets the new address
    as its source address.  Routes whose gateway is ``'::'`` also get the
    network and prefix length derived from ``addr``; routes through a
    gateway keep their own destination and prefix length.  The route cache
    is then invalidated and the neighbor cache flushed.
    """
    the_addr, the_plen = (addr.split("/") + ["128"])[:2]
    the_plen = int(the_plen)

    naddr = inet_pton(socket.AF_INET6, the_addr)
    nmask = in6_cidr2mask(the_plen)
    the_net = inet_ntop(socket.AF_INET6, in6_and(nmask, naddr))

    for i, (net, plen, gw, iface, _old_src) in enumerate(self.routes):
        if iface != iff:
            continue
        # The source address is stored as a one-element list, the same
        # shape ifadd() uses for its route entries.
        if gw == '::':
            self.routes[i] = (the_net, the_plen, gw, iface, [the_addr])
        else:
            # A route via a gateway describes some other network, so its
            # prefix length must not be replaced by the interface's.
            self.routes[i] = (net, plen, gw, iface, [the_addr])
    self.invalidate_cache()
    ip6_neigh_cache.flush()
def ifadd(self, iff, addr):
    """
    Add an interface 'iff' with provided address into routing table.

    Ex: ifadd('eth0', '2001:bd8:cafe:1::1/64') will add following entry into
        Scapy6 internal routing table:

        Destination           Next Hop  iface  Def src @
        2001:bd8:cafe:1::/64  ::        eth0   2001:bd8:cafe:1::1

    prefix length value can be omitted. In that case, a value of 128
    will be used.
    """
    pieces = addr.split("/") + ["128"]
    address = in6_ptop(pieces[0])
    prefix_len = int(pieces[1])

    packed = inet_pton(socket.AF_INET6, address)
    mask = in6_cidr2mask(prefix_len)
    prefix = inet_ntop(socket.AF_INET6, in6_and(mask, packed))

    self.invalidate_cache()
    entry = (prefix, prefix_len, '::', iff, [address])
    self.routes.append(entry)
def inet_pton(family, text):
    """Convert a textual network address into its binary form.

    @param family: the address family (AF_INET or AF_INET6)
    @type family: int
    @param text: the textual address
    @type text: string
    @raises NotImplementedError: the address family specified is not
    implemented.
    @rtype: string
    """
    if family == AF_INET:
        return dns.ipv4.inet_aton(text)
    if family == AF_INET6:
        return dns.ipv6.inet_aton(text)
    # Only the two families above are handled.
    raise NotImplementedError
def inet_ntop(family, address):
    """Convert a binary network address into its textual form.

    @param family: the address family (AF_INET or AF_INET6)
    @type family: int
    @param address: the binary address
    @type address: string
    @raises NotImplementedError: the address family specified is not
    implemented.
    @rtype: string
    """
    if family == AF_INET:
        return dns.ipv4.inet_ntoa(address)
    if family == AF_INET6:
        return dns.ipv6.inet_ntoa(address)
    # Only the two families above are handled.
    raise NotImplementedError
def af_for_address(text):
    """Determine the address family of a textual-form network address.

    The text is tried as an IPv4 address first and as an IPv6 address
    second.

    @param text: the textual address
    @type text: string
    @raises ValueError: the address family cannot be determined from the input.
    @rtype: int
    """
    # ``except Exception`` rather than a bare ``except`` so that
    # KeyboardInterrupt and SystemExit are not turned into parse failures.
    try:
        dns.ipv4.inet_aton(text)
        return AF_INET
    except Exception:
        try:
            dns.ipv6.inet_aton(text)
            return AF_INET6
        except Exception:
            raise ValueError
def _gethostbyaddr(ip):
    """Reverse-resolve ``ip`` and return ``(canonical, aliases, addresses)``.

    ``ip`` is treated as IPv6 when ``dns.ipv6.inet_aton`` accepts it and
    as IPv4 otherwise.  The name found by ``_getnameinfo`` (called with
    ``NI_NAMEREQD``) is looked up again with ``_getaddrinfo`` for the same
    family to collect the canonical name and the address list.  ``aliases``
    is always an empty list.
    """
    try:
        dns.ipv6.inet_aton(ip)
        sockaddr = (ip, 80, 0, 0)
        family = socket.AF_INET6
    except Exception:
        # Not parseable as IPv6: fall back to IPv4.  A bare ``except``
        # here would also swallow KeyboardInterrupt/SystemExit.
        sockaddr = (ip, 80)
        family = socket.AF_INET
    (name, _port) = _getnameinfo(sockaddr, socket.NI_NAMEREQD)
    aliases = []
    tuples = _getaddrinfo(name, 0, family, socket.SOCK_STREAM, socket.SOL_TCP,
                          socket.AI_CANONNAME)
    canonical = tuples[0][3]
    addresses = [item[4][0] for item in tuples]
    # XXX we just ignore aliases
    return (canonical, aliases, addresses)
def testNToP(self):
from twisted.python.compat import inet_ntop
f = lambda a: inet_ntop(socket.AF_INET6, a)
g = lambda a: inet_ntop(socket.AF_INET, a)
self.assertEquals('::', f('\x00' * 16))
self.assertEquals('::1', f('\x00' * 15 + '\x01'))
self.assertEquals(
'aef:b01:506:1001:ffff:9997:55:170',
f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70'))
self.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
self.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55'))
self.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))
self.assertEquals('100::', f('\x01' + '\x00' * 15))
self.assertEquals('100::1', f('\x01' + '\x00' * 14 + '\x01'))