def check_connection_ntuple(conn):
"""Check validity of a connection namedtuple."""
valid_conn_states = [getattr(psutil, x) for x in dir(psutil) if
x.startswith('CONN_')]
assert conn[0] == conn.fd
assert conn[1] == conn.family
assert conn[2] == conn.type
assert conn[3] == conn.laddr
assert conn[4] == conn.raddr
assert conn[5] == conn.status
assert conn.type in (SOCK_STREAM, SOCK_DGRAM), repr(conn.type)
assert conn.family in (AF_INET, AF_INET6, AF_UNIX), repr(conn.family)
assert conn.status in valid_conn_states, conn.status
# check IP address and port sanity
for addr in (conn.laddr, conn.raddr):
if not addr:
continue
if conn.family in (AF_INET, AF_INET6):
assert isinstance(addr, tuple), addr
ip, port = addr
assert isinstance(port, int), port
assert 0 <= port <= 65535, port
check_net_address(ip, conn.family)
elif conn.family == AF_UNIX:
assert isinstance(addr, (str, None)), addr
else:
raise ValueError("unknown family %r", conn.family)
if conn.family in (AF_INET, AF_INET6):
# actually try to bind the local socket; ignore IPv6
# sockets as their address might be represented as
# an IPv4-mapped-address (e.g. "::127.0.0.1")
# and that's rejected by bind()
if conn.family == AF_INET:
s = socket.socket(conn.family, conn.type)
with contextlib.closing(s):
try:
s.bind((conn.laddr[0], 0))
except socket.error as err:
if err.errno != errno.EADDRNOTAVAIL:
raise
elif conn.family == AF_UNIX:
assert not conn.raddr, repr(conn.raddr)
assert conn.status == psutil.CONN_NONE, conn.status
if getattr(conn, 'fd', -1) != -1:
assert conn.fd > 0, conn
if hasattr(socket, 'fromfd') and not WINDOWS:
try:
dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
except (socket.error, OSError) as err:
if err.args[0] != errno.EBADF:
raise
else:
with contextlib.closing(dupsock):
assert dupsock.family == conn.family
assert dupsock.type == conn.type
评论列表
文章目录