def getTcpKeepAlive(self):
return operator.truth(self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE))
python类SO_KEEPALIVE的实例源码
def setTcpKeepAlive(self, enabled):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
def test_set_tcp_keepalive(self):
mock_sock = mock.Mock()
netutils.set_tcp_keepalive(mock_sock, True, 100, 10, 5)
calls = [
mock.call.setsockopt(socket.SOL_SOCKET,
socket.SO_KEEPALIVE, True),
]
if hasattr(socket, 'TCP_KEEPIDLE'):
calls += [
mock.call.setsockopt(socket.IPPROTO_TCP,
socket.TCP_KEEPIDLE, 100)
]
if hasattr(socket, 'TCP_KEEPINTVL'):
calls += [
mock.call.setsockopt(socket.IPPROTO_TCP,
socket.TCP_KEEPINTVL, 10),
]
if hasattr(socket, 'TCP_KEEPCNT'):
calls += [
mock.call.setsockopt(socket.IPPROTO_TCP,
socket.TCP_KEEPCNT, 5)
]
mock_sock.assert_has_calls(calls)
mock_sock.reset_mock()
netutils.set_tcp_keepalive(mock_sock, False)
self.assertEqual(1, len(mock_sock.mock_calls))
def connect(self, host, port):
family, socktype, proto, _, sockaddr = socket.getaddrinfo(host, port, self.family, self.sock_type, self.protocol)[0]
s = socket.socket(family, socktype, proto)
s.settimeout(self.timeout)
s.connect(sockaddr)
if self.nodelay:
s.setsockopt(self.protocol, socket.TCP_NODELAY, 1)
if self.keepalive:
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.sock = s
return s
def connect(self) -> None:
if self.host.startswith('/'):
self.reader, self.writer = await asyncio.open_unix_connection(
path=self.host, loop=self.loop
)
else:
self.reader, self.writer = await asyncio.open_connection(
host=self.host, port=self.port, loop=self.loop
)
sock = self.writer.transport.get_extra_info('socket')
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, int(self.options.pool_options.socket_keepalive))
if self.host.startswith('/'):
endpoint = self.host
else:
endpoint = '{}:{}'.format(self.host, self.port)
logger.debug('Established connection to {}'.format(endpoint))
self.read_loop_task = asyncio.ensure_future(self.read_loop(), loop=self.loop)
ismaster = IsMaster(await self.command(
'admin', SON([('ismaster', 1)]), ReadPreference.PRIMARY, DEFAULT_CODEC_OPTIONS
))
self.is_mongos = ismaster.server_type == SERVER_TYPE.Mongos
self.max_wire_version = ismaster.max_wire_version
if ismaster.max_bson_size:
self.max_bson_size = ismaster.max_bson_size
if ismaster.max_message_size:
self.max_message_size = ismaster.max_message_size
if ismaster.max_write_batch_size:
self.max_write_batch_size = ismaster.max_write_batch_size
self.is_writable = ismaster.is_writable
self.slave_ok = not self.is_mongos and self.options.read_preference != ReadPreference.PRIMARY
if self.options.credentials:
await self._authenticate()
# Notify waiters that connection has been established
self.__connected.set()
def set_sock_keepalive(sock):
if Pyro.config.PYRO_SOCK_KEEPALIVE:
try:
sock.setsockopt ( socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1 )
except:
Pyro.config.PYRO_SOCK_KEEPALIVE=0 # it didn't work--disable keepalives.
#------ PYRO: adapter (default Pyro wire protocol)
#------ This adapter is for protocol version 4 ONLY
# Future adapters could be downwards compatible and more flexible.
def connect(self, connect_params=None):
""" Connect to the remote server with the given connect parameters.
:param connect_params: A list or dict containing application specific connect parameters
:type connect_params: list | dict
"""
if self.proxy:
parts = self.proxy.split(':')
ip = parts[0]
port = int(parts[1])
ps = socks.socksocket()
ps.set_proxy(socks.HTTP, addr=ip, port=port)
self.socket = ps
else:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.ip, self.port))
self.file = self.socket.makefile()
self.stream = FileDataTypeMixIn(self.file)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if self.is_win:
self.socket.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 10000, 3000))
self.handshake()
self.reader = reader.RtmpReader(self.stream)
self.writer = writer.RtmpWriter(self.stream)
self._connect_rtmp(connect_params)
def __init__(self, url=None, log_level=logging.INFO, **kwargs):
url = url or os.environ.get('STEEMD_HTTP_URL')
self.url = url
self.hostname = urlparse(url).hostname
self.return_with_args = kwargs.get('return_with_args', False)
self.re_raise = kwargs.get('re_raise', False)
self.max_workers = kwargs.get('max_workers', None)
num_pools = kwargs.get('num_pools', 10)
maxsize = kwargs.get('maxsize', 10)
timeout = kwargs.get('timeout', 60)
retries = kwargs.get('retries', 30)
pool_block = kwargs.get('pool_block', False)
tcp_keepalive = kwargs.get('tcp_keepalive', True)
if tcp_keepalive:
socket_options = HTTPConnection.default_socket_options + \
[(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), ]
else:
socket_options = HTTPConnection.default_socket_options
self.http = urllib3.poolmanager.PoolManager(
num_pools=num_pools,
maxsize=maxsize,
block=pool_block,
timeout=timeout,
retries=retries,
socket_options=socket_options,
headers={'Content-Type': 'application/json'},
cert_reqs='CERT_REQUIRED',
ca_certs=certifi.where())
'''
urlopen(method, url, body=None, headers=None, retries=None,
redirect=True, assert_same_host=True, timeout=<object object>,
pool_timeout=None, release_conn=None, chunked=False, body_pos=None,
**response_kw)
'''
self.request = partial(self.http.urlopen, 'POST', url)
_logger = sbds.sbds_logging.getLogger('urllib3')
sbds.sbds_logging.configure_existing_logger(_logger, level=log_level)
def tcp_keepalive(server, transport):
sock = transport.get_extra_info('socket')
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
def set_socket(s, keep_alive=False):
# TCP socket keep alive
if keep_alive and sys.platform == "linux":
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 30)
s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 5)
s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 3)
def try_to_connect(self):
""" Connect to MPD socket """
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.socket.connect((self.host, self.port))
self.reader = self.socket.makefile(self.reader_flags, encoding=self.character_encoding)
self.writer = self.socket.makefile(self.writer_flags, encoding=self.character_encoding)
self.read_line()
except:
if self.socket:
self.disconnect()
return False
return True
def set_keepalive(self,_sock, after_idle_sec=5, interval_sec=3, max_fails=60):
"""Set TCP keepalive on an open socket.--new add
It activates after 1 second (after_idle_sec) of idleness,
then sends a keepalive ping once every 3 seconds (interval_sec),
and closes the connection after 60 failed ping (max_fails), or 180 seconds
"""
_sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, after_idle_sec)
_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_sec)
_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, max_fails)
def connect(self, address, port, head = -1, block = False, timeout = 0):
self.close()
self.block = block
af = socket.AF_INET
if ':' in address:
if not 'AF_INET6' in socket.__dict__:
return -1
if not socket.has_ipv6:
return -2
af = socket.AF_INET6
self.ipv6 = True
self.sock = socket.socket(af, socket.SOCK_STREAM)
to = self.sock.gettimeout()
if not self.block:
self.sock.setblocking(0)
elif timeout > 0:
self.sock.settimeout(timeout)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.state = NET_STATE_CONNECTING
try:
hr = self.sock.connect_ex((address, port))
except socket.error, e:
if self.block:
self.close()
return -3
if self.block and hr != 0:
return -4
if self.block and timeout > 0:
self.sock.settimeout(to)
self.send_buf = ''
self.recv_buf = ''
self.errc = 0
if head >= 0 and head <= 14:
self.__head_init(head)
if self.block:
self.state = NET_STATE_ESTABLISHED
return 0
# close connection
def assign(self, sock, head = -1, block = False):
self.close()
self.block = block
self.sock = sock
self.sock.setblocking(self.block and 1 or 0)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.state = NET_STATE_ESTABLISHED
if head >= 0 and head <= 14:
self.__head_init(head)
self.send_buf = ''
self.recv_buf = ''
return 0
# update
def keepalive (self, keepalive = 0):
if not 'SO_KEEPALIVE' in socket.__dict__:
return -1
if self.sock == None:
return -2
try:
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, keepalive)
except:
return -3
return 0
# set rc4 key for sending encryption
def getTcpKeepAlive(self):
return operator.truth(self.socket.getsockopt(socket.SOL_SOCKET,
socket.SO_KEEPALIVE))
def setTcpKeepAlive(self, enabled):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
def getTcpKeepAlive(self):
return operator.truth(self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE))
def setTcpKeepAlive(self, enabled):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
def _set_keep_alive(self):
stream_socket = self._stream.socket
stream_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if platform.system().lower() == 'linux':
stream_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 30)
stream_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 5)
stream_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 5)