def write(self, data):
"""Output the given byte string over the serial port."""
if not self.is_open:
raise portNotOpenError
d = to_bytes(data)
tx_len = len(d)
if self._write_timeout is not None and self._write_timeout > 0:
timeout = time.time() + self._write_timeout
else:
timeout = None
while tx_len > 0:
try:
n = os.write(self.fd, d)
if timeout:
# when timeout is set, use select to wait for being ready
# with the time left as timeout
timeleft = timeout - time.time()
if timeleft < 0:
raise writeTimeoutError
_, ready, _ = select.select([], [self.fd], [], timeleft)
if not ready:
raise writeTimeoutError
else:
# wait for write operation
_, ready, _ = select.select([], [self.fd], [], None)
if not ready:
raise SerialException('write failed (select)')
d = d[n:]
tx_len -= n
except SerialException:
raise
except OSError as v:
if v.errno != errno.EAGAIN:
raise SerialException('write failed: %s' % (v,))
# still calculate and check timeout
if timeout and timeout - time.time() < 0:
raise writeTimeoutError
return len(data)
评论列表
文章目录