def send(self, data, retransmit_delay=0.1):
if self.socket_ssl:
last_exception = None
for _ in xrange(3):
try:
self.socket_ssl.write(data)
last_exception = None
break
except ssl.SSLWantWriteError,swwe:
logger.warning("TCPSockBuff: ssl.sock not yet ready, retransmit (%d) in %f seconds: %s"%(_,retransmit_delay,repr(swwe)))
last_exception = swwe
time.sleep(retransmit_delay)
if last_exception:
raise last_exception
else:
self.socket.send(data)
self.sndbuf = data
python类SSLWantWriteError()的实例源码
def __write(self):
size = self.__writer.size()
data = self.__writer._getvalue()
try:
sent_size = self.__socket.send(data)
except BlockingIOError:
self.__write_ok = False
self.__writer.write(data)
except ssl.SSLWantWriteError:
return
except (ConnectionError, ssl.SSLEOFError):
raise HttpErr("the connection has been closed")
if size == sent_size:
self.__write_ok = True
return
bdata = data[sent_size:]
self.__writer.write(bdata)
self.__write_ok = False
def _do_tls_handshake(self):
logging.debug('Initializing TLS connection with {}:{}'.format(self.host, self.port))
self.s = ssl.wrap_socket(self.s, keyfile=os.path.join(shared.source_directory, 'tls', 'key.pem'),
certfile=os.path.join(shared.source_directory, 'tls', 'cert.pem'),
server_side=self.server, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False,
ciphers='AECDH-AES256-SHA', suppress_ragged_eofs=True)
if hasattr(self.s, "context"):
self.s.context.set_ecdh_curve("secp256k1")
while True:
try:
self.s.do_handshake()
break
except ssl.SSLWantReadError:
select.select([self.s], [], [])
except ssl.SSLWantWriteError:
select.select([], [self.s], [])
except Exception as e:
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host, self.port, e))
self.status = 'disconnecting'
break
self.tls = True
logging.debug('Established TLS connection with {}:{}'.format(self.host, self.port))
def _read_ready(self):
if self._write_wants_read:
self._write_wants_read = False
self._write_ready()
if self._buffer:
self._loop.add_writer(self._sock_fd, self._write_ready)
try:
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
pass
except ssl.SSLWantWriteError:
self._read_wants_write = True
self._loop.remove_reader(self._sock_fd)
self._loop.add_writer(self._sock_fd, self._write_ready)
except Exception as exc:
self._fatal_error(exc, 'Fatal read error on SSL transport')
else:
if data:
self._protocol.data_received(data)
else:
try:
if self._loop.get_debug():
logger.debug("%r received EOF", self)
keep_open = self._protocol.eof_received()
if keep_open:
logger.warning('returning true from eof_received() '
'has no effect when using ssl')
finally:
self.close()
def _write_ready(self):
if self._read_wants_write:
self._read_wants_write = False
self._read_ready()
if not (self._paused or self._closing):
self._loop.add_reader(self._sock_fd, self._read_ready)
if self._buffer:
try:
n = self._sock.send(self._buffer)
except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError):
n = 0
except ssl.SSLWantReadError:
n = 0
self._loop.remove_writer(self._sock_fd)
self._write_wants_read = True
except Exception as exc:
self._loop.remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on SSL transport')
return
if n:
del self._buffer[:n]
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
self._loop.remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
def test_on_handshake_writer_retry(self):
self.loop.set_debug(False)
self.sslsock.do_handshake.side_effect = ssl.SSLWantWriteError
transport = self.ssl_transport()
self.loop.assert_writer(1, transport._on_handshake, None)
def test_read_ready_recv_write(self):
self.loop.remove_reader = mock.Mock()
self.loop.add_writer = mock.Mock()
self.sslsock.recv.side_effect = ssl.SSLWantWriteError
transport = self._make_one()
transport._read_ready()
self.assertFalse(self.protocol.data_received.called)
self.assertTrue(transport._read_wants_write)
self.loop.remove_reader.assert_called_with(transport._sock_fd)
self.loop.add_writer.assert_called_with(
transport._sock_fd, transport._write_ready)
def _retry(self, fn, *args):
finished = False
while not finished:
want_read = False
try:
ret = fn(*args)
except ssl.SSLWantReadError:
want_read = True
except ssl.SSLWantWriteError:
# can't happen, but if it did this would be the right way to
# handle it anyway
pass
else:
finished = True
# do any sending
data = self.outgoing.read()
if data:
self.sock.sendall(data)
# do any receiving
if want_read:
data = self.sock.recv(BUFSIZE)
if not data:
self.incoming.write_eof()
else:
self.incoming.write(data)
# then retry if necessary
return ret
def _handshake(self):
try:
self.socket.do_handshake(block=True)
except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
pass
def handle_read(self):
while True:
try:
asynchat.async_chat.handle_read(self)
except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
self._handshake()
else:
break
def connect(self, server):
"""Creates a (ssl) socket and connects to the server. Not using asyncore's connect-function because it sucks."""
# sockets are garbage collected, but if the connection isn't closed it might fail
try:
self.socket.shutdown(socket.SHUT_WR)
self.socket.close()
del self.socket
except Exception:
pass
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
if self.use_ssl:
try:
self.socket.setblocking(True)
self.socket = ssl.wrap_socket(self.socket)
except (ssl.SSLWantReadError, ssl.SSLWantWriteError) as e:
log.debug(e)
self._handshake()
except ssl.SSLError as e:
log.error(e)
self.exit()
return
finally:
self.socket.setblocking(False)
log.info('Connecting to %s', self.current_server)
self.socket.settimeout(30)
self.socket.connect(server)
self.handle_connect_event()
def _send_data(self):
if self.buffer_send and self:
try:
amount = self.s.send(self.buffer_send)
self.buffer_send = self.buffer_send[amount:]
except (BlockingIOError, ssl.SSLWantWriteError):
pass
except (BrokenPipeError, ConnectionResetError, ssl.SSLError) as e:
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host, self.port, e))
self.status = 'disconnecting'
def _read_ready(self):
if self._conn_lost:
return
if self._write_wants_read:
self._write_wants_read = False
self._write_ready()
if self._buffer:
self._loop._add_writer(self._sock_fd, self._write_ready)
try:
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
pass
except ssl.SSLWantWriteError:
self._read_wants_write = True
self._loop._remove_reader(self._sock_fd)
self._loop._add_writer(self._sock_fd, self._write_ready)
except Exception as exc:
self._fatal_error(exc, 'Fatal read error on SSL transport')
else:
if data:
self._protocol.data_received(data)
else:
try:
if self._loop.get_debug():
logger.debug("%r received EOF", self)
keep_open = self._protocol.eof_received()
if keep_open:
logger.warning('returning true from eof_received() '
'has no effect when using ssl')
finally:
self.close()
def _write_ready(self):
if self._conn_lost:
return
if self._read_wants_write:
self._read_wants_write = False
self._read_ready()
if not (self._paused or self._closing):
self._loop._add_reader(self._sock_fd, self._read_ready)
if self._buffer:
try:
n = self._sock.send(self._buffer)
except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError):
n = 0
except ssl.SSLWantReadError:
n = 0
self._loop._remove_writer(self._sock_fd)
self._write_wants_read = True
except Exception as exc:
self._loop._remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on SSL transport')
return
if n:
del self._buffer[:n]
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
self._loop._remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
def do_handshake(self):
try:
super(SSLSocket, self).do_handshake()
except ssl.SSLWantReadError:
return 1
except ssl.SSLWantWriteError:
return 2
return self._do_flux_handshake()
def _read_ready(self):
if self._write_wants_read:
self._write_wants_read = False
self._write_ready()
if self._buffer:
self._loop.add_writer(self._sock_fd, self._write_ready)
try:
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
pass
except ssl.SSLWantWriteError:
self._read_wants_write = True
self._loop.remove_reader(self._sock_fd)
self._loop.add_writer(self._sock_fd, self._write_ready)
except Exception as exc:
self._fatal_error(exc, 'Fatal read error on SSL transport')
else:
if data:
self._protocol.data_received(data)
else:
try:
if self._loop.get_debug():
logger.debug("%r received EOF", self)
keep_open = self._protocol.eof_received()
if keep_open:
logger.warning('returning true from eof_received() '
'has no effect when using ssl')
finally:
self.close()
def _write_ready(self):
if self._read_wants_write:
self._read_wants_write = False
self._read_ready()
if not (self._paused or self._closing):
self._loop.add_reader(self._sock_fd, self._read_ready)
if self._buffer:
try:
n = self._sock.send(self._buffer)
except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError):
n = 0
except ssl.SSLWantReadError:
n = 0
self._loop.remove_writer(self._sock_fd)
self._write_wants_read = True
except Exception as exc:
self._loop.remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on SSL transport')
return
if n:
del self._buffer[:n]
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
self._loop.remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
def test_on_handshake_writer_retry(self):
self.loop.set_debug(False)
self.sslsock.do_handshake.side_effect = ssl.SSLWantWriteError
transport = self.ssl_transport()
self.loop.assert_writer(1, transport._on_handshake, None)
def test_read_ready_recv_write(self):
self.loop.remove_reader = mock.Mock()
self.loop.add_writer = mock.Mock()
self.sslsock.recv.side_effect = ssl.SSLWantWriteError
transport = self._make_one()
transport._read_ready()
self.assertFalse(self.protocol.data_received.called)
self.assertTrue(transport._read_wants_write)
self.loop.remove_reader.assert_called_with(transport._sock_fd)
self.loop.add_writer.assert_called_with(
transport._sock_fd, transport._write_ready)
def test_write_ready_send_retry(self):
transport = self._make_one()
transport._buffer = list_to_buffer([b'data'])
self.sslsock.send.side_effect = ssl.SSLWantWriteError
transport._write_ready()
self.assertEqual(list_to_buffer([b'data']), transport._buffer)
self.sslsock.send.side_effect = BlockingIOError()
transport._write_ready()
self.assertEqual(list_to_buffer([b'data']), transport._buffer)
def _try_handshake(self):
assert self.config['security_protocol'] in ('SSL', 'SASL_SSL')
try:
self._sock.do_handshake()
return True
# old ssl in python2.6 will swallow all SSLErrors here...
except (SSLWantReadError, SSLWantWriteError):
pass
except (SSLZeroReturnError, ConnectionError):
log.warning('SSL connection closed by server during handshake.')
self.close(Errors.ConnectionError('SSL connection closed by server during handshake'))
# Other SSLErrors will be raised to user
return False
def _read_ready(self):
if self._write_wants_read:
self._write_wants_read = False
self._write_ready()
if self._buffer:
self._loop.add_writer(self._sock_fd, self._write_ready)
try:
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
pass
except ssl.SSLWantWriteError:
self._read_wants_write = True
self._loop.remove_reader(self._sock_fd)
self._loop.add_writer(self._sock_fd, self._write_ready)
except Exception as exc:
self._fatal_error(exc, 'Fatal read error on SSL transport')
else:
if data:
self._protocol.data_received(data)
else:
try:
if self._loop.get_debug():
logger.debug("%r received EOF", self)
keep_open = self._protocol.eof_received()
if keep_open:
logger.warning('returning true from eof_received() '
'has no effect when using ssl')
finally:
self.close()
def _write_ready(self):
if self._read_wants_write:
self._read_wants_write = False
self._read_ready()
if not (self._paused or self._closing):
self._loop.add_reader(self._sock_fd, self._read_ready)
if self._buffer:
try:
n = self._sock.send(self._buffer)
except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError):
n = 0
except ssl.SSLWantReadError:
n = 0
self._loop.remove_writer(self._sock_fd)
self._write_wants_read = True
except Exception as exc:
self._loop.remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on SSL transport')
return
if n:
del self._buffer[:n]
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
self._loop.remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
def test_on_handshake_writer_retry(self):
self.loop.set_debug(False)
self.sslsock.do_handshake.side_effect = ssl.SSLWantWriteError
transport = _SelectorSslTransport(
self.loop, self.sock, self.protocol, self.sslcontext)
self.loop.assert_writer(1, transport._on_handshake, None)
def test_read_ready_recv_write(self):
self.loop.remove_reader = mock.Mock()
self.loop.add_writer = mock.Mock()
self.sslsock.recv.side_effect = ssl.SSLWantWriteError
transport = self._make_one()
transport._read_ready()
self.assertFalse(self.protocol.data_received.called)
self.assertTrue(transport._read_wants_write)
self.loop.remove_reader.assert_called_with(transport._sock_fd)
self.loop.add_writer.assert_called_with(
transport._sock_fd, transport._write_ready)
def test_write_ready_send_retry(self):
transport = self._make_one()
transport._buffer = list_to_buffer([b'data'])
self.sslsock.send.side_effect = ssl.SSLWantWriteError
transport._write_ready()
self.assertEqual(list_to_buffer([b'data']), transport._buffer)
self.sslsock.send.side_effect = BlockingIOError()
transport._write_ready()
self.assertEqual(list_to_buffer([b'data']), transport._buffer)
def _on_handshake(self, start_time):
try:
self._sock.do_handshake()
except ssl.SSLWantReadError:
self._loop.add_reader(self._sock_fd,
self._on_handshake, start_time)
return
except ssl.SSLWantWriteError:
self._loop.add_writer(self._sock_fd,
self._on_handshake, start_time)
return
except BaseException as exc:
if self._loop.get_debug():
logger.warning("%r: SSL handshake failed",
self, exc_info=True)
self._loop.remove_reader(self._sock_fd)
self._loop.remove_writer(self._sock_fd)
self._sock.close()
self._wakeup_waiter(exc)
if isinstance(exc, Exception):
return
else:
raise
self._loop.remove_reader(self._sock_fd)
self._loop.remove_writer(self._sock_fd)
peercert = self._sock.getpeercert()
if not hasattr(self._sslcontext, 'check_hostname'):
# Verify hostname if requested, Python 3.4+ uses check_hostname
# and checks the hostname in do_handshake()
if (self._server_hostname and
self._sslcontext.verify_mode != ssl.CERT_NONE):
try:
ssl.match_hostname(peercert, self._server_hostname)
except Exception as exc:
if self._loop.get_debug():
logger.warning("%r: SSL handshake failed "
"on matching the hostname",
self, exc_info=True)
self._sock.close()
self._wakeup_waiter(exc)
return
# Add extra info that becomes available after handshake.
self._extra.update(peercert=peercert,
cipher=self._sock.cipher(),
compression=self._sock.compression(),
)
self._read_wants_write = False
self._write_wants_read = False
self._loop.add_reader(self._sock_fd, self._read_ready)
self._protocol_connected = True
self._loop.call_soon(self._protocol.connection_made, self)
# only wake up the waiter when connection_made() has been called
self._loop.call_soon(self._wakeup_waiter)
if self._loop.get_debug():
dt = self._loop.time() - start_time
logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
def ssl_echo_serve_sync(sock, *, expect_fail=False):
try:
wrapped = SERVER_CTX.wrap_socket(
sock, server_side=True, suppress_ragged_eofs=False
)
wrapped.do_handshake()
while True:
data = wrapped.recv(4096)
if not data:
# other side has initiated a graceful shutdown; we try to
# respond in kind but it's legal for them to have already gone
# away.
exceptions = (BrokenPipeError,)
# Under unclear conditions, CPython sometimes raises
# SSLWantWriteError here. This is a bug (bpo-32219), but it's
# not our bug, so ignore it.
exceptions += (stdlib_ssl.SSLWantWriteError,)
if WORKAROUND_PYPY_BUG:
exceptions += (stdlib_ssl.SSLEOFError,)
try:
wrapped.unwrap()
except exceptions:
pass
return
wrapped.sendall(data)
except Exception as exc:
if expect_fail:
print("ssl_echo_serve_sync got error as expected:", exc)
else: # pragma: no cover
raise
else:
if expect_fail: # pragma: no cover
raise RuntimeError("failed to fail?")
# Fixture that gives a raw socket connected to a trio-test-1 echo server
# (running in a thread). Useful for testing making connections with different
# SSLContexts.
#
# This way of writing it is pretty janky, with the nursery hidden inside the
# fixture and no proper parental supervision. Don't copy this code; it was
# written this way before we knew better.