def __init__(self, *args, **kwargs):
    """Initialise the TCP server, optionally as an IPv6-only listener.

    Accepts the same arguments as ``socketserver.TCPServer.__init__`` plus
    the keyword ``ipv6`` (default ``False``).  With ``ipv6=True`` the
    socket is created for ``AF_INET6`` and ``IPV6_V6ONLY`` is switched on
    before the socket is bound, so the base class is told not to bind and
    binding is performed here afterwards.

    A caller-supplied ``bind_and_activate`` (positional or keyword) is
    honoured in both modes.  If configuring, binding or activating the
    IPv6 socket fails, the server socket is closed and the exception is
    re-raised.
    """
    ipv6 = kwargs.pop("ipv6", False)
    if not ipv6:
        self.address_family = socket.AF_INET
        socketserver.TCPServer.__init__(self, *args, **kwargs)
        return

    self.address_family = socket.AF_INET6

    # Remember what the caller asked for, then force the base class to
    # skip binding: IPV6_V6ONLY must be set on the socket before bind().
    if len(args) > 2:
        bind_and_activate = args[2]
        args = args[:2] + args[3:]
    else:
        bind_and_activate = kwargs.pop("bind_and_activate", True)
    kwargs["bind_and_activate"] = False
    socketserver.TCPServer.__init__(self, *args, **kwargs)

    try:
        # pylint: disable=no-member
        self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        if bind_and_activate:
            self.server_bind()
            self.server_activate()
    except BaseException:
        # Same cleanup-and-propagate contract as the base class: never
        # leave a half-configured socket open.
        self.server_close()
        raise
# python类IPPROTO_IPV6的实例源码 (Python example source code using socket.IPPROTO_IPV6)
def init_func(self, creator, address, crypto, crypto_configs, conn_timeout=800, is_ipv6=False):
    """Create the listening TCP socket for this handler and register it.

    The crypto objects and the connection timeout are stored on the
    instance.  A stream socket is then created (``AF_INET6`` with
    ``IPV6_V6ONLY`` enabled when *is_ipv6* is true, ``AF_INET``
    otherwise), given ``SO_REUSEADDR``, attached via ``set_socket``,
    bound to *address* and put into listening mode with a backlog of 10.
    Finally the descriptor is passed to ``register`` and
    ``add_evt_read``.

    *creator* is not used by this function.

    Returns ``self.fileno``.
    """
    self.__conn_timeout = conn_timeout
    self.__crypto = crypto
    self.__crypto_configs = crypto_configs

    family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
    listener = socket.socket(family, socket.SOCK_STREAM)
    if is_ipv6:
        # Keep the IPv6 listener from also accepting IPv4 traffic.
        listener.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    self.set_socket(listener)
    self.bind(address)
    self.listen(10)

    self.register(self.fileno)
    self.add_evt_read(self.fileno)

    return self.fileno
def init_func(self, creator, address, crypto, crypto_configs, is_ipv6=False):
    """Create the UDP socket for this handler and set up its crypto pair.

    A datagram socket is created (``AF_INET6`` with ``IPV6_V6ONLY``
    enabled when *is_ipv6* is true, ``AF_INET`` otherwise), attached via
    ``set_socket`` and bound to *address*.  The descriptor is passed to
    ``register`` and ``add_evt_read``.  Afterwards an encrypt object and
    a decrypt object are obtained from *crypto* and both are configured
    with *crypto_configs*.

    *creator* is not used by this function.

    Returns ``self.fileno``.
    """
    family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
    udp_sock = socket.socket(family, socket.SOCK_DGRAM)
    if is_ipv6:
        # Keep the IPv6 socket from also handling IPv4 traffic.
        udp_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)

    self.set_socket(udp_sock)
    self.bind(address)

    self.register(self.fileno)
    self.add_evt_read(self.fileno)

    self.__encrypt = crypto.encrypt()
    self.__decrypt = crypto.decrypt()
    self.__encrypt.config(crypto_configs)
    self.__decrypt.config(crypto_configs)

    return self.fileno
def search(timeout):
    '''
    Look for devices on the network that implement WANCommonInterfaceConfig.

    One IPv6 and one IPv4 UDP socket are opened and each is searched on its
    own worker thread.  The search ends the given number of seconds after
    the last result (if any) was received.

    Returns an iterator of root device URLs.
    '''
    families = (socket.AF_INET6, socket.AF_INET)
    with contextlib.ExitStack() as stack:
        sockets = [
            stack.enter_context(
                socket.socket(family, socket.SOCK_DGRAM, socket.IPPROTO_UDP))
            for family in families
        ]

        for sock in sockets:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            if sock.family == socket.AF_INET6:
                # The IPv4 side has its own socket, so keep this one v6-only.
                sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)

        def probe(sock):
            return search_socket(sock, timeout, ns['i'])

        with concurrent.futures.ThreadPoolExecutor(len(sockets)) as pool:
            per_socket_results = pool.map(probe, sockets)
            return itertools.chain.from_iterable(per_socket_results)
def bind(self, family, type, proto=0):
    """Create a new socket for this server and bind it to ``self.bind_addr``.

    Whatever was stored on ``self.socket`` before is replaced.  *family*,
    *type* and *proto* are handed straight to ``socket.socket()``.
    """
    sock = self.socket = socket.socket(family, type, proto)
    prevent_socket_inheritance(sock)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # TCP_NODELAY is skipped when bind_addr is a plain string.
    if self.nodelay:
        if not isinstance(self.bind_addr, str):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    adapter = self.ssl_adapter
    if adapter is not None:
        sock = self.socket = adapter.bind(sock)

    # Binding to the IPv6 wildcard ('::' = IN6ADDR_ANY) turns on
    # dual-stack operation; see
    # https://bitbucket.org/cherrypy/cherrypy/issue/871.
    ipv6_wildcards = ('::', '::0', '::0.0.0.0')
    wants_dual_stack = (
        hasattr(socket, 'AF_INET6')
        and family == socket.AF_INET6
        and self.bind_addr[0] in ipv6_wildcards
    )
    if wants_dual_stack:
        try:
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        except (AttributeError, socket.error):
            # The option is apparently unavailable in this machine's
            # TCP stack; carry on without it.
            pass

    sock.bind(self.bind_addr)
def bind(self, family, type, proto=0):
    """Create a new socket for this server and bind it to ``self.bind_addr``.

    Whatever was stored on ``self.socket`` before is replaced.  *family*,
    *type* and *proto* are handed straight to ``socket.socket()``.
    """
    sock = self.socket = socket.socket(family, type, proto)
    prevent_socket_inheritance(sock)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # TCP_NODELAY is skipped when bind_addr is a plain string.
    if self.nodelay:
        if not isinstance(self.bind_addr, str):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    adapter = self.ssl_adapter
    if adapter is not None:
        sock = self.socket = adapter.bind(sock)

    # Binding to the IPv6 wildcard ('::' = IN6ADDR_ANY) turns on
    # dual-stack operation; see
    # https://bitbucket.org/cherrypy/cherrypy/issue/871.
    ipv6_wildcards = ('::', '::0', '::0.0.0.0')
    wants_dual_stack = (
        hasattr(socket, 'AF_INET6')
        and family == socket.AF_INET6
        and self.bind_addr[0] in ipv6_wildcards
    )
    if wants_dual_stack:
        try:
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        except (AttributeError, socket.error):
            # The option is apparently unavailable in this machine's
            # TCP stack; carry on without it.
            pass

    sock.bind(self.bind_addr)
def sender(group):
    """Send the current time to multicast *group* once per second, forever.

    *group* is resolved with ``socket.getaddrinfo`` and the first result
    decides the address family of the UDP socket.  The multicast
    TTL (IPv4) or hop limit (any other family, i.e. IPv6) is set to
    ``MYTTL``.  Each datagram is ``repr(time.time())`` followed by a NUL
    byte, sent to port ``MYPORT``.

    The loop only ends when an exception is raised (including
    ``KeyboardInterrupt``); the socket is closed before the exception
    propagates.
    """
    addrinfo = socket.getaddrinfo(group, None)[0]
    family, sockaddr = addrinfo[0], addrinfo[4]

    s = socket.socket(family, socket.SOCK_DGRAM)
    try:
        # Set Time-to-live (optional)
        ttl_bin = struct.pack('@i', MYTTL)
        if family == socket.AF_INET:  # IPv4
            s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_bin)
        else:
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl_bin)

        while True:
            # sendto() needs bytes on Python 3, so encode the payload
            # rather than sending a text string.
            data = repr(time.time()).encode('ascii') + b'\0'
            s.sendto(data, (sockaddr[0], MYPORT))
            time.sleep(1)
    finally:
        s.close()
def bind(self, family, type, proto=0):
    """Create a new socket for this server and bind it to ``self.bind_addr``.

    Whatever was stored on ``self.socket`` before is replaced.  *family*,
    *type* and *proto* are handed straight to ``socket.socket()``.
    """
    sock = self.socket = socket.socket(family, type, proto)
    prevent_socket_inheritance(sock)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # TCP_NODELAY is skipped when bind_addr is a plain string.
    if self.nodelay:
        if not isinstance(self.bind_addr, str):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    adapter = self.ssl_adapter
    if adapter is not None:
        sock = self.socket = adapter.bind(sock)

    # Binding to the IPv6 wildcard ('::' = IN6ADDR_ANY) turns on
    # dual-stack operation; see http://www.cherrypy.org/ticket/871.
    ipv6_wildcards = ('::', '::0', '::0.0.0.0')
    wants_dual_stack = (
        hasattr(socket, 'AF_INET6')
        and family == socket.AF_INET6
        and self.bind_addr[0] in ipv6_wildcards
    )
    if wants_dual_stack:
        try:
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        except (AttributeError, socket.error):
            # The option is apparently unavailable in this machine's
            # TCP stack; carry on without it.
            pass

    sock.bind(self.bind_addr)
def _testOddCmsgSize(self):
    # Wait for misc_event, then send MSG with an IPV6_TCLASS item whose
    # data is one byte longer than an int.  If that send raises
    # socket.error, check the errno is an int and retry with a correctly
    # sized item.  Either way the whole message must have been sent.
    self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))

    level = socket.IPPROTO_IPV6
    tclass = array.array("i", [self.traffic_class])
    hop_item = (level, socket.IPV6_HOPLIMIT,
                array.array("i", [self.hop_limit]))
    try:
        oversized_item = (level, socket.IPV6_TCLASS,
                          tclass.tobytes() + b"\x00")
        nbytes = self.sendmsgToServer([MSG], [oversized_item, hop_item])
    except socket.error as exc:
        self.assertIsInstance(exc.errno, int)
        exact_item = (level, socket.IPV6_TCLASS, tclass)
        nbytes = self.sendmsgToServer([MSG], [exact_item, hop_item])

    self.assertEqual(nbytes, len(MSG))
# Tests for proper handling of truncated ancillary data
def testSingleCmsgTruncInData(self):
    # Check truncation of a control message inside its associated data:
    # the item may come back with shortened data, or be dropped entirely.
    server = self.serv_sock
    server.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVHOPLIMIT, 1)
    self.misc_event.set()

    # One byte too small to hold an int-sized control message.
    short_ancbufsize = socket.CMSG_LEN(SIZEOF_INT) - 1
    received = self.doRecvmsg(server, len(MSG), short_ancbufsize)
    msg, ancdata, flags, addr = received

    self.assertEqual(msg, MSG)
    self.checkRecvmsgAddress(addr, self.cli_addr)
    self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)

    self.assertLessEqual(len(ancdata), 1)
    if not ancdata:
        return
    level, kind, payload = ancdata[0]
    self.assertEqual(level, socket.IPPROTO_IPV6)
    self.assertEqual(kind, socket.IPV6_HOPLIMIT)
    self.assertLess(len(payload), SIZEOF_INT)
def setup_ipv6_multicast_socket(ifaddrs, if_name, addr):
    """Open a non-blocking IPv6 UDP socket joined to multicast group *addr*.

    The socket is bound to the wildcard address on
    ``Config.udp_multicast.port``, uses ``Config.udp_multicast.ttl`` as
    its multicast hop limit, has multicast loopback disabled, and is
    appended to ``multicast_socket_ipv6`` as an ``(socket, addr)`` tuple.

    *ifaddrs* and *if_name* are currently unused: the group is joined with
    interface index 0.

    Returns ``True`` on success.  If any step fails the socket is closed
    and the exception is re-raised.
    """
    # TODO: if_name ignored
    s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(("", Config.udp_multicast.port))
        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, Config.udp_multicast.ttl)
        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_LOOP, 0)
        # struct ipv6_mreq is the 16-byte group address followed by an
        # unsigned int interface index.  Build it from bytes so it works
        # on Python 3 as well as Python 2.
        mreq = socket.inet_pton(socket.AF_INET6, addr) + struct.pack("@I", 0)
        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
        s.setblocking(0)
    except Exception:
        # Do not leak the descriptor when configuration fails part-way.
        s.close()
        raise
    multicast_socket_ipv6.append((s, addr))
    return True
def bind(self, family, type, proto=0):
    """Create a new socket for this server and bind it to ``self.bind_addr``.

    Whatever was stored on ``self.socket`` before is replaced.  *family*,
    *type* and *proto* are handed straight to ``socket.socket()``.
    """
    sock = self.socket = socket.socket(family, type, proto)
    prevent_socket_inheritance(sock)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # TCP_NODELAY is skipped when bind_addr is a plain string.
    if self.nodelay:
        if not isinstance(self.bind_addr, str):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    adapter = self.ssl_adapter
    if adapter is not None:
        sock = self.socket = adapter.bind(sock)

    # Binding to the IPv6 wildcard ('::' = IN6ADDR_ANY) turns on
    # dual-stack operation; see
    # https://github.com/cherrypy/cherrypy/issues/871.
    ipv6_wildcards = ('::', '::0', '::0.0.0.0')
    wants_dual_stack = (
        hasattr(socket, 'AF_INET6')
        and family == socket.AF_INET6
        and self.bind_addr[0] in ipv6_wildcards
    )
    if wants_dual_stack:
        try:
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        except (AttributeError, socket.error):
            # The option is apparently unavailable in this machine's
            # TCP stack; carry on without it.
            pass

    sock.bind(self.bind_addr)
def bind(self, family, type, proto=0):
    """Create a new socket for this server and bind it to ``self.bind_addr``.

    Whatever was stored on ``self.socket`` before is replaced.  *family*,
    *type* and *proto* are handed straight to ``socket.socket()``.
    """
    sock = self.socket = socket.socket(family, type, proto)
    prevent_socket_inheritance(sock)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # TCP_NODELAY is skipped when bind_addr is a plain string.
    if self.nodelay:
        if not isinstance(self.bind_addr, str):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    adapter = self.ssl_adapter
    if adapter is not None:
        sock = self.socket = adapter.bind(sock)

    # Binding to the IPv6 wildcard ('::' = IN6ADDR_ANY) turns on
    # dual-stack operation; see
    # https://github.com/cherrypy/cherrypy/issues/871.
    ipv6_wildcards = ('::', '::0', '::0.0.0.0')
    wants_dual_stack = (
        hasattr(socket, 'AF_INET6')
        and family == socket.AF_INET6
        and self.bind_addr[0] in ipv6_wildcards
    )
    if wants_dual_stack:
        try:
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        except (AttributeError, socket.error):
            # The option is apparently unavailable in this machine's
            # TCP stack; carry on without it.
            pass

    sock.bind(self.bind_addr)
def bind(self, family, type, proto=0):
    """Create a new socket for this server and bind it to ``self.bind_addr``.

    Whatever was stored on ``self.socket`` before is replaced.  *family*,
    *type* and *proto* are handed straight to ``socket.socket()``.
    """
    sock = self.socket = socket.socket(family, type, proto)
    prevent_socket_inheritance(sock)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # TCP_NODELAY is skipped when bind_addr is a plain string.
    if self.nodelay:
        if not isinstance(self.bind_addr, str):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    adapter = self.ssl_adapter
    if adapter is not None:
        sock = self.socket = adapter.bind(sock)

    # Binding to the IPv6 wildcard ('::' = IN6ADDR_ANY) turns on
    # dual-stack operation; see
    # https://github.com/cherrypy/cherrypy/issues/871.
    ipv6_wildcards = ('::', '::0', '::0.0.0.0')
    wants_dual_stack = (
        hasattr(socket, 'AF_INET6')
        and family == socket.AF_INET6
        and self.bind_addr[0] in ipv6_wildcards
    )
    if wants_dual_stack:
        try:
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        except (AttributeError, socket.error):
            # The option is apparently unavailable in this machine's
            # TCP stack; carry on without it.
            pass

    sock.bind(self.bind_addr)
def _testOddCmsgSize(self):
    # Wait for misc_event, then send MSG with an IPV6_TCLASS item whose
    # data is one byte longer than an int.  If that send raises OSError,
    # check the errno is an int and retry with a correctly sized item.
    # Either way the whole message must have been sent.
    self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))

    level = socket.IPPROTO_IPV6
    tclass = array.array("i", [self.traffic_class])
    hop_item = (level, socket.IPV6_HOPLIMIT,
                array.array("i", [self.hop_limit]))
    try:
        oversized_item = (level, socket.IPV6_TCLASS,
                          tclass.tobytes() + b"\x00")
        nbytes = self.sendmsgToServer([MSG], [oversized_item, hop_item])
    except OSError as exc:
        self.assertIsInstance(exc.errno, int)
        exact_item = (level, socket.IPV6_TCLASS, tclass)
        nbytes = self.sendmsgToServer([MSG], [exact_item, hop_item])

    self.assertEqual(nbytes, len(MSG))
# Tests for proper handling of truncated ancillary data
def testSingleCmsgTruncInData(self):
    # Check truncation of a control message inside its associated data:
    # the item may come back with shortened data, or be dropped entirely.
    server = self.serv_sock
    server.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVHOPLIMIT, 1)
    self.misc_event.set()

    # One byte too small to hold an int-sized control message.
    short_ancbufsize = socket.CMSG_LEN(SIZEOF_INT) - 1
    received = self.doRecvmsg(server, len(MSG), short_ancbufsize)
    msg, ancdata, flags, addr = received

    self.assertEqual(msg, MSG)
    self.checkRecvmsgAddress(addr, self.cli_addr)
    self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)

    self.assertLessEqual(len(ancdata), 1)
    if not ancdata:
        return
    level, kind, payload = ancdata[0]
    self.assertEqual(level, socket.IPPROTO_IPV6)
    self.assertEqual(kind, socket.IPV6_HOPLIMIT)
    self.assertLess(len(payload), SIZEOF_INT)
def bind(self, family, type, proto=0):
    """Create a new socket for this server and bind it to ``self.bind_addr``.

    Whatever was stored on ``self.socket`` before is replaced.  *family*,
    *type* and *proto* are handed straight to ``socket.socket()``.
    """
    sock = self.socket = socket.socket(family, type, proto)
    prevent_socket_inheritance(sock)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # TCP_NODELAY is skipped when bind_addr is a plain string.
    if self.nodelay:
        if not isinstance(self.bind_addr, str):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    adapter = self.ssl_adapter
    if adapter is not None:
        sock = self.socket = adapter.bind(sock)

    # Binding to the IPv6 wildcard ('::' = IN6ADDR_ANY) turns on
    # dual-stack operation; see http://www.cherrypy.org/ticket/871.
    ipv6_wildcards = ('::', '::0', '::0.0.0.0')
    wants_dual_stack = (
        hasattr(socket, 'AF_INET6')
        and family == socket.AF_INET6
        and self.bind_addr[0] in ipv6_wildcards
    )
    if wants_dual_stack:
        try:
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        except (AttributeError, socket.error):
            # The option is apparently unavailable in this machine's
            # TCP stack; carry on without it.
            pass

    sock.bind(self.bind_addr)
def bind(self, family, type, proto=0):
    """Create a new socket for this server and bind it to ``self.bind_addr``.

    Whatever was stored on ``self.socket`` before is replaced.  *family*,
    *type* and *proto* are handed straight to ``socket.socket()``.
    """
    sock = self.socket = socket.socket(family, type, proto)
    prevent_socket_inheritance(sock)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # TCP_NODELAY is skipped when bind_addr is a plain string.
    if self.nodelay:
        if not isinstance(self.bind_addr, str):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    adapter = self.ssl_adapter
    if adapter is not None:
        sock = self.socket = adapter.bind(sock)

    # Binding to the IPv6 wildcard ('::' = IN6ADDR_ANY) turns on
    # dual-stack operation; see http://www.cherrypy.org/ticket/871.
    ipv6_wildcards = ('::', '::0', '::0.0.0.0')
    wants_dual_stack = (
        hasattr(socket, 'AF_INET6')
        and family == socket.AF_INET6
        and self.bind_addr[0] in ipv6_wildcards
    )
    if wants_dual_stack:
        try:
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        except (AttributeError, socket.error):
            # The option is apparently unavailable in this machine's
            # TCP stack; carry on without it.
            pass

    sock.bind(self.bind_addr)
def bind(self, family, type, proto=0):
    """Create a new socket for this server and bind it to ``self.bind_addr``.

    Whatever was stored on ``self.socket`` before is replaced.  *family*,
    *type* and *proto* are handed straight to ``socket.socket()``.
    """
    sock = self.socket = socket.socket(family, type, proto)
    prevent_socket_inheritance(sock)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # TCP_NODELAY is skipped when bind_addr is a plain string.
    if self.nodelay:
        if not isinstance(self.bind_addr, str):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    adapter = self.ssl_adapter
    if adapter is not None:
        sock = self.socket = adapter.bind(sock)

    # Binding to the IPv6 wildcard ('::' = IN6ADDR_ANY) turns on
    # dual-stack operation; see http://www.cherrypy.org/ticket/871.
    ipv6_wildcards = ('::', '::0', '::0.0.0.0')
    wants_dual_stack = (
        hasattr(socket, 'AF_INET6')
        and family == socket.AF_INET6
        and self.bind_addr[0] in ipv6_wildcards
    )
    if wants_dual_stack:
        try:
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        except (AttributeError, socket.error):
            # The option is apparently unavailable in this machine's
            # TCP stack; carry on without it.
            pass

    sock.bind(self.bind_addr)
def bind(self, family, type, proto=0):
    """Create a new socket for this server and bind it to ``self.bind_addr``.

    Whatever was stored on ``self.socket`` before is replaced.  *family*,
    *type* and *proto* are handed straight to ``socket.socket()``.
    """
    sock = self.socket = socket.socket(family, type, proto)
    prevent_socket_inheritance(sock)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # TCP_NODELAY is skipped when bind_addr is a plain string.
    if self.nodelay:
        if not isinstance(self.bind_addr, str):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    adapter = self.ssl_adapter
    if adapter is not None:
        sock = self.socket = adapter.bind(sock)

    # Binding to the IPv6 wildcard ('::' = IN6ADDR_ANY) turns on
    # dual-stack operation; see
    # https://github.com/cherrypy/cherrypy/issues/871.
    ipv6_wildcards = ('::', '::0', '::0.0.0.0')
    wants_dual_stack = (
        hasattr(socket, 'AF_INET6')
        and family == socket.AF_INET6
        and self.bind_addr[0] in ipv6_wildcards
    )
    if wants_dual_stack:
        try:
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        except (AttributeError, socket.error):
            # The option is apparently unavailable in this machine's
            # TCP stack; carry on without it.
            pass

    sock.bind(self.bind_addr)
def __init__(self, handlers, addrinfo):
    """Set up an HTTP listener on the address described by *addrinfo*.

    *addrinfo* is a 5-tuple ``(family, socktype, proto, canonname,
    sockaddr)``; only the family, socket type and socket address are
    used.  *handlers* is stored on the instance and the first element of
    each entry is written to the debug log.

    Any exception raised while creating, configuring, binding or
    listening on the socket is logged and the dispatcher is closed; it
    is not propagated to the caller.
    """
    # Wrap the existing logger so that records carry this object as context.
    self.logger = logging.LoggerAdapter(self.logger, {"context": self})
    asyncore.dispatcher.__init__(self)
    self.handlers = handlers

    try:
        family, socktype, _proto, _canonname, sockaddr = addrinfo
        self.create_socket(family, socktype)
        listener = self.socket
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except AttributeError:
            # socket.SO_REUSEPORT is not defined here; go without it.
            pass
        if have_ipv6 and family == socket.AF_INET6:
            listener.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        self.bind(sockaddr)
        self.listen(5)
    except Exception:
        self.logger.exception("Couldn't set up HTTP listener")
        self.close()

    for entry in handlers:
        self.logger.debug("Handling %s", entry[0])
def __init__(self, handlers, addrinfo):
    """Set up an HTTP listener on the address described by *addrinfo*.

    *addrinfo* is a 5-tuple ``(family, socktype, proto, canonname,
    sockaddr)``; only the family, socket type and socket address are
    used.  *handlers* is stored on the instance and the first element of
    each entry is written to the debug log.

    Any exception raised while creating, configuring, binding or
    listening on the socket is logged and the dispatcher is closed; it
    is not propagated to the caller.
    """
    # Wrap the existing logger so that records carry this object as context.
    self.logger = logging.LoggerAdapter(self.logger, {"context": self})
    asyncore.dispatcher.__init__(self)
    self.handlers = handlers

    try:
        family, socktype, _proto, _canonname, sockaddr = addrinfo
        self.create_socket(family, socktype)
        listener = self.socket
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except AttributeError:
            # socket.SO_REUSEPORT is not defined here; go without it.
            pass
        if have_ipv6 and family == socket.AF_INET6:
            listener.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        self.bind(sockaddr)
        self.listen(5)
    except Exception:
        self.logger.exception("Couldn't set up HTTP listener")
        self.close()

    for entry in handlers:
        self.logger.debug("Handling %s", entry[0])
def __init__(self, handlers, addrinfo):
    """Set up an HTTP listener on the address described by *addrinfo*.

    *addrinfo* is a 5-tuple ``(family, socktype, proto, canonname,
    sockaddr)``; only the family, socket type and socket address are
    used.  *handlers* is stored on the instance and the first element of
    each entry is written to the debug log.

    Any exception raised while creating, configuring, binding or
    listening on the socket is logged and the dispatcher is closed; it
    is not propagated to the caller.
    """
    # Wrap the existing logger so that records carry this object as context.
    self.logger = logging.LoggerAdapter(self.logger, {"context": self})
    asyncore.dispatcher.__init__(self)
    self.handlers = handlers

    try:
        family, socktype, _proto, _canonname, sockaddr = addrinfo
        self.create_socket(family, socktype)
        listener = self.socket
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except AttributeError:
            # socket.SO_REUSEPORT is not defined here; go without it.
            pass
        if have_ipv6 and family == socket.AF_INET6:
            listener.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        self.bind(sockaddr)
        self.listen(5)
    except Exception:
        self.logger.exception("Couldn't set up HTTP listener")
        self.close()

    for entry in handlers:
        self.logger.debug("Handling %s", entry[0])
def __init__(self, handlers, addrinfo):
    """Set up an HTTP listener on the address described by *addrinfo*.

    *addrinfo* is a 5-tuple ``(family, socktype, proto, canonname,
    sockaddr)``; only the family, socket type and socket address are
    used.  *handlers* is stored on the instance and the first element of
    each entry is written to the debug log.

    Any exception raised while creating, configuring, binding or
    listening on the socket is logged and the dispatcher is closed; it
    is not propagated to the caller.
    """
    # Wrap the existing logger so that records carry this object as context.
    self.logger = logging.LoggerAdapter(self.logger, {"context": self})
    asyncore.dispatcher.__init__(self)
    self.handlers = handlers

    try:
        family, socktype, _proto, _canonname, sockaddr = addrinfo
        self.create_socket(family, socktype)
        listener = self.socket
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except AttributeError:
            # socket.SO_REUSEPORT is not defined here; go without it.
            pass
        if have_ipv6 and family == socket.AF_INET6:
            listener.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        self.bind(sockaddr)
        self.listen(5)
    except Exception:
        self.logger.exception("Couldn't set up HTTP listener")
        self.close()

    for entry in handlers:
        self.logger.debug("Handling %s", entry[0])
def __init__(self, handlers, addrinfo):
    """Set up an HTTP listener on the address described by *addrinfo*.

    *addrinfo* is a 5-tuple ``(family, socktype, proto, canonname,
    sockaddr)``; only the family, socket type and socket address are
    used.  *handlers* is stored on the instance and the first element of
    each entry is written to the debug log.

    Any exception raised while creating, configuring, binding or
    listening on the socket is logged and the dispatcher is closed; it
    is not propagated to the caller.
    """
    # Wrap the existing logger so that records carry this object as context.
    self.logger = logging.LoggerAdapter(self.logger, {"context": self})
    asyncore.dispatcher.__init__(self)
    self.handlers = handlers

    try:
        family, socktype, _proto, _canonname, sockaddr = addrinfo
        self.create_socket(family, socktype)
        listener = self.socket
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except AttributeError:
            # socket.SO_REUSEPORT is not defined here; go without it.
            pass
        if have_ipv6 and family == socket.AF_INET6:
            listener.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        self.bind(sockaddr)
        self.listen(5)
    except Exception:
        self.logger.exception("Couldn't set up HTTP listener")
        self.close()

    for entry in handlers:
        self.logger.debug("Handling %s", entry[0])
def _testOddCmsgSize(self):
    # Wait for misc_event, then send MSG with an IPV6_TCLASS item whose
    # data is one byte longer than an int.  If that send raises OSError,
    # check the errno is an int and retry with a correctly sized item.
    # Either way the whole message must have been sent.
    self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))

    level = socket.IPPROTO_IPV6
    tclass = array.array("i", [self.traffic_class])
    hop_item = (level, socket.IPV6_HOPLIMIT,
                array.array("i", [self.hop_limit]))
    try:
        oversized_item = (level, socket.IPV6_TCLASS,
                          tclass.tobytes() + b"\x00")
        nbytes = self.sendmsgToServer([MSG], [oversized_item, hop_item])
    except OSError as exc:
        self.assertIsInstance(exc.errno, int)
        exact_item = (level, socket.IPV6_TCLASS, tclass)
        nbytes = self.sendmsgToServer([MSG], [exact_item, hop_item])

    self.assertEqual(nbytes, len(MSG))
# Tests for proper handling of truncated ancillary data
def testSingleCmsgTruncInData(self):
    # Check truncation of a control message inside its associated data:
    # the item may come back with shortened data, or be dropped entirely.
    server = self.serv_sock
    server.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVHOPLIMIT, 1)
    self.misc_event.set()

    # One byte too small to hold an int-sized control message.
    short_ancbufsize = socket.CMSG_LEN(SIZEOF_INT) - 1
    received = self.doRecvmsg(server, len(MSG), short_ancbufsize)
    msg, ancdata, flags, addr = received

    self.assertEqual(msg, MSG)
    self.checkRecvmsgAddress(addr, self.cli_addr)
    self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)

    self.assertLessEqual(len(ancdata), 1)
    if not ancdata:
        return
    level, kind, payload = ancdata[0]
    self.assertEqual(level, socket.IPPROTO_IPV6)
    self.assertEqual(kind, socket.IPV6_HOPLIMIT)
    self.assertLess(len(payload), SIZEOF_INT)
def bind(self, family, type, proto=0):
    """Create a new socket for this server and bind it to ``self.bind_addr``.

    Whatever was stored on ``self.socket`` before is replaced.  *family*,
    *type* and *proto* are handed straight to ``socket.socket()``.
    """
    sock = self.socket = socket.socket(family, type, proto)
    prevent_socket_inheritance(sock)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # TCP_NODELAY is skipped when bind_addr is a plain string.
    if self.nodelay:
        if not isinstance(self.bind_addr, str):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    adapter = self.ssl_adapter
    if adapter is not None:
        sock = self.socket = adapter.bind(sock)

    # Binding to the IPv6 wildcard ('::' = IN6ADDR_ANY) turns on
    # dual-stack operation; see
    # https://github.com/cherrypy/cherrypy/issues/871.
    ipv6_wildcards = ('::', '::0', '::0.0.0.0')
    wants_dual_stack = (
        hasattr(socket, 'AF_INET6')
        and family == socket.AF_INET6
        and self.bind_addr[0] in ipv6_wildcards
    )
    if wants_dual_stack:
        try:
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        except (AttributeError, socket.error):
            # The option is apparently unavailable in this machine's
            # TCP stack; carry on without it.
            pass

    sock.bind(self.bind_addr)
def join_ipv6_beacon_group(sock, ifindex):
    """Join the MAAS IPv6 beacon multicast group on the given UDP socket.

    Failures to join are ignored (best effort).

    :param sock: An opened IPv6 UDP socket.
    :param ifindex: Index of the interface that should join the group.
    """
    # Twisted has no IPv6 support for this yet, otherwise
    # transport.joinGroup(BEACON_IPV6_MULTICAST) would be preferable.
    group_address = socket.inet_pton(socket.AF_INET6, BEACON_IPV6_MULTICAST)
    interface_index = struct.pack("I", ifindex)
    membership_request = group_address + interface_index
    try:
        sock.setsockopt(
            socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, membership_request)
    except OSError:
        # Best effort only: an "Address already in use" error may occur
        # when the group is already joined, or joining may simply not be
        # possible on this interface.
        pass
def discover_peers(self, port=None):
    """Send a 'ping' discovery message on every known address.

    This can be invoked (perhaps periodically) to look for peers again
    when there is a chance the initial discovery message was lost, as
    these messages are sent over UDP.

    One ``SysTask`` is started per entry in ``self._addrinfos``.  If
    *port* is not given, the port of that entry's ``udp_sock`` is used.
    A failed send is ignored (best effort), and the temporary socket is
    closed on every exit path.
    """
    ping_msg = {'signature': self._signature, 'name': self._name, 'version': __version__}

    def _discover(addrinfo, port, task=None):
        ping_sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_DGRAM))
        try:
            ping_sock.settimeout(2)
            if addrinfo.family == socket.AF_INET:
                ping_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            else:  # addrinfo.family == socket.AF_INET6
                ping_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS,
                                     struct.pack('@i', 1))
                ping_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, addrinfo.ifn)
            ping_sock.bind((addrinfo.ip, 0))
            if not port:
                port = addrinfo.udp_sock.getsockname()[1]
            ping_msg['location'] = addrinfo.location
            try:
                yield ping_sock.sendto('ping:'.encode() + serialize(ping_msg),
                                       (addrinfo.broadcast, port))
            except Exception:
                # Discovery is best effort; a failed send is not an error.
                # Only Exception is caught so that KeyboardInterrupt,
                # SystemExit and GeneratorExit still propagate.
                pass
        finally:
            # Close the socket even if configuring it or binding failed.
            ping_sock.close()

    for addrinfo in self._addrinfos:
        SysTask(_discover, addrinfo, port)