def spOpen(serialName):
# 115200/8/N/1
try:
if serialName:
serialFd = serial.Serial(serialName, timeout = 3)
serialFd.baudrate = 115200
#serialFd.baudrate = 9600
serialFd.bytesize = 8
serialFd.parity = serial.PARITY_NONE;
serialFd.stopbits = 1
serialFd.timeout = 3
serialFd.writeTimeout = 1
if serialFd and serialFd.readable():
return serialFd
else:
return None
except serial.SerialException:
return None
python类SerialException()的实例源码
def find_port(self):
try:
temp_port_list = serial.tools.list_ports.comports()
except OSError:
raise OSError('Serial port not found! Try entering your port manually.')
ports = [i[0] for i in temp_port_list][::-1]
openbci_port = ''
for port in ports:
try:
s = serial.Serial(port= port, baudrate = self.baudrate, timeout=self.timeout)
s.write(b'v')
openbci_serial = self.openbci_id(s)
s.close()
if openbci_serial:
openbci_port = port;
except (OSError, serial.SerialException):
pass
if openbci_port == '':
raise OSError('Cannot find OpenBCI port')
else:
return openbci_port
def connect(self):
"""
????? ??????????? ? ??????????.
"""
if not self.connected:
self.serial.port = self.port
if not self.serial.isOpen():
try:
self.serial.open()
except serial.SerialException as exc:
raise excepts.NoConnectionError(
u'?? ??????? ??????? ???? {} ({})'.format(
self.port, exc
)
)
for r in self.check(self.CHECK_NUM, True):
if r:
self.connected = True
return
else:
self.serial.close()
raise excepts.NoConnectionError()
def dump_port_settings(self):
"""Write current settings to sys.stderr"""
sys.stderr.write("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(
p=self.serial))
sys.stderr.write('--- RTS: {:8} DTR: {:8} BREAK: {:8}\n'.format(
('active' if self.serial.rts else 'inactive'),
('active' if self.serial.dtr else 'inactive'),
('active' if self.serial.break_condition else 'inactive')))
try:
sys.stderr.write('--- CTS: {:8} DSR: {:8} RI: {:8} CD: {:8}\n'.format(
('active' if self.serial.cts else 'inactive'),
('active' if self.serial.dsr else 'inactive'),
('active' if self.serial.ri else 'inactive'),
('active' if self.serial.cd else 'inactive')))
except serial.SerialException:
# on RFC 2217 ports, it can happen if no modem state notification was
# yet received. ignore this error.
pass
sys.stderr.write('--- software flow control: {}\n'.format('active' if self.serial.xonxoff else 'inactive'))
sys.stderr.write('--- hardware flow control: {}\n'.format('active' if self.serial.rtscts else 'inactive'))
sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper()))
sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
def reader(self):
"""loop and copy serial->console"""
try:
while self.alive and self._reader_alive:
# read all that is there or wait for one byte
data = self.serial.read(self.serial.in_waiting or 1)
if data:
if self.raw:
self.console.write_bytes(data)
else:
text = self.rx_decoder.decode(data)
for transformation in self.rx_transformations:
text = transformation.rx(text)
self.console.write(text)
except serial.SerialException:
self.alive = False
self.console.cancel()
raise # XXX handle instead of re-raise?
def reader(self):
"""loop and copy serial->console"""
try:
while self.alive and self._reader_alive:
# read all that is there or wait for one byte
data = self.serial.read(self.serial.in_waiting or 1)
if data:
if self.raw:
self.console.write_bytes(data)
else:
text = self.rx_decoder.decode(data)
for transformation in self.rx_transformations:
text = transformation.rx(text)
self.console.write(text)
except serial.SerialException:
self.alive = False
self.console.cancel()
raise # XXX handle instead of re-raise?
def dump_port_settings(self):
"""Write current settings to sys.stderr"""
sys.stderr.write("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(
p=self.serial))
sys.stderr.write('--- RTS: {:8} DTR: {:8} BREAK: {:8}\n'.format(
('active' if self.serial.rts else 'inactive'),
('active' if self.serial.dtr else 'inactive'),
('active' if self.serial.break_condition else 'inactive')))
try:
sys.stderr.write('--- CTS: {:8} DSR: {:8} RI: {:8} CD: {:8}\n'.format(
('active' if self.serial.cts else 'inactive'),
('active' if self.serial.dsr else 'inactive'),
('active' if self.serial.ri else 'inactive'),
('active' if self.serial.cd else 'inactive')))
except serial.SerialException:
# on RFC 2217 ports, it can happen if no modem state notification was
# yet received. ignore this error.
pass
sys.stderr.write('--- software flow control: {}\n'.format('active' if self.serial.xonxoff else 'inactive'))
sys.stderr.write('--- hardware flow control: {}\n'.format('active' if self.serial.rtscts else 'inactive'))
sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper()))
sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
def listen(self):
eol = b'\n'
eol_len = len(eol)
line = bytearray()
while True:
try:
c = self.serial.read(1)
if c:
line += c
if line[-eol_len:] == eol:
print(line)
sample = struct.unpack('<' + 'h'*(len(line[:-eol_len])//2), line[:-eol_len])
print(sample)
self.emitter.emit(sample[-2:])
line = bytearray()
except SerialException:
return
except AttributeError:
return
except struct.error:
continue
def setPort(self, port:Port):
if port.isOpen():
port.close()
port.setBackground(QtGui.QColor(0, 0, 0, 0))
else:
try:
port.open(self.baudrate_combo.currentText(),
self.parity_combo.currentText(),
self.bytesize_combo.currentText())
port.setBackground(QtGui.QColor(0, 255, 0, 127))
except SerialException as e:
print(e)
port.setBackground(QtGui.QColor(255, 0, 0, 127))
self.check_availability_list.append(port)
except ValueError as e:
print(e)
port.setBackground(QtGui.QColor(255, 0, 0, 127))
port.setSelected(False)
print(self.getPortList())
def __init__(self, port=None):
"""Opens a connection to a Removinator controller on the specified
serial port.
If a port is not specified, an attempt will be made to auto-discover
the Removinator controller.
:param port: port that the Removinator is connected to
:type port: str
:raises: :exc:`ConnectError`
:ivar last_result: The result output from the last executed command
:ivar last_response: The response output from the last executed command
:ivar port: The port that the Removinator is connected to
"""
self.port = port
self.last_result = ''
self.last_response = ''
# Attempt to discover the Removinator controller if a
# port was not specified.
if port is None:
port = _discover_removinator()
# Open a connection to the Removinator controller.
try:
self.connection = serial.Serial(
port,
9600,
serial.EIGHTBITS,
serial.PARITY_NONE,
serial.STOPBITS_ONE,
1)
self.sio = io.TextIOWrapper(io.BufferedRWPair(self.connection,
self.connection))
except serial.SerialException as e:
raise ConnectError('Unable to open connection to Removinator '
'controller on port {0}'.format(port))
def __init__(self, port, baud, delay):
self.port = port
self.baud = baud
self.serial = None
self.delay = delay
try:
self.serial = serial.Serial(port, baud)
except serial.SerialException as e:
raise TransportError(e.strerror)
self.serial.timeout = 3
self.serial.interCharTimeout = 3
def __init__(self, port, baud_rate, terminal=None, reset=False):
Connection.__init__(self, terminal)
self._port = port
self._baud_rate = baud_rate
try:
self._serial = serial.Serial(None, self._baud_rate, timeout=0, write_timeout=0.2)
self._serial.dtr = False
self._serial.rts = False
self._serial.port = port
self._serial.open()
if reset:
self._serial.rts = True
time.sleep(0.1)
self._serial.rts = False
x = ""
while not x.endswith(">>>"):
x += self._serial.read().decode('utf-8', errors="ignore")
self.send_kill()
except (OSError, serial.SerialException) as e:
self._serial = None
return
except Exception as e:
return
self._reader_thread = Thread(target=self._reader_thread_routine)
self._reader_thread.start()
def _serial_ports(with_wifi):
""" Lists serial port names
:raises EnvironmentError:
On unsupported or unknown platforms
:returns:
A list of the serial ports available on the system
"""
if sys.platform.startswith('win'):
ports = ['COM%s' % (i + 1) for i in range(256)]
elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
# this excludes your current terminal "/dev/tty"
ports = glob.glob('/dev/tty[A-Za-z]*')
elif sys.platform.startswith('darwin'):
ports = glob.glob('/dev/tty.*')
else:
raise EnvironmentError('Unsupported platform')
result = []
for port in ports:
try:
s = serial.Serial()
s.dtr = False
s.rts = False
s.port = port
s.open()
s.close()
result.append(port)
except (OSError, serial.SerialException):
pass
if with_wifi:
result.append("wifi")
return result
def _flash_finished(self, code):
Logger.log("Flash output contents:\r\n")
Logger.log(self._flash_output)
if code == 0:
self._flash_output.extend(b"Rebooting from flash mode...\n")
self._update_output()
try:
s = serial.Serial(self._port, 115200)
s.dtr = False
s.rts = True
time.sleep(0.1)
s.rts = False
time.sleep(0.1)
self._flash_output.extend(b"Done, you may now use the device.\n")
self._update_output()
except (OSError, serial.SerialException):
QMessageBox.critical(self, "Flashing Error", "Failed to reboot into working mode.")
elif code == -1:
QMessageBox.critical(self, "Flashing Error", "Failed to run script.\nCheck that path to python is correct.")
else:
QMessageBox.critical(self, "Flashing Error", "Failed to flash new firmware")
self.eraseButton.setEnabled(True)
self.flashButton.setEnabled(True)
self._flashing = False
def open(self):
"""\
Open port with current settings. This may throw a SerialException
if the port cannot be opened."""
if self._port is None:
raise SerialException("Port must be configured before it can be used.")
if self.is_open:
raise SerialException("Port is already open.")
self.fd = None
# open
try:
self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
except OSError as msg:
self.fd = None
raise SerialException(msg.errno, "could not open port %s: %s" % (self._port, msg))
#~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # set blocking
try:
self._reconfigure_port(force_update=True)
except:
try:
os.close(self.fd)
except:
# ignore any exception when closing the port
# also to keep original exception that happened when setting up
pass
self.fd = None
raise
else:
self.is_open = True
if not self._dsrdtr:
self._update_dtr_state()
if not self._rtscts:
self._update_rts_state()
self.reset_input_buffer()
def write(self, data):
"""Output the given byte string over the serial port."""
if not self.is_open:
raise portNotOpenError
d = to_bytes(data)
tx_len = len(d)
if self._write_timeout is not None and self._write_timeout > 0:
timeout = time.time() + self._write_timeout
else:
timeout = None
while tx_len > 0:
try:
n = os.write(self.fd, d)
if timeout:
# when timeout is set, use select to wait for being ready
# with the time left as timeout
timeleft = timeout - time.time()
if timeleft < 0:
raise writeTimeoutError
_, ready, _ = select.select([], [self.fd], [], timeleft)
if not ready:
raise writeTimeoutError
else:
# wait for write operation
_, ready, _ = select.select([], [self.fd], [], None)
if not ready:
raise SerialException('write failed (select)')
d = d[n:]
tx_len -= n
except SerialException:
raise
except OSError as v:
if v.errno != errno.EAGAIN:
raise SerialException('write failed: %s' % (v,))
# still calculate and check timeout
if timeout and timeout - time.time() < 0:
raise writeTimeoutError
return len(data)
def read(self, size=1):
"""\
Read size bytes from the serial port. If a timeout is set it may
return less characters as requested. With no timeout it will block
until the requested number of bytes is read.
"""
if self.fd is None:
raise portNotOpenError
read = bytearray()
poll = select.poll()
poll.register(self.fd, select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL)
if size > 0:
while len(read) < size:
# print "\tread(): size",size, "have", len(read) #debug
# wait until device becomes ready to read (or something fails)
for fd, event in poll.poll(self._timeout * 1000):
if event & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
raise SerialException('device reports error (poll)')
# we don't care if it is select.POLLIN or timeout, that's
# handled below
buf = os.read(self.fd, size - len(read))
read.extend(buf)
if ((self._timeout is not None and self._timeout >= 0) or
(self._inter_byte_timeout is not None and self._inter_byte_timeout > 0)) and not buf:
break # early abort on timeout
return bytes(read)
def _reconfigure_port(self, force_update=True):
"""Set communication parameters on opened port."""
super(VTIMESerial, self)._reconfigure_port()
fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # clear O_NONBLOCK
if self._inter_byte_timeout is not None:
vmin = 1
vtime = int(self._inter_byte_timeout * 10)
else:
vmin = 0
vtime = int(self._timeout * 10)
try:
orig_attr = termios.tcgetattr(self.fd)
iflag, oflag, cflag, lflag, ispeed, ospeed, cc = orig_attr
except termios.error as msg: # if a port is nonexistent but has a /dev file, it'll fail here
raise serial.SerialException("Could not configure port: %s" % msg)
if vtime < 0 or vtime > 255:
raise ValueError('Invalid vtime: %r' % vtime)
cc[termios.VTIME] = vtime
cc[termios.VMIN] = vmin
termios.tcsetattr(
self.fd,
termios.TCSANOW,
[iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
def dump_port_settings(self):
sys.stderr.write("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(
p=self.serial))
sys.stderr.write('--- RTS: {:8} DTR: {:8} BREAK: {:8}\n'.format(
('active' if self.serial.rts else 'inactive'),
('active' if self.serial.dtr else 'inactive'),
('active' if self.serial.break_condition else 'inactive')))
try:
sys.stderr.write('--- CTS: {:8} DSR: {:8} RI: {:8} CD: {:8}\n'.format(
('active' if self.serial.cts else 'inactive'),
('active' if self.serial.dsr else 'inactive'),
('active' if self.serial.ri else 'inactive'),
('active' if self.serial.cd else 'inactive')))
except serial.SerialException:
# on RFC 2217 ports, it can happen if no modem state notification was
# yet received. ignore this error.
pass
sys.stderr.write('--- software flow control: {}\n'.format('active' if self.serial.xonxoff else 'inactive'))
sys.stderr.write('--- hardware flow control: {}\n'.format('active' if self.serial.rtscts else 'inactive'))
#~ sys.stderr.write('--- data escaping: %s linefeed: %s\n' % (
#~ REPR_MODES[self.repr_mode],
#~ LF_MODES[self.convert_outgoing]))
sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper()))
sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
def __init__(self, port, baud, delay):
self.port = port
self.baud = baud
self.serial = None
self.delay = delay
try:
self.serial = serial.Serial(port, baud)
except serial.SerialException as e:
raise TransportError(e.strerror)
self.serial.timeout = 3
self.serial.interCharTimeout = 3