def _write_ready(self):
    """Push buffered outgoing data to the SSL socket when it is writable.

    If an earlier read was parked waiting for writability, that read is
    retried first and the reader callback is re-registered unless the
    transport is paused or closing. Any buffered data is then sent; a send
    that reports SSLWantReadError unregisters the writer and records that
    the write is waiting for the socket to become readable. Once the buffer
    is empty the writer is unregistered and, if the transport is closing,
    the connection-lost callback is invoked.
    """
    if self._read_wants_write:
        self._read_wants_write = False
        self._read_ready()
        if not self._paused and not self._closing:
            self._loop.add_reader(self._sock_fd, self._read_ready)

    if self._buffer:
        try:
            sent = self._sock.send(self._buffer)
        except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError):
            # Nothing was written this time; stay registered and retry.
            sent = 0
        except ssl.SSLWantReadError:
            # The SSL layer must read before it can write again.
            sent = 0
            self._loop.remove_writer(self._sock_fd)
            self._write_wants_read = True
        except Exception as exc:
            self._loop.remove_writer(self._sock_fd)
            self._buffer.clear()
            self._fatal_error(exc, 'Fatal write error on SSL transport')
            return

        if sent:
            # Drop the bytes that were accepted by the socket.
            del self._buffer[:sent]

    self._maybe_resume_protocol()  # May append to buffer.

    if self._buffer:
        return
    self._loop.remove_writer(self._sock_fd)
    if self._closing:
        self._call_connection_lost(None)
python类SSLWantReadError()的实例源码
def test_on_handshake_reader_retry(self):
    """A handshake raising SSLWantReadError registers _on_handshake as reader."""
    self.loop.set_debug(False)
    self.sslsock.do_handshake.side_effect = ssl.SSLWantReadError
    # Constructing the transport is what triggers the handshake attempt.
    ssl_transport = _SelectorSslTransport(
        self.loop,
        self.sock,
        self.protocol,
        self.sslcontext,
    )
    self.loop.assert_reader(1, ssl_transport._on_handshake, None)
def test_read_ready_recv_retry(self):
    """recv() failures that mean "try again" must not deliver data."""
    self.sslsock.recv.side_effect = ssl.SSLWantReadError
    transport = self._make_one()
    transport._read_ready()
    self.assertTrue(self.sslsock.recv.called)
    self.assertFalse(self.protocol.data_received.called)

    # The remaining retryable errors are checked on the same transport.
    for retryable in (BlockingIOError, InterruptedError):
        self.sslsock.recv.side_effect = retryable
        transport._read_ready()
        self.assertFalse(self.protocol.data_received.called)
def test_write_ready_send_read(self):
    """send() raising SSLWantReadError parks the write and drops the writer."""
    ssl_transport = self._make_one()
    ssl_transport._buffer = list_to_buffer([b'data'])

    # Replace remove_writer so the call made by _write_ready can be checked.
    self.loop.remove_writer = mock.Mock()
    self.sslsock.send.side_effect = ssl.SSLWantReadError
    ssl_transport._write_ready()

    self.assertFalse(self.protocol.data_received.called)
    self.assertTrue(ssl_transport._write_wants_read)
    self.loop.remove_writer.assert_called_with(ssl_transport._sock_fd)
def _on_handshake(self, start_time):
    """Advance the SSL handshake and finish transport setup once it is done.

    When the handshake needs more I/O, this method re-registers itself as a
    reader or writer callback (passing start_time along) and returns. On
    any other failure the socket is closed and the waiter is woken with the
    exception; exceptions that are not Exception subclasses are re-raised.
    After a successful handshake the peer certificate and cipher details
    are added to the extra info, the read callback is installed and
    connection_made() plus the waiter wake-up are scheduled on the loop.

    start_time: loop timestamp used only for the debug timing message.
    """
    try:
        self._sock.do_handshake()
    except ssl.SSLWantReadError:
        self._loop.add_reader(
            self._sock_fd, self._on_handshake, start_time)
        return
    except ssl.SSLWantWriteError:
        self._loop.add_writer(
            self._sock_fd, self._on_handshake, start_time)
        return
    except BaseException as exc:
        if self._loop.get_debug():
            logger.warning("%r: SSL handshake failed", self, exc_info=True)
        self._loop.remove_reader(self._sock_fd)
        self._loop.remove_writer(self._sock_fd)
        self._sock.close()
        self._wakeup_waiter(exc)
        if not isinstance(exc, Exception):
            # Let non-Exception errors propagate after the cleanup above.
            raise
        return

    # Handshake finished: the retry callbacks are no longer needed.
    self._loop.remove_reader(self._sock_fd)
    self._loop.remove_writer(self._sock_fd)

    cert = self._sock.getpeercert()
    # Verify hostname if requested, Python 3.4+ uses check_hostname
    # and checks the hostname in do_handshake()
    manual_hostname_check = (
        not hasattr(self._sslcontext, 'check_hostname')
        and self._server_hostname
        and self._sslcontext.verify_mode != ssl.CERT_NONE
    )
    if manual_hostname_check:
        try:
            ssl.match_hostname(cert, self._server_hostname)
        except Exception as exc:
            if self._loop.get_debug():
                logger.warning(
                    "%r: SSL handshake failed on matching the hostname",
                    self, exc_info=True)
            self._sock.close()
            self._wakeup_waiter(exc)
            return

    # Add extra info that becomes available after handshake.
    self._extra.update(
        peercert=cert,
        cipher=self._sock.cipher(),
        compression=self._sock.compression(),
    )

    self._read_wants_write = False
    self._write_wants_read = False
    self._loop.add_reader(self._sock_fd, self._read_ready)
    self._protocol_connected = True
    self._loop.call_soon(self._protocol.connection_made, self)
    # only wake up the waiter when connection_made() has been called
    self._loop.call_soon(self._wakeup_waiter)

    if self._loop.get_debug():
        elapsed = self._loop.time() - start_time
        logger.debug("%r: SSL handshake took %.1f ms", self, elapsed * 1e3)
def first_test():
    """Step through the opening of a TLS handshake by hand using memory BIOs.

    Connects to google.com:443, wraps a pair of ssl.MemoryBIO objects in an
    SSL object, sends whatever ended up in the outgoing BIO over the socket,
    feeds the reply into the incoming BIO and calls do_handshake() once.
    The pending counters of all three objects are printed after each step.
    """
    def show_pending():
        # Snapshot of how much data each layer currently holds.
        print('ssl_obj', ssl_obj.pending())
        print('incoming', incoming.pending)
        print('outgoing', outgoing.pending)

    # The context manager closes the socket even if a step below raises.
    with socket.create_connection(('google.com', 443)) as sock:
        incoming = ssl.MemoryBIO()
        outgoing = ssl.MemoryBIO()
        ctx = ssl.create_default_context()
        ssl_obj = ctx.wrap_bio(incoming, outgoing)

        try:
            ssl_obj.write(b'')
        except ssl.SSLWantReadError as err:
            print("err", err)
        show_pending()

        data = outgoing.read()
        # sendall() keeps writing until every byte has been handed off,
        # whereas send() may stop after a partial write.
        sock.sendall(data)
        show_pending()

        got = sock.recv(10240)
        print('sock.recv got', len(got))
        incoming.write(got)

        try:
            ssl_obj.do_handshake()
        except ssl.SSLWantReadError as err:
            print("err", err)
        show_pending()
def read(self):
    '''Read a line of data from the server, if any.

    Bytes are pulled from the socket one at a time until a newline arrives.
    They are decoded as UTF-8 only once the whole line is available, so
    multi-byte characters are not split into replacement characters.

    Returns the line as a str with surrounding carriage returns and one
    leading ':' removed. Returns None when not connected, when the read
    would block before a full line arrived, when the connection dropped
    (disconnect() is called in that case), or when the line is blank.
    '''
    # Only do something if we're connected.
    if not self.__connected:
        return None

    raw_line = bytearray()
    while True:
        try:
            if self.ssl:
                data = self.__ssl.recv(1)
            else:
                data = self.__socket.recv(1)
        except (ssl.SSLWantReadError, BlockingIOError):
            # NOTE(review): bytes already collected for this line are
            # discarded here, as before — confirm callers expect that.
            return None
        except OSError as err:
            # strerror may be None, so it is converted before concatenating.
            debug.error("Error #" + str(err.errno) + ": '" + str(err.strerror) + "' disconnecting.")
            data = False

        # An empty result from recv means the connection has been broken.
        if not data:
            self.disconnect()
            return None
        if data == b'\n':
            break
        raw_line += data

    # Remove the trailing carriage return character (cr/lf pair)
    received = raw_line.decode('utf-8', 'replace').strip('\r')
    if received.startswith(':'):
        received = received[1:]
    # Bug fix for Issue #18, do not return blank lines.
    return received or None
def run(self):
    """Drive the connection until it is disconnected.

    Connects first when no socket exists yet and returns immediately unless
    the status is then 'connected'. Otherwise the socket is switched to
    non-blocking mode and the loop repeatedly receives data, processes the
    receive buffer and the queue, sends pending data and applies the
    timeout rules. The loop ends with status 'disconnected' and the socket
    closed when recv() returns no data, a socket error occurs, a timeout
    expires or shared.shutting_down is set. Passes that do not `continue`
    early sleep for 0.2 seconds before the next one.
    """
    if self.s is None:
        self._connect()
    if self.status != 'connected':
        return
    self.s.settimeout(0)
    if not self.server:
        self.send_queue.put(message.Version(self.host, self.port))
    while True:
        if self.on_connection_fully_established_scheduled and not (self.buffer_send or self.buffer_receive):
            self._on_connection_fully_established()
        data = True
        try:
            if self.status == 'fully_established':
                data = self.s.recv(4096)
                self.buffer_receive += data
                # Keep draining the socket while data arrives and the
                # receive buffer stays below the size cap.
                if data and len(self.buffer_receive) < 4000000:
                    continue
            else:
                data = self.s.recv(self.next_message_size - len(self.buffer_receive))
                self.buffer_receive += data
        except ssl.SSLWantReadError:
            if self.status == 'fully_established':
                self._request_objects()
                self._send_objects()
        except ConnectionResetError:
            # Must precede the socket.error handler: socket.error is an
            # alias of OSError, which would otherwise swallow this subclass
            # and leave this branch unreachable.
            logging.debug('Disconnecting from {}:{}. Reason: ConnectionResetError'.format(self.host, self.port))
            self.status = 'disconnecting'
        except socket.error as e:
            err = e.args[0]
            if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
                # Nothing to read right now; use the idle time for objects.
                if self.status == 'fully_established':
                    self._request_objects()
                    self._send_objects()
            else:
                logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host, self.port, e))
                data = None
        self._process_buffer_receive()
        self._process_queue()
        self._send_data()
        if time.time() - self.last_message_received > shared.timeout:
            logging.debug(
                'Disconnecting from {}:{}. Reason: time.time() - self.last_message_received > shared.timeout'.format(
                    self.host, self.port))
            self.status = 'disconnecting'
        if time.time() - self.last_message_received > 30 and self.status != 'fully_established' and self.status != 'disconnecting':
            logging.debug(
                'Disconnecting from {}:{}. Reason: time.time() - self.last_message_received > 30 and self.status != \'fully_established\''.format(
                    self.host, self.port))
            self.status = 'disconnecting'
        if time.time() - self.last_message_sent > 300 and self.status == 'fully_established':
            self.send_queue.put(message.Message(b'pong', b''))
        if self.status == 'disconnecting' or shared.shutting_down:
            data = None
        # A falsy `data` (empty recv result or None set above) ends the loop.
        if not data:
            self.status = 'disconnected'
            self.s.close()
            logging.info('Disconnected from {}:{}'.format(self.host, self.port))
            break
        time.sleep(0.2)