def bcast_socket(socket_module=socket):
    """
    Create a UDP socket that is allowed to send broadcast datagrams.

    Parameters
    ----------
    socket_module: module, optional
        Module used to build the socket. Defaults to the built-in
        :mod:`socket` module; anything exposing the same interface, such as
        :mod:`curio.socket`, may be passed instead.

    Returns
    -------
    sock
        The new socket with ``SO_REUSEADDR`` and ``SO_BROADCAST`` enabled,
        plus ``SO_REUSEPORT`` when ``socket_module`` defines that constant.
    """
    net = socket_module
    sock = net.socket(net.AF_INET, net.SOCK_DGRAM)
    sock.setsockopt(net.SOL_SOCKET, net.SO_REUSEADDR, 1)
    sock.setsockopt(net.SOL_SOCKET, net.SO_BROADCAST, 1)
    # SO_REUSEPORT is intended for BSD/Darwin; it is only applied when the
    # socket module actually exposes the constant.
    if hasattr(net, 'SO_REUSEPORT'):
        sock.setsockopt(net.SOL_SOCKET, net.SO_REUSEPORT, 1)
    return sock
# Example source code using Python's SO_REUSEPORT
def set_options(self, sock, bound=False):
    """Configure ``sock`` as a non-blocking listening socket and return it.

    ``SO_REUSEADDR`` is always set. ``SO_REUSEPORT`` is attempted when the
    :mod:`socket` module defines it; ``ENOPROTOOPT`` and ``EINVAL`` failures
    are ignored, any other error is re-raised.

    :param sock: the socket to configure.
    :param bound: when false, ``self.bind(sock)`` is called before the socket
        is put in listening mode.
    :returns: the same ``sock`` object, listening with a backlog of
        ``self.conf.backlog``.
    :raises socket.error: if setting ``SO_REUSEPORT`` fails for a reason
        other than the option being unsupported.
    """
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    if hasattr(socket, 'SO_REUSEPORT'):  # pragma: no cover
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except socket.error as err:
            # Use ``err.errno`` rather than ``err[0]``: exception objects are
            # not subscriptable on Python 3, so indexing raised TypeError
            # and masked the real error.
            if err.errno not in (errno.ENOPROTOOPT, errno.EINVAL):
                raise
    if not bound:
        self.bind(sock)
    sock.setblocking(0)

    # make sure that the socket can be inherited
    if hasattr(sock, "set_inheritable"):
        sock.set_inheritable(True)

    sock.listen(self.conf.backlog)
    return sock
def main(self):
    """Open the listening TCP socket, hand it to ``self._serve`` and close it.

    The address family is ``AF_INET`` when ``self.protocol`` is ``'IPv4'``
    and ``AF_INET6`` otherwise. The socket is bound to
    ``(self.ip_dst, self.port_dst)``; a ``None`` ``ip_dst`` binds to ``''``.
    ``self.backlog_size`` and ``self.timeout`` are applied before serving.

    The socket is closed even when configuration, binding or
    ``self._serve`` raises, so a failure no longer leaks the descriptor.
    """
    family = socket.AF_INET if self.protocol == 'IPv4' else socket.AF_INET6
    server_sock = socket.socket(family, socket.SOCK_STREAM)
    try:
        server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        bind_host = '' if self.ip_dst is None else self.ip_dst
        server_sock.bind((bind_host, self.port_dst))
        server_sock.listen(self.backlog_size)
        server_sock.settimeout(self.timeout)
        self._serve(server_sock)
    finally:
        server_sock.close()
def create_sockets(mcst_ipaddr, mcst_port):
    """Create the UDP socket(s) for multicast group ``mcst_ipaddr``.

    The first socket joins the multicast group. On Windows it is bound to
    ``("", mcst_port)`` and returned alone as ``(s, )``. On other platforms
    it also gets ``SO_REUSEPORT`` and is bound to
    ``(mcst_ipaddr, mcst_port)``, and a second socket bound to
    ``("", 1901)`` is returned with it as ``(s, bsock)``.

    :param mcst_ipaddr: dotted-quad IPv4 multicast address to join.
    :param mcst_port: UDP port for the multicast socket.
    :returns: a 1-tuple (Windows) or 2-tuple (elsewhere) of bound sockets.
    :raises OSError: if any socket call fails; sockets created so far are
        closed before the error propagates.
    """
    # "=" selects standard sizes without padding, so the result is exactly
    # the 8-byte ``struct ip_mreq``. Plain "4sl" uses native alignment and
    # yields 16 bytes on LP64 platforms.
    mreq = struct.pack("=4sl", socket.inet_aton(mcst_ipaddr),
                       socket.INADDR_ANY)
    opened = []
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                          socket.IPPROTO_UDP)
        opened.append(s)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        if platform.system() == "Windows":
            s.bind(("", mcst_port))
            return (s, )

        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        s.bind((mcst_ipaddr, mcst_port))

        bsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                              socket.IPPROTO_UDP)
        opened.append(bsock)
        bsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        bsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        bsock.bind(("", 1901))
        return (s, bsock)
    except Exception:
        # Do not leak half-configured sockets when setup fails.
        for sk in opened:
            sk.close()
        raise
def test_create_server_reuse_port(self):
    """Check that ``create_server`` sets SO_REUSEPORT only when asked to.

    A server created without ``reuse_port`` must report the option as
    unset on its single socket; one created with ``reuse_port=True`` must
    report it as set.
    """
    def start_server(**options):
        # Each server gets its own protocol instance and must expose
        # exactly one listening socket.
        proto = MyProto(self.loop)
        pending = self.loop.create_server(
            lambda: proto, '0.0.0.0', 0, **options)
        started = self.loop.run_until_complete(pending)
        self.assertEqual(len(started.sockets), 1)
        return started

    def reuse_port_flag(started):
        listening = started.sockets[0]
        return listening.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT)

    plain = start_server()
    self.assertFalse(reuse_port_flag(plain))
    plain.close()

    test_utils.run_briefly(self.loop)

    shared = start_server(reuse_port=True)
    self.assertTrue(reuse_port_flag(shared))
    shared.close()
def test_create_datagram_endpoint_ip_addr(self, m_socket):
    """Check that a literal IP ``local_addr`` is bound without a lookup.

    ``getaddrinfo`` on the patched socket module fails the test if it is
    called. The endpoint must bind to ``('1.2.3.4', 0)`` and create an
    ``AF_INET`` / ``SOCK_DGRAM`` / ``IPPROTO_UDP`` socket.
    """
    def getaddrinfo(*args, **kw):
        self.fail('should not have called getaddrinfo')

    bind = mock.Mock()
    m_socket.getaddrinfo = getaddrinfo
    m_socket.socket.return_value.bind = bind

    add_reader = mock.Mock()
    self.loop.add_reader = add_reader
    add_reader._is_coroutine = False

    # Only request port reuse where the platform defines the option.
    endpoint = self.loop.create_datagram_endpoint(
        lambda: MyDatagramProto(loop=self.loop),
        local_addr=('1.2.3.4', 0),
        reuse_address=False,
        reuse_port=hasattr(socket, 'SO_REUSEPORT'))
    transport, _protocol = self.loop.run_until_complete(endpoint)
    try:
        bind.assert_called_with(('1.2.3.4', 0))
        m_socket.socket.assert_called_with(family=m_socket.AF_INET,
                                           proto=m_socket.IPPROTO_UDP,
                                           type=m_socket.SOCK_DGRAM)
    finally:
        transport.close()
        test_utils.run_briefly(self.loop)  # allow transport to close
def createInternetSocket(self):
    """Create the port's socket, enabling port sharing when ``self._reuse``.

    The socket comes from ``tcp.Port.createInternetSocket``. When
    ``self._reuse`` is true, BSD, Linux and Darwin get both
    ``SO_REUSEADDR`` and ``SO_REUSEPORT``, while Windows gets only
    ``SO_REUSEADDR``.

    :raises Exception: if ``self._reuse`` is set on any other platform.
    """
    s = tcp.Port.createInternetSocket(self)
    if not self._reuse:
        return s

    #
    # reuse IP Port
    #
    current = sys.platform
    if 'bsd' in current or current.startswith(('linux', 'darwin')):
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    elif current == 'win32':
        # on Windows, REUSEADDR already implies REUSEPORT
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    else:
        raise Exception("don't know how to set SO_REUSEPORT on platform {}".format(sys.platform))
    return s
def __init__(self, handlers, addrinfo):
    """Set up a listening dispatcher for one ``addrinfo`` entry.

    The logger is wrapped in a ``LoggerAdapter`` carrying this object as
    context. A socket is created for the address family and type in
    ``addrinfo``, address/port reuse is enabled, IPv6 sockets are limited
    to IPv6 traffic, and the socket is bound and put in listening mode
    with a backlog of 5.

    Any exception raised during that setup is logged and the dispatcher is
    closed; it is not re-raised. Each handler's first element is then
    logged at debug level.
    """
    self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
    asyncore.dispatcher.__init__(self)
    self.handlers = handlers
    try:
        family, kind, _proto, _canonname, address = addrinfo
        self.create_socket(family, kind)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except AttributeError:
            # The socket module has no SO_REUSEPORT on this platform.
            pass
        ipv6_listener = have_ipv6 and family == socket.AF_INET6
        if ipv6_listener:
            self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        self.bind(address)
        self.listen(5)
    except Exception:
        self.logger.exception("Couldn't set up HTTP listener")
        self.close()
    for handler in handlers:
        self.logger.debug("Handling %s", handler[0])
def __init__(self, handlers, addrinfo):
    """Set up a listening dispatcher for one ``addrinfo`` entry.

    The logger is wrapped in a ``LoggerAdapter`` carrying this object as
    context. A socket is created for the address family and type in
    ``addrinfo``, address/port reuse is enabled, IPv6 sockets are limited
    to IPv6 traffic, and the socket is bound and put in listening mode
    with a backlog of 5.

    Any exception raised during that setup is logged and the dispatcher is
    closed; it is not re-raised. Each handler's first element is then
    logged at debug level.
    """
    self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
    asyncore.dispatcher.__init__(self)
    self.handlers = handlers
    try:
        family, kind, _proto, _canonname, address = addrinfo
        self.create_socket(family, kind)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except AttributeError:
            # The socket module has no SO_REUSEPORT on this platform.
            pass
        ipv6_listener = have_ipv6 and family == socket.AF_INET6
        if ipv6_listener:
            self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        self.bind(address)
        self.listen(5)
    except Exception:
        self.logger.exception("Couldn't set up HTTP listener")
        self.close()
    for handler in handlers:
        self.logger.debug("Handling %s", handler[0])
def __init__(self, handlers, addrinfo):
    """Set up a listening dispatcher for one ``addrinfo`` entry.

    The logger is wrapped in a ``LoggerAdapter`` carrying this object as
    context. A socket is created for the address family and type in
    ``addrinfo``, address/port reuse is enabled, IPv6 sockets are limited
    to IPv6 traffic, and the socket is bound and put in listening mode
    with a backlog of 5.

    Any exception raised during that setup is logged and the dispatcher is
    closed; it is not re-raised. Each handler's first element is then
    logged at debug level.
    """
    self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
    asyncore.dispatcher.__init__(self)
    self.handlers = handlers
    try:
        family, kind, _proto, _canonname, address = addrinfo
        self.create_socket(family, kind)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except AttributeError:
            # The socket module has no SO_REUSEPORT on this platform.
            pass
        ipv6_listener = have_ipv6 and family == socket.AF_INET6
        if ipv6_listener:
            self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        self.bind(address)
        self.listen(5)
    except Exception:
        self.logger.exception("Couldn't set up HTTP listener")
        self.close()
    for handler in handlers:
        self.logger.debug("Handling %s", handler[0])
def __init__(self, handlers, addrinfo):
    """Set up a listening dispatcher for one ``addrinfo`` entry.

    The logger is wrapped in a ``LoggerAdapter`` carrying this object as
    context. A socket is created for the address family and type in
    ``addrinfo``, address/port reuse is enabled, IPv6 sockets are limited
    to IPv6 traffic, and the socket is bound and put in listening mode
    with a backlog of 5.

    Any exception raised during that setup is logged and the dispatcher is
    closed; it is not re-raised. Each handler's first element is then
    logged at debug level.
    """
    self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
    asyncore.dispatcher.__init__(self)
    self.handlers = handlers
    try:
        family, kind, _proto, _canonname, address = addrinfo
        self.create_socket(family, kind)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except AttributeError:
            # The socket module has no SO_REUSEPORT on this platform.
            pass
        ipv6_listener = have_ipv6 and family == socket.AF_INET6
        if ipv6_listener:
            self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        self.bind(address)
        self.listen(5)
    except Exception:
        self.logger.exception("Couldn't set up HTTP listener")
        self.close()
    for handler in handlers:
        self.logger.debug("Handling %s", handler[0])
def __init__(self, handlers, addrinfo):
    """Set up a listening dispatcher for one ``addrinfo`` entry.

    The logger is wrapped in a ``LoggerAdapter`` carrying this object as
    context. A socket is created for the address family and type in
    ``addrinfo``, address/port reuse is enabled, IPv6 sockets are limited
    to IPv6 traffic, and the socket is bound and put in listening mode
    with a backlog of 5.

    Any exception raised during that setup is logged and the dispatcher is
    closed; it is not re-raised. Each handler's first element is then
    logged at debug level.
    """
    self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
    asyncore.dispatcher.__init__(self)
    self.handlers = handlers
    try:
        family, kind, _proto, _canonname, address = addrinfo
        self.create_socket(family, kind)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except AttributeError:
            # The socket module has no SO_REUSEPORT on this platform.
            pass
        ipv6_listener = have_ipv6 and family == socket.AF_INET6
        if ipv6_listener:
            self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        self.bind(address)
        self.listen(5)
    except Exception:
        self.logger.exception("Couldn't set up HTTP listener")
        self.close()
    for handler in handlers:
        self.logger.debug("Handling %s", handler[0])
def runAttack(self, targetList):
    """Dispatch ``asyncSendPayload`` for every target, once per bind port.

    The whole sequence is repeated ``self.replayCount`` times. For each
    port in ``self.bindPorts`` a process pool of ``self.maxProcesses``
    workers and a UDP socket are created; the socket is bound to that port
    and one task per entry of ``targetList`` is handed to the pool. A
    failure while binding or dispatching is printed and that port is
    skipped. The pool and socket are closed before moving on.

    :param targetList: list of dicts with ``'ipaddress'``, ``'port'`` and
        ``'payload'`` keys, e.g.
        ``[{'ipaddress': '127.0.0.1', 'port': 53, 'payload': 'blablabla'}]``.
    :returns: always ``True``.
    """
    for _round in range(self.replayCount):
        for port in self.bindPorts:
            requestPool = Pool(processes=self.maxProcesses)
            sharedSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sharedSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sharedSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            try:
                sharedSocket.bind(('', port))
                print("Sending packets from port %s" % port)
                taskList = []
                for targetParams in targetList:
                    taskList.append((
                        sharedSocket,
                        targetParams.get('ipaddress'),
                        targetParams.get('port'),
                        targetParams.get('payload'),
                    ))
                requestPool.starmap(asyncSendPayload, taskList)
            except Exception as e:
                print("Failed binding port %s: %s" % (port, e))
            print("Closing process pool")
            requestPool.close()
            print("Joining process pool")
            requestPool.join()
            print("Closing shared socket")
            sharedSocket.close()
    return True
def _server(self):
    """Accept TCP clients on ``(self._host, self._port)`` until closing.

    Each accepted connection is wrapped in UTF-8 text file objects and
    passed to ``self._interactive_loop(sout, sin)``. Accept timeouts and
    other ``OSError`` failures for a client are ignored so the loop can
    re-check ``self._closing``.

    The listening socket is managed by a ``with`` block that also covers
    configuration, ``bind()`` and ``listen()``, so a failure there closes
    the socket instead of leaking it.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except AttributeError:
            # SO_REUSEPORT is not defined on this platform.
            pass

        # set the timeout to prevent the server loop from
        # blocking indefinitely on sock.accept()
        sock.settimeout(0.5)
        sock.bind((self._host, self._port))
        sock.listen(1)
        while not self._closing.is_set():
            try:
                client, _addr = sock.accept()
                # Close the file wrappers together with the client socket;
                # otherwise the connection stays open until they are
                # garbage-collected.
                with client, \
                        client.makefile('w', encoding='utf-8') as sout, \
                        client.makefile('r', encoding='utf-8') as sin:
                    self._interactive_loop(sout, sin)
            except (socket.timeout, OSError):
                continue
def _init_sock(self):
    """Create the server socket and store it in ``self.sock``.

    With ``self.unix_socket`` set, an ``AF_UNIX`` stream socket is created
    and a connection to that path is attempted; the path is unlinked when
    the attempt is refused. Otherwise a stream socket of
    ``self.socket_family`` is created.

    ``SO_REUSEADDR`` is always set. ``SO_REUSEPORT`` is attempted when the
    :mod:`socket` module defines it; ``ENOPROTOOPT`` and ``EINVAL`` are
    ignored and other errors re-raised. The socket is left in blocking
    mode (timeout ``None``).
    """
    if self.unix_socket:
        # try to remove the sock file if it already exists
        _sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            _sock.connect(self.unix_socket)
        except (socket.error, OSError) as err:
            if err.args[0] == errno.ECONNREFUSED:
                os.unlink(self.unix_socket)
    else:
        _sock = socket.socket(self.socket_family, socket.SOCK_STREAM)

    _sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    if hasattr(socket, "SO_REUSEPORT"):
        try:
            _sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except socket.error as err:
            # Use ``err.errno`` rather than ``err[0]``: exception objects
            # are not subscriptable on Python 3, so indexing raised
            # TypeError and masked the real error.
            if err.errno not in (errno.ENOPROTOOPT, errno.EINVAL):
                raise
    _sock.settimeout(None)
    self.sock = _sock
def make_WebApplicationService(postgresListener, statusWorker):
    """Build a ``WebApplicationService`` listening on ``DEFAULT_PORT``.

    An IPv6 TCP socket with ``SO_REUSEADDR`` and ``SO_REUSEPORT`` is bound
    to ``('::', DEFAULT_PORT)``, put in listening mode and adopted into the
    Twisted reactor through an ``AdoptedStreamServerEndpoint``.

    :param postgresListener: passed through to ``WebApplicationService``.
    :param statusWorker: passed through to ``WebApplicationService``.
    :returns: the constructed ``WebApplicationService``.
    """
    from maasserver.webapp import WebApplicationService

    site_port = DEFAULT_PORT  # config["port"]

    # SO_REUSEPORT lets multiple web applications share the port. The socket
    # is prepared outside of Twisted because there is not yet official
    # support for setting socket options there.
    listener = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    # N.B. binding to the IPv6 wildcard means getpeername() returns something
    # like: ('::ffff:192.168.133.32', 40588, 0, 0)
    wildcard = ('::', site_port)
    listener.bind(wildcard)
    # A backlog of 50 seems to be fairly common.
    listener.listen(50)

    # Hand the already-listening socket over to Twisted's reactor.
    site_endpoint = AdoptedStreamServerEndpoint(
        reactor, listener.fileno(), listener.family)
    site_endpoint.port = site_port  # Make it easy to get the port number.
    site_endpoint.socket = listener  # Prevent garbage collection.
    return WebApplicationService(
        site_endpoint, postgresListener, statusWorker)
def createInternetSocket(self):
    """Create the port's socket, allowing shared binds if requested.

    The socket comes from ``Port.createInternetSocket``. When
    ``self.listenMultiple`` is true, ``SO_REUSEADDR`` is set, and
    ``SO_REUSEPORT`` as well where the :mod:`socket` module defines it.
    """
    skt = Port.createInternetSocket(self)
    if not self.listenMultiple:
        return skt
    skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    reuse_port = getattr(socket, "SO_REUSEPORT", None)
    if reuse_port is not None:
        skt.setsockopt(socket.SOL_SOCKET, reuse_port, 1)
    return skt
def createInternetSocket(self):
    """Create the port's socket, allowing shared binds if requested.

    The socket comes from ``Port.createInternetSocket``. When
    ``self.listenMultiple`` is true, ``SO_REUSEADDR`` is set, and
    ``SO_REUSEPORT`` as well where the :mod:`socket` module defines it.
    """
    skt = Port.createInternetSocket(self)
    if not self.listenMultiple:
        return skt
    skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    reuse_port = getattr(socket, "SO_REUSEPORT", None)
    if reuse_port is not None:
        skt.setsockopt(socket.SOL_SOCKET, reuse_port, 1)
    return skt
def setup(self):
    """Create the module-level TCP socket ``sock`` and start listening.

    The socket is bound to ``(ARG_IP, ARG_PORT)`` with a backlog of 10.
    ``SO_REUSEPORT`` is not set here.
    """
    global sock
    sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
    listen_address = (ARG_IP, ARG_PORT)
    sock.bind(listen_address)
    sock.listen(10)
def bind_port(sock, host=HOST):
    """Bind the socket to a free port and return the port number.

    Binding to port 0 relies on ephemeral ports to guarantee an unbound
    port, which matters because many tests may run simultaneously,
    especially in a buildbot environment.

    For an AF_INET / SOCK_STREAM socket this raises TestFailed when
    SO_REUSEADDR or SO_REUSEPORT is already set on it. Tests should
    *never* set these options for TCP/IP sockets; the only case for them
    is testing multicasting via multiple UDP sockets. If
    SO_EXCLUSIVEADDRUSE is available (i.e. on Windows) it is set on such a
    socket, preventing anyone else from bind()'ing to our host/port for
    the duration of the test.

    :param sock: the socket to bind.
    :param host: the host to bind to; defaults to ``HOST``.
    :returns: the port number the socket was bound to.
    :raises TestFailed: if a reuse option is set on a TCP/IP socket.
    """
    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
        if hasattr(socket, 'SO_REUSEADDR'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
                raise TestFailed("tests should never set the SO_REUSEADDR "
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            try:
                if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
                    raise TestFailed("tests should never set the SO_REUSEPORT "
                                     "socket option on TCP/IP sockets!")
            except socket.error:
                # Python's socket module was compiled using modern headers
                # thus defining SO_REUSEPORT but this process is running
                # under an older kernel that does not support SO_REUSEPORT.
                # Without this guard the query itself aborted the test.
                pass
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    sock.bind((host, 0))
    port = sock.getsockname()[1]
    return port
def bind_port(sock, host=HOST):
    """Bind *sock* to an ephemeral port on *host* and return that port.

    Port 0 is requested so the OS picks an unbound port, which matters
    because many tests may run simultaneously, especially in a buildbot
    environment.

    For an AF_INET / SOCK_STREAM socket, TestFailed is raised when
    SO_REUSEADDR or SO_REUSEPORT is already set: tests should *never* set
    these options on TCP/IP sockets, the only case for them being
    multicast tests with multiple UDP sockets. If SO_EXCLUSIVEADDRUSE is
    available (i.e. on Windows) it is set on such a socket so nobody else
    can bind() to our host/port for the duration of the test.
    """
    tcp_over_ipv4 = (sock.family == socket.AF_INET
                     and sock.type == socket.SOCK_STREAM)
    if tcp_over_ipv4:
        reuseaddr = getattr(socket, 'SO_REUSEADDR', None)
        if (reuseaddr is not None
                and sock.getsockopt(socket.SOL_SOCKET, reuseaddr) == 1):
            raise TestFailed("tests should never set the SO_REUSEADDR "
                             "socket option on TCP/IP sockets!")
        reuseport = getattr(socket, 'SO_REUSEPORT', None)
        if reuseport is not None:
            try:
                if sock.getsockopt(socket.SOL_SOCKET, reuseport) == 1:
                    raise TestFailed("tests should never set the SO_REUSEPORT "
                                     "socket option on TCP/IP sockets!")
            except OSError:
                # The socket module was built against headers that define
                # SO_REUSEPORT, but the running kernel is older and does
                # not support the option.
                pass
        exclusive = getattr(socket, 'SO_EXCLUSIVEADDRUSE', None)
        if exclusive is not None:
            sock.setsockopt(socket.SOL_SOCKET, exclusive, 1)

    sock.bind((host, 0))
    return sock.getsockname()[1]
def __init__(self, local_port):
    """Initialise a UDP socket bound to ``local_port`` on all interfaces.

    ``SO_REUSEADDR`` is always set; ``SO_REUSEPORT`` is set as well where
    the :mod:`socket` module defines it.
    """
    socket.socket.__init__(
        self, socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    reuse_port = getattr(socket, "SO_REUSEPORT", None)
    if reuse_port is not None:
        self.setsockopt(socket.SOL_SOCKET, reuse_port, 1)
    any_interface = ('', local_port)
    self.bind(any_interface)
def bind_port(sock, host=HOST):
    """Bind *sock* to an ephemeral port on *host* and return that port.

    Port 0 is requested so the OS picks an unbound port, which matters
    because many tests may run simultaneously, especially in a buildbot
    environment.

    For an AF_INET / SOCK_STREAM socket, TestFailed is raised when
    SO_REUSEADDR or SO_REUSEPORT is already set: tests should *never* set
    these options on TCP/IP sockets, the only case for them being
    multicast tests with multiple UDP sockets. If SO_EXCLUSIVEADDRUSE is
    available (i.e. on Windows) it is set on such a socket so nobody else
    can bind() to our host/port for the duration of the test.
    """
    tcp_over_ipv4 = (sock.family == socket.AF_INET
                     and sock.type == socket.SOCK_STREAM)
    if tcp_over_ipv4:
        reuseaddr = getattr(socket, 'SO_REUSEADDR', None)
        if (reuseaddr is not None
                and sock.getsockopt(socket.SOL_SOCKET, reuseaddr) == 1):
            raise TestFailed("tests should never set the SO_REUSEADDR "
                             "socket option on TCP/IP sockets!")
        reuseport = getattr(socket, 'SO_REUSEPORT', None)
        if reuseport is not None:
            try:
                if sock.getsockopt(socket.SOL_SOCKET, reuseport) == 1:
                    raise TestFailed("tests should never set the SO_REUSEPORT "
                                     "socket option on TCP/IP sockets!")
            except socket.error:
                # The socket module was built against headers that define
                # SO_REUSEPORT, but the running kernel is older and does
                # not support the option.
                pass
        exclusive = getattr(socket, 'SO_EXCLUSIVEADDRUSE', None)
        if exclusive is not None:
            sock.setsockopt(socket.SOL_SOCKET, exclusive, 1)

    sock.bind((host, 0))
    return sock.getsockname()[1]
def bind_port(sock, host=HOST):
    """Bind the socket to a free port and return the port number.

    Binding to port 0 relies on ephemeral ports to guarantee an unbound
    port, which matters because many tests may run simultaneously,
    especially in a buildbot environment.

    For an AF_INET / SOCK_STREAM socket this raises TestFailed when
    SO_REUSEADDR or SO_REUSEPORT is already set on it. Tests should
    *never* set these options for TCP/IP sockets; the only case for them
    is testing multicasting via multiple UDP sockets. If
    SO_EXCLUSIVEADDRUSE is available (i.e. on Windows) it is set on such a
    socket, preventing anyone else from bind()'ing to our host/port for
    the duration of the test.

    :param sock: the socket to bind.
    :param host: the host to bind to; defaults to ``HOST``.
    :returns: the port number the socket was bound to.
    :raises TestFailed: if a reuse option is set on a TCP/IP socket.
    """
    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
        if hasattr(socket, 'SO_REUSEADDR'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
                raise TestFailed("tests should never set the SO_REUSEADDR "
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            try:
                if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
                    raise TestFailed("tests should never set the SO_REUSEPORT "
                                     "socket option on TCP/IP sockets!")
            except socket.error:
                # Python's socket module was compiled using modern headers
                # thus defining SO_REUSEPORT but this process is running
                # under an older kernel that does not support SO_REUSEPORT.
                # Without this guard the query itself aborted the test.
                pass
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    sock.bind((host, 0))
    port = sock.getsockname()[1]
    return port
def get_mdns_sock(self):
    """Create a UDP socket bound to port 5353 and joined to 224.0.0.251.

    ``SO_REUSEADDR`` is always set; on Darwin ``SO_REUSEPORT`` is set as
    well. The socket is bound to all interfaces before joining the
    multicast group.

    :returns: the configured socket.
    :raises OSError: if any socket call fails; the socket is closed before
        the error propagates so the descriptor is not leaked.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if sys.platform == 'darwin':
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        sock.bind(('', 5353))

        # "=" selects standard sizes without padding, so the result is
        # exactly the 8-byte ``struct ip_mreq``. Plain "4sl" uses native
        # alignment and yields 16 bytes on LP64 platforms.
        mreq = struct.pack("=4sl", socket.inet_aton('224.0.0.251'), socket.INADDR_ANY)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
    except Exception:
        sock.close()
        raise
    return sock
def bind_port(sock, host=HOST):
    """Bind *sock* to an ephemeral port on *host* and return that port.

    Port 0 is requested so the OS picks an unbound port, which matters
    because many tests may run simultaneously, especially in a buildbot
    environment.

    For an AF_INET / SOCK_STREAM socket, TestFailed is raised when
    SO_REUSEADDR or SO_REUSEPORT is already set: tests should *never* set
    these options on TCP/IP sockets, the only case for them being
    multicast tests with multiple UDP sockets. If SO_EXCLUSIVEADDRUSE is
    available (i.e. on Windows) it is set on such a socket so nobody else
    can bind() to our host/port for the duration of the test.
    """
    tcp_over_ipv4 = (sock.family == socket.AF_INET
                     and sock.type == socket.SOCK_STREAM)
    if tcp_over_ipv4:
        reuseaddr = getattr(socket, 'SO_REUSEADDR', None)
        if (reuseaddr is not None
                and sock.getsockopt(socket.SOL_SOCKET, reuseaddr) == 1):
            raise TestFailed("tests should never set the SO_REUSEADDR "
                             "socket option on TCP/IP sockets!")
        reuseport = getattr(socket, 'SO_REUSEPORT', None)
        if reuseport is not None:
            try:
                if sock.getsockopt(socket.SOL_SOCKET, reuseport) == 1:
                    raise TestFailed("tests should never set the SO_REUSEPORT "
                                     "socket option on TCP/IP sockets!")
            except EnvironmentError:
                # The socket module was built against headers that define
                # SO_REUSEPORT, but the running kernel is older and does
                # not support the option.
                pass
        exclusive = getattr(socket, 'SO_EXCLUSIVEADDRUSE', None)
        if exclusive is not None:
            sock.setsockopt(socket.SOL_SOCKET, exclusive, 1)

    sock.bind((host, 0))
    return sock.getsockname()[1]
def bind_port(sock, host=HOST):
    """Bind *sock* to an ephemeral port on *host* and return that port.

    Port 0 is requested so the OS picks an unbound port, which matters
    because many tests may run simultaneously, especially in a buildbot
    environment.

    For an AF_INET / SOCK_STREAM socket, TestFailed is raised when
    SO_REUSEADDR or SO_REUSEPORT is already set: tests should *never* set
    these options on TCP/IP sockets, the only case for them being
    multicast tests with multiple UDP sockets. If SO_EXCLUSIVEADDRUSE is
    available (i.e. on Windows) it is set on such a socket so nobody else
    can bind() to our host/port for the duration of the test.
    """
    tcp_over_ipv4 = (sock.family == socket.AF_INET
                     and sock.type == socket.SOCK_STREAM)
    if tcp_over_ipv4:
        reuseaddr = getattr(socket, 'SO_REUSEADDR', None)
        if (reuseaddr is not None
                and sock.getsockopt(socket.SOL_SOCKET, reuseaddr) == 1):
            raise TestFailed("tests should never set the SO_REUSEADDR "
                             "socket option on TCP/IP sockets!")
        reuseport = getattr(socket, 'SO_REUSEPORT', None)
        if reuseport is not None:
            try:
                if sock.getsockopt(socket.SOL_SOCKET, reuseport) == 1:
                    raise TestFailed("tests should never set the SO_REUSEPORT "
                                     "socket option on TCP/IP sockets!")
            except EnvironmentError:
                # The socket module was built against headers that define
                # SO_REUSEPORT, but the running kernel is older and does
                # not support the option.
                pass
        exclusive = getattr(socket, 'SO_EXCLUSIVEADDRUSE', None)
        if exclusive is not None:
            sock.setsockopt(socket.SOL_SOCKET, exclusive, 1)

    sock.bind((host, 0))
    return sock.getsockname()[1]
def bind_port(sock, host=HOST):
    """Bind *sock* to an ephemeral port on *host* and return that port.

    Port 0 is requested so the OS picks an unbound port, which matters
    because many tests may run simultaneously, especially in a buildbot
    environment.

    For an AF_INET / SOCK_STREAM socket, TestFailed is raised when
    SO_REUSEADDR or SO_REUSEPORT is already set: tests should *never* set
    these options on TCP/IP sockets, the only case for them being
    multicast tests with multiple UDP sockets. If SO_EXCLUSIVEADDRUSE is
    available (i.e. on Windows) it is set on such a socket so nobody else
    can bind() to our host/port for the duration of the test.
    """
    tcp_over_ipv4 = (sock.family == socket.AF_INET
                     and sock.type == socket.SOCK_STREAM)
    if tcp_over_ipv4:
        reuseaddr = getattr(socket, 'SO_REUSEADDR', None)
        if (reuseaddr is not None
                and sock.getsockopt(socket.SOL_SOCKET, reuseaddr) == 1):
            raise TestFailed("tests should never set the SO_REUSEADDR "
                             "socket option on TCP/IP sockets!")
        reuseport = getattr(socket, 'SO_REUSEPORT', None)
        if reuseport is not None:
            try:
                if sock.getsockopt(socket.SOL_SOCKET, reuseport) == 1:
                    raise TestFailed("tests should never set the SO_REUSEPORT "
                                     "socket option on TCP/IP sockets!")
            except socket.error:
                # The socket module was built against headers that define
                # SO_REUSEPORT, but the running kernel is older and does
                # not support the option.
                pass
        exclusive = getattr(socket, 'SO_EXCLUSIVEADDRUSE', None)
        if exclusive is not None:
            sock.setsockopt(socket.SOL_SOCKET, exclusive, 1)

    sock.bind((host, 0))
    return sock.getsockname()[1]
def createInternetSocket(self):
    """Create the port's socket, allowing shared binds if requested.

    The socket comes from ``Port.createInternetSocket``. When
    ``self.listenMultiple`` is true, ``SO_REUSEADDR`` is set, and
    ``SO_REUSEPORT`` as well where the :mod:`socket` module defines it.
    """
    skt = Port.createInternetSocket(self)
    if not self.listenMultiple:
        return skt
    skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    reuse_port = getattr(socket, "SO_REUSEPORT", None)
    if reuse_port is not None:
        skt.setsockopt(socket.SOL_SOCKET, reuse_port, 1)
    return skt
def createInternetSocket(self):
    """Create the port's socket, allowing shared binds if requested.

    The socket comes from ``Port.createInternetSocket``. When
    ``self.listenMultiple`` is true, ``SO_REUSEADDR`` is set, and
    ``SO_REUSEPORT`` as well where the :mod:`socket` module defines it.
    """
    skt = Port.createInternetSocket(self)
    if not self.listenMultiple:
        return skt
    skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    reuse_port = getattr(socket, "SO_REUSEPORT", None)
    if reuse_port is not None:
        skt.setsockopt(socket.SOL_SOCKET, reuse_port, 1)
    return skt