def test_quick_connect(self):
# see: http://bugs.python.org/issue10340
server = TCPServer()
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, count=500))
t.start()
self.addCleanup(t.join)
for x in xrange(20):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(.2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
try:
s.connect(server.address)
except socket.error:
pass
finally:
s.close()
python类SO_LINGER的实例源码
def test_quick_connect(self):
# see: http://bugs.python.org/issue10340
server = TCPServer()
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, count=500))
t.start()
self.addCleanup(t.join)
for x in xrange(20):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(.2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
try:
s.connect(server.address)
except socket.error:
pass
finally:
s.close()
def test_quick_connect(self):
# see: http://bugs.python.org/issue10340
if self.family in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
server = BaseServer(self.family, self.addr)
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
count=500))
t.start()
self.addCleanup(t.join)
s = socket.socket(self.family, socket.SOCK_STREAM)
s.settimeout(.2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
try:
s.connect(server.address)
except socket.error:
pass
finally:
s.close()
def test_quick_connect(self):
# see: http://bugs.python.org/issue10340
server = TCPServer()
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, count=500))
t.start()
self.addCleanup(t.join)
for x in xrange(20):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(.2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
try:
s.connect(server.address)
except socket.error:
pass
finally:
s.close()
def test_quick_connect(self):
# see: http://bugs.python.org/issue10340
if self.family in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
server = BaseServer(self.family, self.addr)
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
count=500))
t.start()
def cleanup():
t.join(timeout=TIMEOUT)
if t.is_alive():
self.fail("join() timed out")
self.addCleanup(cleanup)
s = socket.socket(self.family, socket.SOCK_STREAM)
s.settimeout(.2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
try:
s.connect(server.address)
except OSError:
pass
finally:
s.close()
def test_quick_connect(self):
# see: http://bugs.python.org/issue10340
server = TCPServer()
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, count=500))
t.start()
self.addCleanup(t.join)
for x in xrange(20):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(.2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
try:
s.connect(server.address)
except socket.error:
pass
finally:
s.close()
def reset_connection(client):
# Close the connection with a TCP RST instead of a TCP FIN. client must
# be a smtplib.SMTP instance.
#
# https://stackoverflow.com/a/6440364/1570972
#
# socket(7) SO_LINGER option.
#
# struct linger {
# int l_onoff; /* linger active */
# int l_linger; /* how many seconds to linger for */
# };
#
# Is this correct for Windows/Cygwin and macOS?
struct_format = 'hh' if sys.platform == 'win32' else 'ii'
l_onoff = 1
l_linger = 0
client.sock.setsockopt(
socket.SOL_SOCKET,
socket.SO_LINGER,
struct.pack(struct_format, l_onoff, l_linger))
client.close()
# For integration with flufl.testing.
def _closeSocket(self, orderly):
# The call to shutdown() before close() isn't really necessary, because
# we set FD_CLOEXEC now, which will ensure this is the only process
# holding the FD, thus ensuring close() really will shutdown the TCP
# socket. However, do it anyways, just to be safe.
skt = self.socket
try:
if orderly:
if self._shouldShutdown:
skt.shutdown(2)
else:
# Set SO_LINGER to 1,0 which, by convention, causes a
# connection reset to be sent when close is called,
# instead of the standard FIN shutdown sequence.
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack("ii", 1, 0))
except socket.error:
pass
try:
skt.close()
except socket.error:
pass
def test_quick_connect(self):
# see: http://bugs.python.org/issue10340
if self.family in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
server = BaseServer(self.family, self.addr)
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
count=500))
t.start()
def cleanup():
t.join(timeout=TIMEOUT)
if t.is_alive():
self.fail("join() timed out")
self.addCleanup(cleanup)
s = socket.socket(self.family, socket.SOCK_STREAM)
s.settimeout(.2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
try:
s.connect(server.address)
except OSError:
pass
finally:
s.close()
def socket_init(*args, **kw):
_socket_init(*args, **kw)
args[0].setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0))
def socket_init(*args, **kw):
_socket_init(*args, **kw)
args[0].setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0))
def socket_init(*args, **kw):
_socket_init(*args, **kw)
args[0].setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0))
def test_quick_connect(self):
# Clients that connected and disconnected quickly could cause
# the server to crash, due to a failure to catch errors in the
# initial part of the connection process.
# Tracked in issues #91, #104 and #105.
# See also https://bugs.launchpad.net/zodb/+bug/135108
import struct
def connect(addr):
with contextlib.closing(socket.socket()) as s:
# Set SO_LINGER to 1,0 causes a connection reset (RST) to
# be sent when close() is called, instead of the standard
# FIN shutdown sequence.
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
s.settimeout(TIMEOUT)
try:
s.connect(addr)
except socket.error:
pass
for x in range(10):
connect((self.server.host, self.server.port))
for x in range(10):
addr = self.client.makepasv()
connect(addr)
def __connect (self):
'''
Connect to the PED-RPC server.
'''
# if we have a pre-existing server socket, ensure it's closed.
self.__disconnect()
# connect to the server, timeout on failure.
try:
self.__server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__server_sock.settimeout(3.0)
self.__server_sock.connect((self.__host, self.__port))
except:
if self.__retry != 5:
self.__retry += 1
time.sleep(5)
self.__connect()
else:
sys.stderr.write("PED-RPC> unable to connect to server %s:%d\n" % (self.__host, self.__port))
raise Exception
# disable timeouts and lingering.
self.__server_sock.settimeout(None)
self.__server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, self.NOLINGER)
####################################################################################################################
def _init_sock(self):
_sock = socket.socket(self.socket_family, socket.SOCK_STREAM)
_sock = self.ssl_context.wrap_socket(_sock,
server_hostname=self.host)
# socket options
linger = struct.pack('ii', 0, 0)
_sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, linger)
_sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.sock = _sock
def _init_sock(self):
if self.unix_socket:
_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
_sock = socket.socket(self.socket_family, socket.SOCK_STREAM)
_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# socket options
linger = struct.pack('ii', 0, 0)
_sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, linger)
_sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.sock = _sock
def _create_connection(self, ipaddr, forward, queobj):
ip = ipaddr[0]
try:
# create a ipv4/ipv6 socket object
sock = socket.socket(socket.AF_INET if ':' not in ip else socket.AF_INET6)
# set reuseaddr option to avoid 10048 socket error
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# set struct linger{l_onoff=1,l_linger=0} to avoid 10048 socket error
sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, self.offlinger_val)
# resize socket recv buffer 8K->1M to improve browser releated application performance
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 32768)
# disable nagle algorithm to send http request quickly.
sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, True)
# set a short timeout to trigger timeout retry more quickly.
sock.settimeout(forward if forward else 1)
set_connect_start(ip)
# start connection time record
start_time = time()
# TCP connect
sock.connect(ipaddr)
# record TCP connection time
self.tcp_connection_time[ipaddr] = sock.tcp_time = time() - start_time
# put socket object to output queobj
sock.xip = ipaddr
queobj.put(sock)
except NetWorkIOError as e:
# any socket.error, put Excpetions to output queobj.
e.xip = ipaddr
queobj.put(e)
# reset a large and random timeout to the ipaddr
self.tcp_connection_time[ipaddr] = self.timeout + 1
# close tcp socket
sock.close()
finally:
set_connect_finish(ip)
def getipinfo(self, ip, conntimeout=g_conntimeout, handshaketimeout=g_handshaketimeout, timeout=g_timeout, retry=None):
if ipnotuse(ip):
return None, 0, False
start_time = time()
costtime = 0
domain = None
sock = None
ssl_sock = None
try:
sock = socket.socket(socket.AF_INET if ':' not in ip else socket.AF_INET6)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, http_gws.offlinger_val)
sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, True)
ssl_sock = http_gws.get_ssl_socket(sock, g_servername)
ssl_sock.settimeout(conntimeout)
ssl_sock.connect((ip, 443))
ssl_sock.settimeout(handshaketimeout)
ssl_sock.do_handshake()
ssl_sock.settimeout(timeout)
handshaked_time = time() - start_time
if handshaked_time > handshaketimeout:
raise socket.error('handshake cost %dms timed out' % int(handshaked_time*1000))
cert = http_gws.google_verify(ssl_sock)
domain = cert.get_subject().CN
if not domain:
raise ssl.SSLError('%s ???? commonName?%s ' % (ip, cert))
except NetWorkIOError as e:
sock.close()
ssl_sock = None
if not retry and e.args == (-1, 'Unexpected EOF'):
return self.getipinfo(ip, conntimeout, handshaketimeout, timeout, True)
WARNING('%r', e)
code = self.getstatuscode(ssl_sock, sock, ip) if ssl_sock else ''
costtime = int((time()-start_time)*1000)
return domain, costtime, code == b'302'
def checkPortAvailable(ha):
"""Checks whether the given port is available"""
# Not sure why OS would allow binding to one type and not other.
# Checking for port available for TCP and UDP, this is done since
# either RAET (UDP) or CurveZMQ(TCP) could have been used
sockTypes = (socket.SOCK_DGRAM, socket.SOCK_STREAM)
for typ in sockTypes:
sock = socket.socket(socket.AF_INET, typ)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(ha)
if typ == socket.SOCK_STREAM:
l_onoff = 1
l_linger = 0
sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', l_onoff, l_linger))
except OSError as exc:
if exc.errno in [
errno.EADDRINUSE, errno.EADDRNOTAVAIL,
WS_SOCKET_BIND_ERROR_ALREADY_IN_USE,
WS_SOCKET_BIND_ERROR_NOT_AVAILABLE
]:
raise PortNotAvailable(ha)
else:
raise exc
finally:
sock.close()
def _socket_create_and_loop_connect(self):
self._socket_lock.acquire()
is_socket_lock = True
if self._is_loop_connecting:
return RET_ERROR, "is loop connecting, can't create_session"
self._is_loop_connecting = True
if self.s is not None:
self._force_close_session()
while True:
try:
if not is_socket_lock:
is_socket_lock = True
self._socket_lock.acquire()
s = sock.socket()
s.setsockopt(sock.SOL_SOCKET, sock.SO_REUSEADDR, 0)
s.setsockopt(sock.SOL_SOCKET, sock.SO_LINGER, pack("ii", 0, 0))
s.settimeout(10)
self.s = s
self.s.connect((self.__host, self.__port))
except Exception as e:
traceback.print_exc()
err = sys.exc_info()[1]
err_msg = ERROR_STR_PREFIX + str(err)
print("socket connect err:{}".format(err_msg))
self.s = None
if s:
s.close()
del s
sleep(1.5)
continue
if self._connected_handler is not None:
is_socket_lock = False
self._socket_lock.release()
sock_ok, is_retry = self._connected_handler.notify_sync_socket_connected(self)
if not sock_ok:
self._force_close_session()
if is_retry:
print("wait to connect futunn plugin server")
sleep(1.5)
continue
else:
return RET_ERROR, "obj is closed"
else:
break
self._is_loop_connecting = False
if is_socket_lock:
# is_socket_lock = False
self._socket_lock.release()
return RET_OK, ''
def create_gws_connection_withproxy(self, address, hostname, cache_key, getfast=None, **kwargs):
proxy = self.get_gws_front(getfast)
proxytype, proxyuser, proxypass, proxyaddress = parse_proxy(proxy)
proxyhost, _, proxyport = proxyaddress.rpartition(':')
ips = dns_resolve(proxyhost)
if ips:
ipcnt = len(ips)
else:
logging.error('create_gws_connection_withproxy ?????????%r', proxy)
return
if ipcnt > 1:
#??????? IP??????????
ips.sort(key=self.get_gws_front_connection_time_ip)
proxyport = int(proxyport)
ohost, port = address
while ips:
proxyhost = ips.pop(0)
host = random.choice(dns[hostname])
if proxytype:
proxytype = proxytype.upper()
if proxytype not in socks.PROXY_TYPES:
proxytype = 'HTTP'
proxy_sock = socks.socksocket(socket.AF_INET if ':' not in proxyhost else socket.AF_INET6)
# set reuseaddr option to avoid 10048 socket error
proxy_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# set struct linger{l_onoff=1,l_linger=0} to avoid 10048 socket error
proxy_sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, self.offlinger_val)
# resize socket recv buffer 8K->1M to improve browser releated application performance
proxy_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576)
proxy_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 32768)
# disable nagle algorithm to send http request quickly.
proxy_sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, True)
proxy_sock.set_proxy(socks.PROXY_TYPES[proxytype], proxyhost, proxyport, True, proxyuser, proxypass)
start_time = time()
try:
proxy_ssl_sock = self.get_ssl_socket(proxy_sock, ohost.encode())
proxy_ssl_sock.settimeout(self.timeout)
#proxy_ssl_sock.set_connect_state()
proxy_ssl_sock.connect((host, port))
proxy_ssl_sock.do_handshake()
except Exception as e:
cost_time = self.timeout + 1 + random.random()
if ipcnt > 1:
self.gws_front_connection_time['ip'][proxyhost] = cost_time
self.gws_front_connection_time[proxy] = cost_time
logging.error('create_gws_connection_withproxy ???? [%s] ???%r', proxy, e)
continue
else:
cost_time = time() - start_time
if ipcnt > 1:
self.gws_front_connection_time['ip'][proxyhost] = cost_time
self.gws_front_connection_time[proxy] = cost_time
proxy_ssl_sock.sock = proxy_sock
proxy_ssl_sock.xip = proxyhost, proxyport
return proxy_ssl_sock