def bind_port(sock, host=HOST):
    """Bind *sock* to a free ephemeral port on *host* and return the port.

    Binding to port 0 lets the operating system pick an unused port, which
    matters because many tests may run simultaneously (for example on a
    buildbot).

    For an AF_INET / SOCK_STREAM socket, the function refuses to continue
    if SO_REUSEADDR or SO_REUSEPORT is enabled on the socket: tests should
    *never* set these options on TCP/IP sockets. The only legitimate use
    of these options in tests is multicasting via multiple UDP sockets.

    If the SO_EXCLUSIVEADDRUSE option is available (i.e. on Windows), it is
    set on the socket so that nobody else can bind() to the same host/port
    for the duration of the test.

    Args:
        sock: The socket object to bind.
        host: Address to bind to; defaults to the module-level HOST.

    Returns:
        The port number the socket was bound to.

    Raises:
        TestFailed: If a TCP/IPv4 socket has SO_REUSEADDR or SO_REUSEPORT
            enabled.
    """
    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
        if hasattr(socket, 'SO_REUSEADDR'):
            # POSIX only guarantees a non-zero value for an enabled boolean
            # option; some stacks report a value other than 1, so compare
            # against zero rather than against 1.
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) != 0:
                raise TestFailed("tests should never set the SO_REUSEADDR "
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            # Keep the try body down to the single call that can raise
            # OSError, so the TestFailed below can never be swallowed.
            try:
                reuse_port = sock.getsockopt(socket.SOL_SOCKET,
                                             socket.SO_REUSEPORT)
            except OSError:
                # Python's socket module was compiled using modern headers
                # thus defining SO_REUSEPORT but this process is running
                # under an older kernel that does not support SO_REUSEPORT.
                reuse_port = 0
            if reuse_port != 0:
                raise TestFailed("tests should never set the SO_REUSEPORT "
                                 "socket option on TCP/IP sockets!")
        # NOTE(review): the source this was recovered from had lost its
        # indentation; this option is applied inside the TCP/IPv4 branch.
        # Confirm that matches the intended nesting.
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    # Port 0 asks the OS to choose a free ephemeral port.
    sock.bind((host, 0))
    port = sock.getsockname()[1]
    return port
评论列表
文章目录