def _extract_family(host):
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
return _socket.AF_INET6, host
return _socket.AF_INET, host
python类AF_INET的实例源码
def _tcp_listener(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
"""A shortcut to create a TCP socket, bind it and put it into listening state."""
sock = socket(family=family)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error as ex:
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
sock.listen(backlog)
sock.setblocking(0)
return sock
def _udp_socket(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
# we want gevent.socket.socket here
sock = socket(family=family, type=_socket.SOCK_DGRAM)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error as ex:
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
return sock
def _extract_family(host):
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
return _socket.AF_INET6, host
return _socket.AF_INET, host
def _tcp_listener(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
"""A shortcut to create a TCP socket, bind it and put it into listening state."""
sock = socket(family=family)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error as ex:
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
sock.listen(backlog)
sock.setblocking(0)
return sock
def _extract_family(host):
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
return _socket.AF_INET6, host
return _socket.AF_INET, host
def _udp_socket(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
# we want gevent.socket.socket here
sock = socket(family=family, type=_socket.SOCK_DGRAM)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
return sock
def _extract_family(host):
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
return _socket.AF_INET6, host
return _socket.AF_INET, host
def _udp_socket(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
# we want gevent.socket.socket here
sock = socket(family=family, type=_socket.SOCK_DGRAM)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
return sock
def _extract_family(host):
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
return _socket.AF_INET6, host
return _socket.AF_INET, host
def _udp_socket(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
# we want gevent.socket.socket here
sock = socket(family=family, type=_socket.SOCK_DGRAM)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
return sock
def _extract_family(host):
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
return _socket.AF_INET6, host
return _socket.AF_INET, host
def _tcp_listener(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
"""A shortcut to create a TCP socket, bind it and put it into listening state."""
sock = socket(family=family)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error as ex:
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
sock.listen(backlog)
sock.setblocking(0)
return sock
def _extract_family(host):
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
return _socket.AF_INET6, host
return _socket.AF_INET, host
def _udp_socket(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
# we want gevent.socket.socket here
sock = socket(family=family, type=_socket.SOCK_DGRAM)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
return sock
def _extract_family(host):
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
return _socket.AF_INET6, host
return _socket.AF_INET, host
def _udp_socket(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
# we want gevent.socket.socket here
sock = socket(family=family, type=_socket.SOCK_DGRAM)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error:
ex = sys.exc_info()[1]
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
return sock
def _extract_family(host):
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
return _socket.AF_INET6, host
return _socket.AF_INET, host
def _tcp_listener(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
"""A shortcut to create a TCP socket, bind it and put it into listening state."""
sock = socket(family=family)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error as ex:
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
sock.listen(backlog)
sock.setblocking(0)
return sock
def _udp_socket(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
# we want gevent.socket.socket here
sock = socket(family=family, type=_socket.SOCK_DGRAM)
if reuse_addr is not None:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, reuse_addr)
try:
sock.bind(address)
except _socket.error as ex:
strerror = getattr(ex, 'strerror', None)
if strerror is not None:
ex.strerror = strerror + ': ' + repr(address)
raise
return sock