def handle_read(self):
    """Drain readable data from the connection into ``self._iobuf``.

    ``self.recv`` is called with ``self.in_buffer_size`` until a short read
    occurs or a socket error is raised.  SSL want-read/want-write errors and
    error codes contained in ``NONBLOCKING`` simply end the read loop; any
    other socket error is handed to ``self.defunct`` and the method returns.

    If the buffer position is non-zero afterwards, ``process_io_buffer`` is
    called, and the connection is flagged as not readable when there are no
    pending requests and it is not the control connection.
    """
    try:
        while True:
            chunk = self.recv(self.in_buffer_size)
            self._iobuf.write(chunk)
            # A short read means the socket has nothing more for us right now.
            if len(chunk) < self.in_buffer_size:
                break
    except socket.error as exc:
        code = exc.args[0]
        # ``ssl`` may be falsy, hence the guard before touching its members.
        if ssl and isinstance(exc, ssl.SSLError):
            fatal = code not in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE)
        else:
            fatal = code not in NONBLOCKING
        if fatal:
            self.defunct(exc)
            return

    if not self._iobuf.tell():
        return

    self.process_io_buffer()
    if not (self._requests or self.is_control_connection):
        self._readable = False
# Python: example source code using ssl.SSL_ERROR_WANT_WRITE
def send(self, data):
    """Send ``data`` on the SSL socket and return the number of bytes sent.

    Returns ``None`` when the channel is already closed.  A want-write SSL
    error is reported as ``0`` bytes sent; a zero-return SSL error closes the
    channel via ``handle_close`` and also returns ``0``.  Any other
    ``ssl.SSLError`` propagates to the caller.
    """
    if self._closed:
        # usually handshaking failure, already handled exception
        return
    try:
        sent = self.socket.send(data)
        if sent:
            # Only a non-zero send refreshes the event time.
            self.set_event_time()
        return sent
    except ssl.SSLError as why:
        code = why.errno
        if code == ssl.SSL_ERROR_ZERO_RETURN:
            self.handle_close(700, "Connection closed by SSL_ERROR_ZERO_RETURN")
        elif code != ssl.SSL_ERROR_WANT_WRITE:
            raise
        return 0
def handle_connect_event(self):
    """Complete the TCP connect and drive the non-blocking TLS handshake.

    On the first call the pending socket error is checked, the socket is
    wrapped for TLS and ``self.handshaking`` is set.  Every call then attempts
    ``do_handshake``; while the handshake still needs I/O the method returns
    without marking the channel connected so it can be retried later.

    Raises:
        socket.error: if the socket reports a pending connect error.
        ssl.SSLError: the original handshake error for any failure other
            than want-read / want-write.
    """
    if not self.handshaking:
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            raise socket.error(err, _strerror(err))
        # NOTE(review): ssl.wrap_socket is deprecated in newer Python
        # releases; switching to an SSLContext changes verification
        # defaults, so it is left untouched here.
        self.socket = ssl.wrap_socket(self.socket, do_handshake_on_connect=False)
        self.handshaking = True

    try:
        self.socket.do_handshake()
    except ssl.SSLError as why:
        if why.args[0] in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE):
            return  # retry handshake
        # Re-raise the original error: wrapping it in a new SSLError would
        # replace the error code in args[0] with the exception object.
        raise

    # handshaking done
    self.handle_connect()
    self.connected = True
def _do_tls_handshake(self):
    """Attempt one step of the non-blocking TLS handshake.

    Want-read / want-write SSL errors return ``None`` so the handshake can be
    retried; an SSL EOF or an ``ECONNABORTED`` OS error returns the result of
    ``handle_close()``.  Other SSL errors are re-raised.  On success
    ``tls_active`` is set and ``tls_starting`` is cleared.
    """
    try:
        self.socket.do_handshake()
    except ssl.SSLError as err:
        code = err.args[0]
        if code == ssl.SSL_ERROR_EOF:
            return self.handle_close()
        if code not in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE):
            raise
        # Handshake needs more I/O; try again on the next event.
        return
    except OSError as err:
        # NOTE(review): OS errors other than ECONNABORTED are silently
        # ignored here -- confirm that is intended.
        if err.args[0] == errno.ECONNABORTED:
            return self.handle_close()
    else:
        self.tls_active = True
        self.tls_starting = False
def _do_tls_handshake(self):
    """Run a single non-blocking TLS handshake attempt on ``self.socket``.

    Returns ``None`` when the handshake must be retried (want-read or
    want-write), the value of ``handle_close()`` when the peer went away
    (SSL EOF or ``ECONNABORTED``), and re-raises every other SSL error.
    A completed handshake flips ``tls_active`` on and ``tls_starting`` off.
    """
    try:
        self.socket.do_handshake()
    except ssl.SSLError as exc:
        reason = exc.args[0]
        if reason == ssl.SSL_ERROR_WANT_READ or reason == ssl.SSL_ERROR_WANT_WRITE:
            return None
        if reason == ssl.SSL_ERROR_EOF:
            return self.handle_close()
        raise
    except OSError as exc:
        if exc.args[0] != errno.ECONNABORTED:
            # NOTE(review): every other OS error is dropped without any
            # state change -- confirm that is intended.
            return None
        return self.handle_close()

    # Reaching this point means do_handshake() finished without raising.
    self.tls_active = True
    self.tls_starting = False
def _send_discovery_request(self, ssl_sock, thing_name):
    """Write the discovery request for ``thing_name`` to ``ssl_sock``.

    The request is encoded to UTF-8 once and written until every byte has
    been accepted by the socket or ``self._timeout_sec`` seconds have passed.

    Args:
        ssl_sock: socket-like object whose ``write(bytes)`` returns the
            number of bytes written.
        thing_name: name inserted between the payload prefix and suffix.

    Returns:
        ``self.LOW_LEVEL_RC_COMPLETE`` once the whole request was written,
        ``self.LOW_LEVEL_RC_TIMEOUT`` if the deadline passed first.
    """
    request = self.REQUEST_TYPE_PREFIX + \
              self.PAYLOAD_PREFIX + \
              thing_name + \
              self.PAYLOAD_SUFFIX
    self._logger.debug("Sending discover request: " + request)

    # Track progress in *bytes*: the encoded length can differ from
    # len(request) when the thing name contains non-ASCII characters.
    payload = request.encode("utf-8")
    total_bytes = len(payload)
    bytes_written = 0
    deadline = time.time() + self._timeout_sec

    while True:
        try:
            # Resume after the bytes already accepted; re-sending the whole
            # request after a partial write would duplicate data on the wire.
            bytes_written += ssl_sock.write(payload[bytes_written:])
        except socket.error:
            # Want-read / want-write just mean "try again".  Other socket
            # errors were also swallowed by the original code and end up as
            # a timeout; that contract is kept for callers.
            pass
        if bytes_written >= total_bytes:
            return self.LOW_LEVEL_RC_COMPLETE
        if deadline < time.time():
            return self.LOW_LEVEL_RC_TIMEOUT
def _receive_until(self, ssl_sock, criteria_function, extra_data=None):
    """Read from ``ssl_sock`` one byte at a time until a criterion is met.

    After every read attempt ``criteria_function`` is called with the tuple
    ``(number_of_bytes_read, response_bytearray, extra_data)``.

    Returns:
        ``(self.LOW_LEVEL_RC_COMPLETE, response)`` as soon as the criterion
        returns a truthy value, or ``(self.LOW_LEVEL_RC_TIMEOUT, response)``
        once ``self._timeout_sec`` seconds have elapsed.
    """
    started_at = time.time()
    received = bytearray()
    byte_count = 0
    while True:  # Python does not have do-while
        try:
            received.append(self._convert_to_int_py3(ssl_sock.read(1)))
        except socket.error:
            # Every socket error (including SSL want-read / want-write) is
            # ignored and the read is retried until the timeout expires.
            pass
        else:
            byte_count += 1
        if criteria_function((byte_count, received, extra_data)):
            return self.LOW_LEVEL_RC_COMPLETE, received
        if started_at + self._timeout_sec < time.time():
            return self.LOW_LEVEL_RC_TIMEOUT, received
def handle_read(self):
    """Read everything currently available and process the I/O buffer.

    Data returned by ``self.recv`` is appended to ``self._iobuf`` until a
    read comes back shorter than ``self.in_buffer_size`` or a socket error
    interrupts the loop.  Only SSL errors other than want-read/want-write,
    and non-SSL errors whose code is not in ``NONBLOCKING``, cause the
    connection to be declared defunct.
    """
    try:
        while True:
            data = self.recv(self.in_buffer_size)
            self._iobuf.write(data)
            if len(data) < self.in_buffer_size:
                break
    except socket.error as err:
        errcode = err.args[0]
        if ssl and isinstance(err, ssl.SSLError):
            # Want-read / want-write only signal that the TLS layer needs
            # more I/O; they are not failures.
            tolerated = errcode in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE)
        else:
            tolerated = errcode in NONBLOCKING
        if not tolerated:
            self.defunct(err)
            return

    if self._iobuf.tell():
        self.process_io_buffer()
        idle = not self._requests and not self.is_control_connection
        if idle:
            # Nothing outstanding: stop watching for readability.
            self._readable = False
def handle_read(self, watcher, revents, errno=None):
    """libev read callback: pull data from the socket and process it.

    Args:
        watcher: the libev watcher that fired (unused here).
        revents: event bitmask; ``libev.EV_ERROR`` marks a failure.
        errno: optional OS error number accompanying an error event.

    An error event marks the connection defunct.  Otherwise the socket is
    read until a short read or a socket error; SSL want-read/want-write and
    codes in ``NONBLOCKING`` are tolerated, other errors mark the connection
    defunct.  If the buffer position is still zero afterwards, the
    connection is logged as closed by the server and closed.
    """
    if revents & libev.EV_ERROR:
        failure = IOError(errno, os.strerror(errno)) if errno else Exception("libev reported an error")
        self.defunct(failure)
        return

    try:
        while True:
            chunk = self._socket.recv(self.in_buffer_size)
            self._iobuf.write(chunk)
            if len(chunk) < self.in_buffer_size:
                break
    except socket.error as err:
        code = err.args[0]
        if ssl and isinstance(err, ssl.SSLError):
            fatal = code not in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE)
        else:
            fatal = code not in NONBLOCKING
        if fatal:
            self.defunct(err)
            return

    if not self._iobuf.tell():
        log.debug("Connection %s closed by server", self)
        self.close()
        return

    self.process_io_buffer()
def write_to_fd(self, data):
    """Send ``data`` on the SSL socket and return the number of bytes written.

    A want-write SSL error is converted into a return value of ``0``; every
    other ``ssl.SSLError`` propagates unchanged.
    """
    try:
        written = self.socket.send(data)
    except ssl.SSLError as e:
        if e.args[0] != ssl.SSL_ERROR_WANT_WRITE:
            raise
        # In Python 3.5+, SSLSocket.send raises a WANT_WRITE error if
        # the socket is not writeable; we need to transform this into
        # an EWOULDBLOCK socket.error or a zero return value,
        # either of which will be recognized by the caller of this
        # method. Prior to Python 3.5, an unwriteable socket would
        # simply return 0 bytes written.
        written = 0
    return written
def write_to_fd(self, data):
    """Write ``data`` to the underlying SSL socket.

    Returns the byte count reported by ``socket.send``, or ``0`` when the
    send fails with ``SSL_ERROR_WANT_WRITE``.  Other SSL errors are
    re-raised.
    """
    try:
        return self.socket.send(data)
    except ssl.SSLError as exc:
        socket_not_writable = exc.args[0] == ssl.SSL_ERROR_WANT_WRITE
        if not socket_not_writable:
            raise
    # In Python 3.5+, SSLSocket.send raises a WANT_WRITE error if
    # the socket is not writeable; we need to transform this into
    # an EWOULDBLOCK socket.error or a zero return value,
    # either of which will be recognized by the caller of this
    # method. Prior to Python 3.5, an unwriteable socket would
    # simply return 0 bytes written.
    return 0
def handshake(self):
    """Drive the non-blocking TLS handshake, negotiating HTTP/2 if enabled.

    On the first call the pending socket error is checked, an SSL context is
    created, ALPN (or NPN as a fallback) protocols are registered when
    ``self.ac_negotiate_http2`` is set, and the socket is wrapped.  Every
    call then attempts ``do_handshake``.

    Returns:
        ``False`` while the handshake still needs I/O (want-read or
        want-write), ``True`` once it has completed.

    Raises:
        OSError: if the socket reports a pending connect error.
        ssl.SSLError: the original handshake error for any other failure.
    """
    if not self._handshaking:
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            raise OSError(err, asyncore._strerror(err))

        ssl_context = create_urllib3_context(ssl_version=resolve_ssl_version(None), cert_reqs=resolve_cert_reqs(None))
        if self.ac_negotiate_http2:
            try:
                ssl_context.set_alpn_protocols(H2_PROTOCOLS)
            except AttributeError:
                ssl_context.set_npn_protocols(H2_PROTOCOLS)
        self.socket = ssl_context.wrap_socket(self.socket, do_handshake_on_connect=False, server_hostname=self.address[0])
        self._handshaking = True

    try:
        self.socket.do_handshake()
    except ssl.SSLError as why:
        if why.args[0] in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE):
            return False
        # Re-raise the original error: wrapping it in a new SSLError would
        # replace the error code in args[0] with the exception object.
        raise

    # Record the negotiated protocol, preferring ALPN and falling back to
    # NPN; leave self._proto untouched when neither is available.
    try:
        self._proto = self.socket.selected_alpn_protocol()
    except (AttributeError, NotImplementedError):
        try:
            self._proto = self.socket.selected_npn_protocol()
        except (AttributeError, NotImplementedError):
            pass

    self._handshaked = True
    return True
def send(self, data):
    """Send ``data`` over the SSL socket, refreshing ``self.event_time`` first.

    Returns the number of bytes sent, or ``0`` when the socket raised a
    want-write SSL error.  Other SSL errors propagate.
    """
    self.event_time = time.time()
    try:
        sent = self.socket.send(data)
    except ssl.SSLError as why:
        if why.errno != ssl.SSL_ERROR_WANT_WRITE:
            raise
        # Socket not writable yet: report that nothing was sent.
        sent = 0
    return sent
def read(self, numberOfBytesToBeBuffered):
    """Buffer exactly ``numberOfBytesToBeBuffered`` bytes from the SSL socket.

    If a previous call was interrupted by a socket error, the requested
    length is ignored and buffering resumes from the stored remaining length.

    Returns:
        The internal buffer holding the requested bytes (this should always
        be a bytearray); the buffering state is reset before returning.

    Raises:
        socket.error: propagated from the socket when data is temporarily
            unavailable (to be caught by paho), or raised with
            ``errno.ECONNABORTED`` when the socket returns no data, i.e. the
            peer closed the connection.
    """
    import errno
    import socket

    if not self._bufferingInProgress:  # If last read is completed...
        self._remainedLength = numberOfBytesToBeBuffered
        self._bufferingInProgress = True  # Now we start buffering a new length of bytes
    while self._remainedLength > 0:  # Read in a loop, always try to read in the remained length
        # If the data is temporarily not available, socket.error will be raised and caught by paho
        dataChunk = self._sslSocket.read(self._remainedLength)
        if not dataChunk:
            # An empty read means the connection is gone.  Without this
            # check the remaining length never shrinks and the loop would
            # spin forever.
            raise socket.error(errno.ECONNABORTED, "Connection closed while buffering data")
        self._internalBuffer.extend(dataChunk)  # Buffer the data
        self._remainedLength -= len(dataChunk)  # Update the remained length
    # The requested length of bytes is buffered, recover the context and return it
    # Otherwise error should be raised
    ret = self._internalBuffer
    self._reset()
    return ret  # This should always be bytearray
# This is the internal class that sends requested data out chunk by chunk according
# to the availability of the socket write operation. If the requested bytes of data
# (after encoding) need to be sent out in separate socket write operations (most
# probably because a write was interrupted by socket.error with
# errno = ssl.SSL_ERROR_WANT_WRITE), the write pointer is stored to ensure that the
# remaining bytes will be sent the next time this function gets called.
# *Error handling:
# For retry errors (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE, EAGAIN),
# leave them to the paho _packet_read for further handling (ignored and tried
# again when data is available).
# For other errors, leave them to the paho _packet_read for error reporting.
def _do_ssl_handshake(self):
    """Attempt one step of the non-blocking SSL handshake.

    Want-read / want-write errors return ``None`` so the handshake is retried
    later.  SSL EOF and ``ECONNABORTED`` return ``handle_close()``.  Other
    SSL errors are re-raised.  On success ``_ssl_accepting`` is cleared.
    """
    try:
        self.socket.do_handshake()
    except ssl.SSLError as err:
        code = err.args[0]
        if code == ssl.SSL_ERROR_EOF:
            return self.handle_close()
        if code not in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE):
            raise
        return
    except socket.error as err:
        # NOTE(review): socket errors other than ECONNABORTED are silently
        # ignored -- confirm that is intended.
        if err.args[0] == errno.ECONNABORTED:
            return self.handle_close()
    else:
        self._ssl_accepting = False
def _do_ssl_handshake(self):
    """Run a single non-blocking SSL handshake attempt on ``self.socket``.

    Returns ``None`` when the handshake has to be retried or has completed,
    and the value of ``handle_close()`` when the peer went away (SSL EOF or
    ``ECONNABORTED``).  Unexpected SSL errors are re-raised.
    """
    try:
        self.socket.do_handshake()
    except ssl.SSLError as exc:
        reason = exc.args[0]
        if reason == ssl.SSL_ERROR_WANT_READ or reason == ssl.SSL_ERROR_WANT_WRITE:
            return None
        if reason == ssl.SSL_ERROR_EOF:
            return self.handle_close()
        raise
    except socket.error as exc:
        if exc.args[0] != errno.ECONNABORTED:
            # NOTE(review): other socket errors are dropped without any
            # state change -- confirm that is intended.
            return None
        return self.handle_close()

    # do_handshake() finished without raising: the handshake is complete.
    self._ssl_accepting = False
def send(self, data):
    """Send ``data`` through the parent class, tolerating soft SSL errors.

    SSL EOF, zero-return, want-read and want-write errors are reported as
    ``0`` bytes sent; any other ``ssl.SSLError`` propagates.
    """
    try:
        return super(SSLConnection, self).send(data)
    except ssl.SSLError as err:
        if err.args[0] not in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
                               ssl.SSL_ERROR_WANT_READ,
                               ssl.SSL_ERROR_WANT_WRITE):
            raise
        return 0
def recv(self, buffer_size):
    """Receive up to ``buffer_size`` bytes through the parent class.

    Want-read / want-write SSL errors yield ``b''``.  SSL EOF and
    zero-return close the channel via ``handle_close`` and also yield
    ``b''``.  Any other ``ssl.SSLError`` propagates.
    """
    try:
        return super(SSLConnection, self).recv(buffer_size)
    except ssl.SSLError as err:
        code = err.args[0]
        if code in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE):
            pass  # nothing to read yet
        elif code in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
            self.handle_close()
        else:
            raise
        return b''
def _do_ssl_handshake(self):
    """Attempt one step of the non-blocking SSL handshake.

    Want-read / want-write errors return ``None`` so the handshake is retried
    later.  SSL EOF and ``ECONNABORTED`` return ``handle_close()``.  Other
    SSL errors are re-raised.  On success ``_ssl_accepting`` is cleared.
    """
    try:
        self.socket.do_handshake()
    # ``except X as err`` replaces the Python 2-only ``except X, err`` form,
    # which is a syntax error on Python 3.
    except ssl.SSLError as err:
        if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                           ssl.SSL_ERROR_WANT_WRITE):
            return
        elif err.args[0] == ssl.SSL_ERROR_EOF:
            return self.handle_close()
        raise
    except socket.error as err:
        # NOTE(review): socket errors other than ECONNABORTED are silently
        # ignored -- confirm that is intended.
        if err.args[0] == errno.ECONNABORTED:
            return self.handle_close()
    else:
        self._ssl_accepting = False
def _do_ssl_handshake(self):
    """Advance the non-blocking SSL handshake by one attempt.

    The method returns ``None`` when the handshake needs another attempt or
    has just completed (in which case ``_ssl_accepting`` is cleared), and
    whatever ``handle_close()`` returns when the connection ended (SSL EOF
    or ``ECONNABORTED``).  Unexpected SSL errors are re-raised.
    """
    try:
        self.socket.do_handshake()
    except ssl.SSLError as failure:
        reason = failure.args[0]
        needs_retry = reason in (ssl.SSL_ERROR_WANT_READ,
                                 ssl.SSL_ERROR_WANT_WRITE)
        if needs_retry:
            return
        if reason != ssl.SSL_ERROR_EOF:
            raise
        return self.handle_close()
    except socket.error as failure:
        aborted = failure.args[0] == errno.ECONNABORTED
        if aborted:
            return self.handle_close()
        # NOTE(review): any other socket error is ignored -- confirm.
    else:
        self._ssl_accepting = False
def send(self, data):
    """Delegate the send to the parent class and absorb soft SSL errors.

    Returns the parent's byte count, or ``0`` when the SSL layer reports
    EOF, zero-return, want-read or want-write.  Other SSL errors are
    re-raised.
    """
    try:
        sent = super(SSLConnection, self).send(data)
    except ssl.SSLError as err:
        soft_errors = (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
                       ssl.SSL_ERROR_WANT_READ,
                       ssl.SSL_ERROR_WANT_WRITE)
        if err.args[0] in soft_errors:
            sent = 0
        else:
            raise
    return sent
def recv(self, buffer_size):
    """Delegate the receive to the parent class and absorb soft SSL errors.

    Returns the parent's data, or ``b''`` when the SSL layer reports
    want-read / want-write (retry later) or EOF / zero-return (the channel
    is closed via ``handle_close`` first).  Other SSL errors are re-raised.
    """
    try:
        return super(SSLConnection, self).recv(buffer_size)
    except ssl.SSLError as err:
        reason = err.args[0]
        retry_later = reason in (ssl.SSL_ERROR_WANT_READ,
                                 ssl.SSL_ERROR_WANT_WRITE)
        if not retry_later:
            if reason not in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
                raise
            # Peer closed the TLS session.
            self.handle_close()
        return b''
def _do_ssl_handshake(self):
    """Run a single non-blocking SSL handshake attempt on ``self.socket``.

    Returns ``None`` when the handshake has to be retried or has completed
    (clearing ``_ssl_accepting``), and the value of ``handle_close()`` when
    the peer went away (SSL EOF or ``ECONNABORTED``).  Unexpected SSL errors
    are re-raised.
    """
    try:
        self.socket.do_handshake()
    # Use the ``as`` form: ``except X, err`` only parses on Python 2.
    except ssl.SSLError as err:
        code = err.args[0]
        if code in (ssl.SSL_ERROR_WANT_READ,
                    ssl.SSL_ERROR_WANT_WRITE):
            return
        elif code == ssl.SSL_ERROR_EOF:
            return self.handle_close()
        raise
    except socket.error as err:
        # NOTE(review): socket errors other than ECONNABORTED are silently
        # ignored -- confirm that is intended.
        if err.args[0] == errno.ECONNABORTED:
            return self.handle_close()
    else:
        self._ssl_accepting = False
def _do_ssl_handshake(self):
    """Try to complete the SSL handshake without blocking.

    Outcomes: the handshake completes (``_ssl_accepting`` becomes ``False``),
    it needs more I/O (returns ``None`` so it can be retried), the peer is
    gone (returns ``handle_close()`` for SSL EOF or ``ECONNABORTED``), or an
    unexpected SSL error is re-raised.
    """
    try:
        self.socket.do_handshake()
    except ssl.SSLError as err:
        reason = err.args[0]
        wants_io = reason in (ssl.SSL_ERROR_WANT_READ,
                              ssl.SSL_ERROR_WANT_WRITE)
        peer_closed = reason == ssl.SSL_ERROR_EOF
        if not (wants_io or peer_closed):
            raise
        # wants_io and peer_closed are mutually exclusive.
        return self.handle_close() if peer_closed else None
    except socket.error as err:
        if err.args[0] == errno.ECONNABORTED:
            return self.handle_close()
        # NOTE(review): remaining socket errors fall through silently.
    else:
        self._ssl_accepting = False
def send(self, data):
    """Send via the parent class; soft SSL conditions count as 0 bytes sent.

    EOF, zero-return, want-read and want-write SSL errors return ``0``.
    Every other ``ssl.SSLError`` is re-raised.
    """
    try:
        return super(SSLConnection, self).send(data)
    except ssl.SSLError as err:
        reason = err.args[0]
        connection_ended = reason in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN)
        needs_retry = reason in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE)
        if connection_ended or needs_retry:
            return 0
        raise
def recv(self, buffer_size):
    """Receive via the parent class; soft SSL conditions yield ``b''``.

    Want-read / want-write return ``b''`` immediately.  EOF / zero-return
    call ``handle_close`` and then return ``b''``.  Every other
    ``ssl.SSLError`` is re-raised.
    """
    try:
        data = super(SSLConnection, self).recv(buffer_size)
    except ssl.SSLError as err:
        reason = err.args[0]
        if reason in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE):
            data = b''
        elif reason in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
            self.handle_close()
            data = b''
        else:
            raise
    return data
def read(self, numberOfBytesToBeBuffered):
    """Buffer exactly ``numberOfBytesToBeBuffered`` bytes from the SSL socket.

    A call interrupted by a socket error keeps its progress: the next call
    ignores its argument and continues with the stored remaining length.

    Returns:
        The internal buffer holding the requested bytes; the buffering state
        is reset before returning.

    Raises:
        socket.error: propagated from the socket when data is temporarily
            unavailable (to be caught by paho), or raised with
            ``errno.ECONNABORTED`` when the socket returns no data, i.e. the
            peer closed the connection.
    """
    import errno
    import socket

    if not self._bufferingInProgress:  # If last read is completed...
        self._remainedLength = numberOfBytesToBeBuffered
        self._bufferingInProgress = True  # Now we start buffering a new length of bytes
    while self._remainedLength > 0:  # Read in a loop, always try to read in the remained length
        # If the data is temporarily not available, socket.error will be raised and caught by paho
        dataChunk = self._sslSocket.read(self._remainedLength)
        if not dataChunk:
            # Zero bytes means the connection was closed; looping again
            # would never make progress, so surface it as a socket error.
            raise socket.error(errno.ECONNABORTED, "Connection closed while buffering data")
        self._internalBuffer.extend(dataChunk)  # Buffer the data
        self._remainedLength -= len(dataChunk)  # Update the remained length
    # The requested length of bytes is buffered, recover the context and return it
    # Otherwise error should be raised
    ret = self._internalBuffer
    self._reset()
    return ret
# This is the internal class that sends requested data out chunk by chunk according
# to the availability of the socket write operation. If the requested bytes of data
# (after encoding) need to be sent out in separate socket write operations (most
# probably because a write was interrupted by socket.error with
# errno = ssl.SSL_ERROR_WANT_WRITE), the write pointer is stored to ensure that the
# remaining bytes will be sent the next time this function gets called.
# *Error handling:
# For retry errors (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE, EAGAIN),
# leave them to the paho _packet_read for further handling (ignored and tried
# again when data is available).
# For other errors, leave them to the paho _packet_read for error reporting.
def _handShake(self, hostAddress, portNumber):
    """Perform the HTTP upgrade handshake for the MQTT-over-WebSocket link.

    A SigV4-signed URL is obtained for the endpoint, the HTTP upgrade request
    is written to ``self._sslSocket`` and the response is read back and
    verified against the generated WebSocket key.

    Args:
        hostAddress: endpoint of the form
            ``<randomString>.iot.<region>.amazonaws.com``; the third
            dot-separated chunk is used as the region.
        portNumber: port passed on to the SigV4 handler.

    Raises:
        wssNoKeyInEnvironmentError: if no signed URL could be created.
        wssHandShakeError: if the server response fails verification.
        socket.error: for any socket failure other than SSL want-read /
            want-write while waiting for the response.
    """
    CRLF = "\r\n"
    # <randomString>.iot.<region>.amazonaws.com
    region = hostAddress.split('.')[2]  # XXXX.<region>.beta
    signedURL = self._sigV4Handler.createWebsocketEndpoint(hostAddress, portNumber, region, "GET", "iotdata", "/mqtt")
    if signedURL == "":
        raise wssNoKeyInEnvironmentError()
    # Now we got a signedURL
    path = signedURL[signedURL.index("/mqtt"):]
    rawSecWebSocketKey = self._generateWSSKey()  # Should be randomly generated...
    # Assemble HTTP request headers
    headerLines = (
        "GET " + path + " HTTP/1.1",
        "Host: " + hostAddress,
        "Connection: " + "Upgrade",
        "Upgrade: " + "websocket",
        "Sec-WebSocket-Version: " + "13",
        "Sec-WebSocket-Protocol: " + "mqttv3.1",
        "Sec-WebSocket-Extensions: " + "permessage-deflate; client_max_window_bits",
        "sec-websocket-key: " + rawSecWebSocketKey,
    )
    # Send the HTTP request: every header ends with CRLF and an empty line
    # terminates the header block.
    # NOTE(review): the request is written as str and the response is
    # accumulated as str; on Python 3 an SSL socket deals in bytes -- confirm
    # which interpreter this code targets.
    self._sslSocket.write(CRLF.join(headerLines) + CRLF + CRLF)
    # Read it back (Non-blocking socket)
    # Do we need a timeout here?
    wssHandshakeResponse = ""
    while len(wssHandshakeResponse) == 0:
        try:
            wssHandshakeResponse += self._sslSocket.read(1024)  # Response is always less than 1024 bytes
        except socket.error as err:
            # Only "try again" conditions may be ignored; swallowing a fatal
            # error here would spin in this loop forever.
            if err.errno not in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE):
                raise
    # Verify response
    if not self._verifyWSSResponse(wssHandshakeResponse, rawSecWebSocketKey):
        raise wssHandShakeError()
# Used to create a single wss frame
# Assume that the maximum length of an MQTT packet never exceeds the maximum length
# for a wss frame. Therefore, the FIN bit for the encoded frame will always be 1.
# Frames are encoded as BINARY frames.
def write_to_fd(self, data):
    """Push ``data`` to the SSL socket, returning how many bytes were sent.

    ``SSL_ERROR_WANT_WRITE`` is mapped to a zero return value; all other
    SSL errors are re-raised to the caller.
    """
    bytes_sent = 0
    try:
        bytes_sent = self.socket.send(data)
    except ssl.SSLError as e:
        # In Python 3.5+, SSLSocket.send raises a WANT_WRITE error if
        # the socket is not writeable; we need to transform this into
        # an EWOULDBLOCK socket.error or a zero return value,
        # either of which will be recognized by the caller of this
        # method. Prior to Python 3.5, an unwriteable socket would
        # simply return 0 bytes written.
        if e.args[0] != ssl.SSL_ERROR_WANT_WRITE:
            raise
    return bytes_sent
def _do_ssl_handshake(self):
    """Perform one non-blocking SSL handshake attempt.

    Returns ``None`` if the handshake completed (``_ssl_accepting`` is then
    cleared) or must be retried after a want-read / want-write error, and
    the result of ``handle_close()`` on SSL EOF or ``ECONNABORTED``.  Other
    SSL errors are re-raised.
    """
    try:
        self.socket.do_handshake()
    except ssl.SSLError as err:
        code = err.args[0]
        if code != ssl.SSL_ERROR_EOF:
            if code in (ssl.SSL_ERROR_WANT_READ,
                        ssl.SSL_ERROR_WANT_WRITE):
                # Not finished yet; wait for the next I/O event.
                return
            raise
        return self.handle_close()
    except socket.error as err:
        # NOTE(review): only ECONNABORTED is acted upon; other socket
        # errors are ignored -- confirm that is intended.
        if err.args[0] == errno.ECONNABORTED:
            return self.handle_close()
    else:
        self._ssl_accepting = False