def _parse_address(address):
if isinstance(address, tuple):
if ':' in address[0]:
return _socket.AF_INET6, address
return _socket.AF_INET, address
elif isinstance(address, string_types):
if ':' in address:
host, port = address.rsplit(':', 1)
family, host = _extract_family(host)
if host == '*':
host = ''
return family, (host, int(port))
else:
return _socket.AF_INET, ('', int(address))
elif isinstance(address, integer_types):
return _socket.AF_INET, ('', int(address))
else:
raise TypeError('Expected tuple or string, got %s' % type(address))
python类AF_INET的实例源码
def _tcp_listener(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
"""A shortcut to create a TCP socket, bind it and put it into listening state."""
sock = socket(family=family)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
sock.listen(backlog)
sock.setblocking(0)
return sock
def _tcp_listener(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
"""A shortcut to create a TCP socket, bind it and put it into listening state."""
sock = socket(family=family)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
sock.listen(backlog)
sock.setblocking(0)
return sock
def _parse_address(address):
if isinstance(address, tuple):
if ':' in address[0]:
return _socket.AF_INET6, address
return _socket.AF_INET, address
elif isinstance(address, string_types):
if ':' in address:
host, port = address.rsplit(':', 1)
family, host = _extract_family(host)
if host == '*':
host = ''
return family, (host, int(port))
else:
return _socket.AF_INET, ('', int(address))
elif isinstance(address, integer_types):
return _socket.AF_INET, ('', int(address))
else:
raise TypeError('Expected tuple or string, got %s' % type(address))
def _tcp_listener(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
"""A shortcut to create a TCP socket, bind it and put it into listening state."""
sock = socket(family=family)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
sock.listen(backlog)
sock.setblocking(0)
return sock
def _tcp_listener(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
"""A shortcut to create a TCP socket, bind it and put it into listening state."""
sock = socket(family=family)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
sock.listen(backlog)
sock.setblocking(0)
return sock
def _parse_address(address):
if isinstance(address, tuple):
if ':' in address[0]:
return _socket.AF_INET6, address
return _socket.AF_INET, address
elif isinstance(address, string_types):
if ':' in address:
host, port = address.rsplit(':', 1)
family, host = _extract_family(host)
if host == '*':
host = ''
return family, (host, int(port))
else:
return _socket.AF_INET, ('', int(address))
elif isinstance(address, integer_types):
return _socket.AF_INET, ('', int(address))
else:
raise TypeError('Expected tuple or string, got %s' % type(address))
def _parse_address(address):
if isinstance(address, tuple):
if not address[0] or ':' in address[0]:
return _socket.AF_INET6, address
return _socket.AF_INET, address
if ((isinstance(address, string_types) and ':' not in address)
or isinstance(address, integer_types)): # noqa (pep8 E129)
# Just a port
return _socket.AF_INET6, ('', int(address))
if not isinstance(address, string_types):
raise TypeError('Expected tuple or string, got %s' % type(address))
host, port = address.rsplit(':', 1)
family, host = _extract_family(host)
if host == '*':
host = ''
return family, (host, int(port))
def _udp_socket(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
# backlog argument for compat with tcp_listener
# pylint:disable=unused-argument
# we want gevent.socket.socket here
sock = socket(family=family, type=_socket.SOCK_DGRAM)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error as ex:
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
return sock
def _parse_address(address):
if isinstance(address, tuple):
if ':' in address[0]:
return _socket.AF_INET6, address
return _socket.AF_INET, address
elif isinstance(address, string_types):
if ':' in address:
host, port = address.rsplit(':', 1)
family, host = _extract_family(host)
if host == '*':
host = ''
return family, (host, int(port))
else:
return _socket.AF_INET, ('', int(address))
elif isinstance(address, integer_types):
return _socket.AF_INET, ('', int(address))
else:
raise TypeError('Expected tuple or string, got %s' % type(address))
def _tcp_listener(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
"""A shortcut to create a TCP socket, bind it and put it into listening state."""
sock = socket(family=family)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
sock.listen(backlog)
sock.setblocking(0)
return sock
def _tcp_listener(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
"""A shortcut to create a TCP socket, bind it and put it into listening state."""
sock = socket(family=family)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
sock.listen(backlog)
sock.setblocking(0)
return sock
def _parse_address(address):
if isinstance(address, tuple):
if ':' in address[0]:
return _socket.AF_INET6, address
return _socket.AF_INET, address
elif isinstance(address, string_types):
if ':' in address:
host, port = address.rsplit(':', 1)
family, host = _extract_family(host)
if host == '*':
host = ''
return family, (host, int(port))
else:
return _socket.AF_INET, ('', int(address))
elif isinstance(address, integer_types):
return _socket.AF_INET, ('', int(address))
else:
raise TypeError('Expected tuple or string, got %s' % type(address))
def _tcp_listener(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
"""A shortcut to create a TCP socket, bind it and put it into listening state."""
sock = socket(family=family)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
sock.listen(backlog)
sock.setblocking(0)
return sock
def _parse_address(address):
if isinstance(address, tuple):
if not address[0] or ':' in address[0]:
return _socket.AF_INET6, address
return _socket.AF_INET, address
if ((isinstance(address, string_types) and ':' not in address)
or isinstance(address, integer_types)): # noqa (pep8 E129)
# Just a port
return _socket.AF_INET6, ('', int(address))
if not isinstance(address, string_types):
raise TypeError('Expected tuple or string, got %s' % type(address))
host, port = address.rsplit(':', 1)
family, host = _extract_family(host)
if host == '*':
host = ''
return family, (host, int(port))
def _udp_socket(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
# backlog argument for compat with tcp_listener
# pylint:disable=unused-argument
# we want gevent.socket.socket here
sock = socket(family=family, type=_socket.SOCK_DGRAM)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error as ex:
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
return sock
def _parse_address(address):
if isinstance(address, tuple):
if ':' in address[0]:
return _socket.AF_INET6, address
return _socket.AF_INET, address
elif isinstance(address, string_types):
if ':' in address:
host, port = address.rsplit(':', 1)
family, host = _extract_family(host)
if host == '*':
host = ''
return family, (host, int(port))
else:
return _socket.AF_INET, ('', int(address))
elif isinstance(address, integer_types):
return _socket.AF_INET, ('', int(address))
else:
raise TypeError('Expected tuple or string, got %s' % type(address))
def _tcp_listener(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
"""A shortcut to create a TCP socket, bind it and put it into listening state."""
sock = socket(family=family)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
sock.listen(backlog)
sock.setblocking(0)
return sock
def _tcp_listener(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
"""A shortcut to create a TCP socket, bind it and put it into listening state."""
sock = socket(family=family)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
sock.listen(backlog)
sock.setblocking(0)
return sock
def _parse_address(address):
if isinstance(address, tuple):
if ':' in address[0]:
return _socket.AF_INET6, address
return _socket.AF_INET, address
elif isinstance(address, string_types):
if ':' in address:
host, port = address.rsplit(':', 1)
family, host = _extract_family(host)
if host == '*':
host = ''
return family, (host, int(port))
else:
return _socket.AF_INET, ('', int(address))
elif isinstance(address, integer_types):
return _socket.AF_INET, ('', int(address))
else:
raise TypeError('Expected tuple or string, got %s' % type(address))
def _broadcast_ip():
cs = socket(AF_INET, SOCK_DGRAM)
cs.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
cs.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
try:
while _broadcast:
try:
cs.sendto("8443".encode("utf-8"), ('255.255.255.255', 42945))
except OSError as network_error:
logger.warning("Error while broadcasting: %s", str(network_error))
time.sleep(3)
finally:
cs.close()
def get_local_ip_address():
s = socket(AF_INET, SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
def _extract_family(host):
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
return _socket.AF_INET6, host
return _socket.AF_INET, host
def _udp_socket(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
# we want gevent.socket.socket here
sock = socket(family=family, type=_socket.SOCK_DGRAM)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
return sock
def _extract_family(host):
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
return _socket.AF_INET6, host
return _socket.AF_INET, host
def _udp_socket(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
# we want gevent.socket.socket here
sock = socket(family=family, type=_socket.SOCK_DGRAM)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
return sock
def _extract_family(host):
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
return _socket.AF_INET6, host
return _socket.AF_INET, host
def _udp_socket(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
# we want gevent.socket.socket here
sock = socket(family=family, type=_socket.SOCK_DGRAM)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
return sock
def _extract_family(host):
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
return _socket.AF_INET6, host
return _socket.AF_INET, host
def _udp_socket(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
# we want gevent.socket.socket here
sock = socket(family=family, type=_socket.SOCK_DGRAM)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
return sock