def find_port(self):
    """Probe candidate serial ports and return the one an OpenBCI board answers on.

    Candidate names depend on ``sys.platform``: ``COM1``..``COM256`` on
    Windows, ``/dev/ttyUSB*`` on Linux/Cygwin and ``/dev/tty.usbserial*`` on
    macOS.  Every candidate is opened with ``self.baudrate`` and
    ``self.timeout``, sent ``b'v'`` and handed to ``self.openbci_id``.

    Returns:
        The name of the last candidate for which ``self.openbci_id`` returned
        a truthy value.

    Raises:
        EnvironmentError: if the platform is none of the above.
        OSError: if no candidate was identified.
    """
    if sys.platform.startswith('win'):
        ports = ['COM%s' % (i + 1) for i in range(256)]
    elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
        ports = glob.glob('/dev/ttyUSB*')
    elif sys.platform.startswith('darwin'):
        ports = glob.glob('/dev/tty.usbserial*')
    else:
        raise EnvironmentError('Error finding ports on your operating system')
    openbci_port = ''
    for port in ports:
        try:
            s = serial.Serial(port=port, baudrate=self.baudrate, timeout=self.timeout)
            try:
                s.write(b'v')
                openbci_serial = self.openbci_id(s)
            finally:
                # Always release the handle, even when the probe itself raises.
                s.close()
            if openbci_serial:
                openbci_port = port
        except (OSError, serial.SerialException):
            # Port missing, busy or not answering: move on to the next one.
            pass
    if openbci_port == '':
        raise OSError('Cannot find OpenBCI port')
    return openbci_port
# Python SerialException() class: example source code
def reader(self):
    """Copy incoming serial data to the console until the reader is stopped.

    The loop runs while both ``self.alive`` and ``self._reader_alive`` are
    true.  In raw mode the bytes are forwarded unchanged; otherwise they are
    decoded with ``self.rx_decoder`` and passed through the ``rx`` method of
    every entry in ``self.rx_transformations``, in order.

    Raises:
        serial.SerialException: re-raised after ``self.alive`` is cleared.
    """
    try:
        while self.alive and self._reader_alive:
            # Take everything that is pending, or block for a single byte.
            pending = self.serial.in_waiting
            chunk = self.serial.read(pending or 1)
            if not chunk:
                continue
            if self.raw:
                self.console.write_bytes(chunk)
                continue
            decoded = self.rx_decoder.decode(chunk)
            for step in self.rx_transformations:
                decoded = step.rx(decoded)
            self.console.write(decoded)
    except serial.SerialException:
        self.alive = False
        # XXX it would be nice if the writer could be interrupted at this
        # point, so the program could exit completely.
        raise
def serial_class_for_url(url):
    """Split an ``alt://`` URL into a port name and the Serial class to use.

    The only recognised query option is ``class``; its first value names an
    attribute of the ``serial`` module (default ``'Serial'``).

    Returns:
        A tuple ``(port, cls)``: the URL's netloc and path joined together,
        and ``getattr(serial, class_name)``.

    Raises:
        serial.SerialException: if the scheme is not ``alt`` or an option
            other than ``class`` is present.
    """
    usage = 'expected a string in the form "alt://port[?option[=value][&option[=value]]]": '
    split = urlparse.urlsplit(url)
    if split.scheme != 'alt':
        raise serial.SerialException(usage + 'not starting with alt:// (%r)' % (split.scheme,))
    chosen = 'Serial'
    try:
        options = urlparse.parse_qs(split.query, True)
        for key in options:
            if key != 'class':
                raise ValueError('unknown option: %r' % (key,))
            chosen = options[key][0]
    except ValueError as err:
        raise serial.SerialException(usage + '%s' % err)
    return (split.netloc + split.path, getattr(serial, chosen))
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url):
    """Configure the spy formatter from a ``spy://`` URL and return the port.

    Recognised query options: ``file`` (write to that path instead of
    ``sys.stderr``), ``color``, ``raw`` (use ``FormatRaw`` instead of
    ``FormatHexdump``) and ``all`` (sets ``self.show_all``).

    Returns:
        The URL's netloc and path joined together.

    Raises:
        serial.SerialException: if the scheme is not ``spy`` or an unknown
            option is present.
    """
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'spy':
        raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
    # process options now, directly altering self
    formatter = FormatHexdump
    color = False
    output = sys.stderr
    log_path = None
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'file':
                log_path = values[0]
            elif option == 'color':
                color = True
            elif option == 'raw':
                formatter = FormatRaw
            elif option == 'all':
                self.show_all = True
            else:
                raise ValueError('unknown option: %r' % (option,))
        # Open the log file only after every option has been accepted, so a
        # rejected URL neither leaks a file handle nor truncates the file.
        if log_path is not None:
            output = open(log_path, 'w')
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
    self.formatter = formatter(output, color)
    return ''.join([parts.netloc, parts.path])
def serial_class_for_url(url):
    """Split an ``alt://`` URL into a port name and the Serial class to use.

    The only recognised query option is ``class``; its first value names an
    attribute of the ``serial`` module (default ``'Serial'``).

    Returns:
        A tuple ``(port, cls)``: the URL's netloc and path joined together,
        and ``getattr(serial, class_name)``.

    Raises:
        serial.SerialException: if the scheme is not ``alt`` or an option
            other than ``class`` is present.
    """
    usage = 'expected a string in the form "alt://port[?option[=value][&option[=value]]]": '
    split = urlparse.urlsplit(url)
    if split.scheme != 'alt':
        raise serial.SerialException(usage + 'not starting with alt:// (%r)' % (split.scheme,))
    chosen = 'Serial'
    try:
        options = urlparse.parse_qs(split.query, True)
        for key in options:
            if key != 'class':
                raise ValueError('unknown option: %r' % (key,))
            chosen = options[key][0]
    except ValueError as err:
        raise serial.SerialException(usage + '%s' % err)
    return (split.netloc + split.path, getattr(serial, chosen))
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url):
    """Configure the spy formatter from a ``spy://`` URL and return the port.

    Recognised query options: ``file`` (write to that path instead of
    ``sys.stderr``), ``color``, ``raw`` (use ``FormatRaw`` instead of
    ``FormatHexdump``) and ``all`` (sets ``self.show_all``).

    Returns:
        The URL's netloc and path joined together.

    Raises:
        serial.SerialException: if the scheme is not ``spy`` or an unknown
            option is present.
    """
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'spy':
        raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
    # process options now, directly altering self
    formatter = FormatHexdump
    color = False
    output = sys.stderr
    log_path = None
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'file':
                log_path = values[0]
            elif option == 'color':
                color = True
            elif option == 'raw':
                formatter = FormatRaw
            elif option == 'all':
                self.show_all = True
            else:
                raise ValueError('unknown option: %r' % (option,))
        # Open the log file only after every option has been accepted, so a
        # rejected URL neither leaks a file handle nor truncates the file.
        if log_path is not None:
            output = open(log_path, 'w')
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
    self.formatter = formatter(output, color)
    return ''.join([parts.netloc, parts.path])
def setDeviceID(dev, id):
    """Send a SETID command with the given id to the serial device *dev*.

    Args:
        dev: Port name passed to ``serial.Serial``.
        id: New device id; must be within 0..255.

    Raises:
        ValueError: if *id* is outside 0..255.
        IOError: if pyserial raises ``SerialException`` while talking to the
            device.
    """
    if id < 0 or id > 255:
        raise ValueError("ID must be an unsigned byte!")
    try:
        com = serial.Serial(dev, timeout=5)
        try:
            packet = DriverSerial._generateHeader(CMDTYPE.SETID, 1)
            packet.append(id)
            com.write(packet)
            resp = com.read(1)
            if len(resp) == 0:
                # Nothing came back before the 5 s read timeout.
                DriverSerial._comError()
            elif ord(resp) != RETURN_CODES.SUCCESS:
                DriverSerial._printError(ord(resp))
        finally:
            # Release the port whether or not the exchange succeeded.
            com.close()
    except serial.SerialException:
        log.error("Problem connecting to serial device.")
        raise IOError("Problem connecting to serial device.")
def dump_port_settings(self):
    """Print a summary of the current port configuration to sys.stderr.

    Covers the basic line settings, the RTS/DTR/BREAK outputs, the modem
    status inputs (silently skipped when reading them raises
    ``serial.SerialException``), flow control, encodings, EOL mode and the
    active filters.
    """
    emit = sys.stderr.write
    port = self.serial

    def state(flag):
        return 'active' if flag else 'inactive'

    emit("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(p=port))
    emit('--- RTS: {:8} DTR: {:8} BREAK: {:8}\n'.format(
        state(port.rts), state(port.dtr), state(port.break_condition)))
    try:
        modem_lines = '--- CTS: {:8} DSR: {:8} RI: {:8} CD: {:8}\n'.format(
            state(port.cts), state(port.dsr), state(port.ri), state(port.cd))
        emit(modem_lines)
    except serial.SerialException:
        # On RFC 2217 ports this can happen when no modem state notification
        # has been received yet; the line is simply left out.
        pass
    emit('--- software flow control: {}\n'.format(state(port.xonxoff)))
    emit('--- hardware flow control: {}\n'.format(state(port.rtscts)))
    emit('--- serial input encoding: {}\n'.format(self.input_encoding))
    emit('--- serial output encoding: {}\n'.format(self.output_encoding))
    emit('--- EOL: {}\n'.format(self.eol.upper()))
    emit('--- filters: {}\n'.format(' '.join(self.filters)))
def reader(self):
    """Copy incoming serial data to the console until the reader is stopped.

    The loop runs while both ``self.alive`` and ``self._reader_alive`` are
    true.  In raw mode the bytes are forwarded unchanged; otherwise they are
    decoded with ``self.rx_decoder`` and passed through the ``rx`` method of
    every entry in ``self.rx_transformations``, in order.

    Raises:
        serial.SerialException: re-raised after ``self.alive`` is cleared
            and ``self.console.cancel()`` has been called.
    """
    try:
        while self.alive and self._reader_alive:
            # Take everything that is pending, or block for a single byte.
            pending = self.serial.in_waiting
            chunk = self.serial.read(pending or 1)
            if not chunk:
                continue
            if self.raw:
                self.console.write_bytes(chunk)
                continue
            decoded = self.rx_decoder.decode(chunk)
            for step in self.rx_transformations:
                decoded = step.rx(decoded)
            self.console.write(decoded)
    except serial.SerialException:
        self.alive = False
        self.console.cancel()
        raise       # XXX handle instead of re-raise?
def dump_port_settings(self):
    """Print a summary of the current port configuration to sys.stderr.

    Covers the basic line settings, the RTS/DTR/BREAK outputs, the modem
    status inputs (silently skipped when reading them raises
    ``serial.SerialException``), flow control, encodings, EOL mode and the
    active filters.
    """
    emit = sys.stderr.write
    port = self.serial

    def state(flag):
        return 'active' if flag else 'inactive'

    emit("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(p=port))
    emit('--- RTS: {:8} DTR: {:8} BREAK: {:8}\n'.format(
        state(port.rts), state(port.dtr), state(port.break_condition)))
    try:
        modem_lines = '--- CTS: {:8} DSR: {:8} RI: {:8} CD: {:8}\n'.format(
            state(port.cts), state(port.dsr), state(port.ri), state(port.cd))
        emit(modem_lines)
    except serial.SerialException:
        # On RFC 2217 ports this can happen when no modem state notification
        # has been received yet; the line is simply left out.
        pass
    emit('--- software flow control: {}\n'.format(state(port.xonxoff)))
    emit('--- hardware flow control: {}\n'.format(state(port.rtscts)))
    emit('--- serial input encoding: {}\n'.format(self.input_encoding))
    emit('--- serial output encoding: {}\n'.format(self.output_encoding))
    emit('--- EOL: {}\n'.format(self.eol.upper()))
    emit('--- filters: {}\n'.format(' '.join(self.filters)))
def reader(self):
    """Copy incoming serial data to the console until the reader is stopped.

    The loop runs while both ``self.alive`` and ``self._reader_alive`` are
    true.  In raw mode the bytes are forwarded unchanged; otherwise they are
    decoded with ``self.rx_decoder`` and passed through the ``rx`` method of
    every entry in ``self.rx_transformations``, in order.

    Raises:
        serial.SerialException: re-raised after ``self.alive`` is cleared
            and ``self.console.cancel()`` has been called.
    """
    try:
        while self.alive and self._reader_alive:
            # Take everything that is pending, or block for a single byte.
            pending = self.serial.in_waiting
            chunk = self.serial.read(pending or 1)
            if not chunk:
                continue
            if self.raw:
                self.console.write_bytes(chunk)
                continue
            decoded = self.rx_decoder.decode(chunk)
            for step in self.rx_transformations:
                decoded = step.rx(decoded)
            self.console.write(decoded)
    except serial.SerialException:
        self.alive = False
        self.console.cancel()
        raise       # XXX handle instead of re-raise?
def serial_ports():
    """Return the names of the serial ports that can currently be opened.

    Candidates are ``COM1``..``COM256`` on Windows, ``/dev/tty[A-Za-z]*`` on
    Linux/Cygwin and ``/dev/tty.*`` on macOS.  A candidate is kept when
    ``serial.Serial(name)`` opens it without ``OSError`` or
    ``serial.SerialException``.

    Raises:
        EnvironmentError: on any other platform.
    """
    platform = sys.platform
    if platform.startswith('win'):
        # Windows: enumerate every possible COM port name.
        candidates = ['COM%s' % number for number in range(1, 257)]
    elif platform.startswith(('linux', 'cygwin')):
        # The pattern needs a letter after "tty", which excludes the current
        # terminal "/dev/tty".
        candidates = glob.glob('/dev/tty[A-Za-z]*')
    elif platform.startswith('darwin'):
        candidates = glob.glob('/dev/tty.*')
    else:
        raise EnvironmentError('Unsupported platform')

    available = []
    for name in candidates:
        try:
            # Opening and immediately closing is the existence check.
            serial.Serial(name).close()
        except (OSError, serial.SerialException):
            continue
        available.append(name)
    return available
def serial_class_for_url(url):
    """Split an ``alt://`` URL into a port name and the Serial class to use.

    The only recognised query option is ``class``; its first value names an
    attribute of the ``serial`` module (default ``'Serial'``).

    Returns:
        A tuple ``(port, cls)``: the URL's netloc and path joined together,
        and ``getattr(serial, class_name)``.

    Raises:
        serial.SerialException: if the scheme is not ``alt`` or an option
            other than ``class`` is present.
    """
    usage = 'expected a string in the form "alt://port[?option[=value][&option[=value]]]": '
    split = urlparse.urlsplit(url)
    if split.scheme != 'alt':
        raise serial.SerialException(usage + 'not starting with alt:// (%r)' % (split.scheme,))
    chosen = 'Serial'
    try:
        options = urlparse.parse_qs(split.query, True)
        for key in options:
            if key != 'class':
                raise ValueError('unknown option: %r' % (key,))
            chosen = options[key][0]
    except ValueError as err:
        raise serial.SerialException(usage + '%s' % err)
    return (split.netloc + split.path, getattr(serial, chosen))
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url):
    """Configure the spy formatter from a ``spy://`` URL and return the port.

    Recognised query options: ``file`` (write to that path instead of
    ``sys.stderr``), ``color``, ``raw`` (use ``FormatRaw`` instead of
    ``FormatHexdump``) and ``all`` (sets ``self.show_all``).

    Returns:
        The URL's netloc and path joined together.

    Raises:
        serial.SerialException: if the scheme is not ``spy`` or an unknown
            option is present.
    """
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'spy':
        raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
    # process options now, directly altering self
    formatter = FormatHexdump
    color = False
    output = sys.stderr
    log_path = None
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'file':
                log_path = values[0]
            elif option == 'color':
                color = True
            elif option == 'raw':
                formatter = FormatRaw
            elif option == 'all':
                self.show_all = True
            else:
                raise ValueError('unknown option: %r' % (option,))
        # Open the log file only after every option has been accepted, so a
        # rejected URL neither leaks a file handle nor truncates the file.
        if log_path is not None:
            output = open(log_path, 'w')
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
    self.formatter = formatter(output, color)
    return ''.join([parts.netloc, parts.path])
def reader(self):
    """Copy incoming serial data to the console until the reader is stopped.

    The loop runs while both ``self.alive`` and ``self._reader_alive`` are
    true.  In raw mode the bytes are forwarded unchanged; otherwise they are
    decoded with ``self.rx_decoder`` and passed through the ``rx`` method of
    every entry in ``self.rx_transformations``, in order.

    Raises:
        serial.SerialException: re-raised after ``self.alive`` is cleared.
    """
    try:
        while self.alive and self._reader_alive:
            # Take everything that is pending, or block for a single byte.
            pending = self.serial.in_waiting
            chunk = self.serial.read(pending or 1)
            if not chunk:
                continue
            if self.raw:
                self.console.write_bytes(chunk)
                continue
            decoded = self.rx_decoder.decode(chunk)
            for step in self.rx_transformations:
                decoded = step.rx(decoded)
            self.console.write(decoded)
    except serial.SerialException:
        self.alive = False
        # XXX it would be nice if the writer could be interrupted at this
        # point, so the program could exit completely.
        raise
def __init__(self):
    """Scan for serial ports that can be opened and store them in ``self.result``.

    Candidates are ``COM1``..``COM256`` on Windows, ``/dev/tty[A-Za-z]*`` on
    Linux/Cygwin and ``/dev/tty.*`` on macOS.  A candidate is kept when
    ``serial.Serial(name)`` opens it without ``OSError`` or
    ``serial.SerialException``.

    Raises:
        EnvironmentError: on any other platform.
    """
    platform = sys.platform
    if platform.startswith('win'):
        candidates = ['COM%s' % number for number in range(1, 257)]
    elif platform.startswith(('linux', 'cygwin')):
        # The pattern needs a letter after "tty", which excludes the current
        # terminal "/dev/tty".
        candidates = glob.glob('/dev/tty[A-Za-z]*')
    elif platform.startswith('darwin'):
        candidates = glob.glob('/dev/tty.*')
    else:
        raise EnvironmentError('Unsupported platform')

    self.result = found = []
    for name in candidates:
        try:
            # Opening and immediately closing is the existence check.
            serial.Serial(name).close()
        except (OSError, serial.SerialException):
            continue
        found.append(name)
def connect(self, port_name='/dev/ttyACM0', baud_rate=9600):
    """Open the serial connection and wait for the first status message.

    Args:
        port_name: Device to open.
        baud_rate: Baud rate passed to ``serial.Serial``.

    ``self.serial_connection`` stays ``None`` when the port cannot be opened;
    the failure is only printed.  The method then polls once per second
    until ``self.current_status`` is no longer ``None``.
    """
    self.serial_connection = None
    try:
        self.serial_connection = serial.Serial(
            port=port_name,
            baudrate=baud_rate
        )
    except serial.SerialException as e:
        # Single-argument print() gives the same output on Python 2 and 3.
        print("Could not connect %s" % (e,))
    # NOTE(review): after a failed open the loop below presumably never ends,
    # because nothing visible here sets current_status - confirm with the
    # caller whether an early return or re-raise is wanted.
    time.sleep(2)  # no idea why but robot does not move if speed commands are sent
    # directly after opening serial
    while self.current_status is None:
        print("Waiting for status message")
        time.sleep(1)
    print("received first status message with position %s" % (self.current_status.position,))
def findPorts():
    """Return the ``/dev/ttyA*`` and ``/dev/ttyUSB*`` devices that can be opened.

    A device is kept when ``serial.Serial(path)`` opens it without raising
    ``OSError`` or ``serial.SerialException``.

    Returns:
        A list of device paths: ``ttyA*`` matches first, then ``ttyUSB*``.
    """
    result = []
    # The USB pattern must not require a letter after "ttyUSB": such devices
    # are normally numbered (/dev/ttyUSB0), which '[A-Za-z]*' cannot match.
    for pattern in ('/dev/ttyA[A-Za-z]*', '/dev/ttyUSB*'):
        for a_port in glob.glob(pattern):
            try:
                s = serial.Serial(a_port)
                s.close()
            except (OSError, serial.SerialException):
                continue
            result.append(a_port)
    return result
def run(self):
    """Read lines from ``self.Listen2`` until stopped or the port fails.

    Each stripped line that ``is_hex_string`` accepts is handed to a new
    ``Send2ParentThread``; any other line is only logged at debug level.
    The loop ends when ``self.Stop`` is set or ``readline`` raises
    ``serial.SerialException``.
    """
    logger.debug("Serial listening thread started")
    while True:
        try:
            line = self.Listen2.readline().rstrip()  # use [:-1]?
        except serial.SerialException as err:
            detail = str(err)
            logger.debug("Serial exception occured: " + detail)
            # Only warn when the failure was not caused by a requested stop.
            if not self.Stop:
                logger.warning("serial exception ocurred: " + detail)
            break
        if self.Stop:
            break
        if not is_hex_string(line):
            logger.debug("Serial port said: " + str(line))
            continue
        Send2ParentThread(self.Network, line).start()
    logger.info("Serial listening thread shutting down")
def connect_serial_gps():
    """
    Return an L80GPS object once the GPS serial device is free.

    If the device reported by get_device_gps() is not available, gpsd is
    stopped with ``sudo killall gpsd`` and availability is checked again one
    second later. If the device is still unavailable after that, another
    process such as kismet_server may be holding it open.

    :return: L80GPS object
    :raises serial.SerialException: if the device is still unavailable after
        gpsd has been killed
    """
    gps_device = get_device_gps()
    if not device_available(gps_device):
        os.system('sudo killall gpsd')
        time.sleep(1)
        still_busy = not device_available(gps_device)
        if still_busy:
            raise serial.SerialException('Device busy ' + gps_device + '. Check your processes.')
    return l80gps.L80GPS()
def _setup_connection(self, device: str, speed: int):
    """Open the serial device and start the handler thread.

    Opens *device* at *speed* with a 2 s timeout, calls ``self.disconnect()``
    and flushes the port, toggles the ``rtscts`` setting twice (applying the
    settings after each toggle, so it ends at its initial value) and finally
    starts ``self._handler_thread_fn`` in a daemon thread.

    Raises:
        device_error: if opening fails with errno 2.
        serial.SerialException: for any other failure to open the port.
    """
    try:
        port = serial.Serial(device, speed, timeout=2)
    except serial.SerialException as ex:
        if ex.errno != 2:
            raise
        raise device_error("there's no device '%s'" % device)
    self._conn = port
    self.disconnect()
    self._conn.flush()
    # todo: refactor
    settings = self._conn.getSettingsDict()
    for _ in (0, 1):
        settings['rtscts'] = not settings['rtscts']
        self._conn.applySettingsDict(settings)
    # connection._assert_token(connection._read_byte(self._conn), _token.SYNC)
    worker = threading.Thread(target=self._handler_thread_fn, daemon=True)
    self._handler_thread = worker
    worker.start()
    LOG.info('connection initialized')
def find_port(self):
    """Probe the system's serial ports and return the one an OpenBCI board answers on.

    The ports reported by ``serial.tools.list_ports.comports()`` are tried in
    reverse order.  Every port is opened with ``self.baudrate`` and
    ``self.timeout``, sent ``b'v'`` and handed to ``self.openbci_id``.

    Returns:
        The name of the last probed port for which ``self.openbci_id``
        returned a truthy value.

    Raises:
        OSError: if the ports cannot be listed or none was identified.
    """
    try:
        temp_port_list = serial.tools.list_ports.comports()
    except OSError:
        raise OSError('Serial port not found! Try entering your port manually.')
    ports = [i[0] for i in temp_port_list][::-1]
    openbci_port = ''
    for port in ports:
        try:
            s = serial.Serial(port=port, baudrate=self.baudrate, timeout=self.timeout)
            try:
                s.write(b'v')
                openbci_serial = self.openbci_id(s)
            finally:
                # Always release the handle, even when the probe itself raises.
                s.close()
            if openbci_serial:
                openbci_port = port
        except (OSError, serial.SerialException):
            # Port busy or not answering: move on to the next one.
            pass
    if openbci_port == '':
        raise OSError('Cannot find OpenBCI port')
    return openbci_port
def _connectSerial(self):
    """Connect via the serial interface, detecting the port when none is set.

    When ``self.port`` is ``None`` the known USB ids are looked up with
    ``list_ports.grep`` and the first match is used; if nothing matches, the
    ``comport`` entry of the ``serial`` config section is used instead.  The
    port is then opened into ``self.serial``.

    Raises:
        GB500SerialException: if ``serial.Serial`` raises
            ``serial.SerialException``.
    """
    # Other known id, currently disabled: "Timex Cycle Trainer" : "0484:5741"
    DeviceID = {"Globalsat 580p" : "0483:5740"}
    # search for the port first based on device ids
    if self.port is None:
        # .values() works on both Python 2 and 3 (iteritems() is 2-only).
        for value in DeviceID.values():
            ports = list(list_ports.grep(value))
            if ports:
                self.port = ports[0][0]
                self.logger.debug("USB virtual serial port found on " + self.port)
                break
    # didnt find anything fall back to config.ini
    if self.port is None:
        self.port = self.config.get("serial", "comport")
        self.logger.debug("Virtual serial port not found. Reverting to config.ini: " + self.port)
    try:
        self.serial = serial.Serial(port=self.port, baudrate=self.config.get("serial", "baudrate"), timeout=self.config.getint("serial", "timeout"), xonxoff=0, rtscts=1)
        self.logger.debug("serial connection on " + self.serial.portstr)
    except serial.SerialException:
        self.logger.critical("error establishing serial connection")
        raise GB500SerialException
def dump_port_settings(self):
    """Print a summary of the current port configuration to sys.stderr.

    Covers the basic line settings, the RTS/DTR/BREAK outputs, the modem
    status inputs (silently skipped when reading them raises
    ``serial.SerialException``), flow control, encodings, EOL mode and the
    active filters.
    """
    emit = sys.stderr.write
    port = self.serial

    def state(flag):
        return 'active' if flag else 'inactive'

    emit("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(p=port))
    emit('--- RTS: {:8} DTR: {:8} BREAK: {:8}\n'.format(
        state(port.rts), state(port.dtr), state(port.break_condition)))
    try:
        modem_lines = '--- CTS: {:8} DSR: {:8} RI: {:8} CD: {:8}\n'.format(
            state(port.cts), state(port.dsr), state(port.ri), state(port.cd))
        emit(modem_lines)
    except serial.SerialException:
        # On RFC 2217 ports this can happen when no modem state notification
        # has been received yet; the line is simply left out.
        pass
    emit('--- software flow control: {}\n'.format(state(port.xonxoff)))
    emit('--- hardware flow control: {}\n'.format(state(port.rtscts)))
    emit('--- serial input encoding: {}\n'.format(self.input_encoding))
    emit('--- serial output encoding: {}\n'.format(self.output_encoding))
    emit('--- EOL: {}\n'.format(self.eol.upper()))
    emit('--- filters: {}\n'.format(' '.join(self.filters)))
def reader(self):
    """Copy incoming serial data to the console until the reader is stopped.

    The loop runs while both ``self.alive`` and ``self._reader_alive`` are
    true.  In raw mode the bytes are forwarded unchanged; otherwise they are
    decoded with ``self.rx_decoder`` and passed through the ``rx`` method of
    every entry in ``self.rx_transformations``, in order.

    Raises:
        serial.SerialException: re-raised after ``self.alive`` is cleared
            and ``self.console.cancel()`` has been called.
    """
    try:
        while self.alive and self._reader_alive:
            # Take everything that is pending, or block for a single byte.
            pending = self.serial.in_waiting
            chunk = self.serial.read(pending or 1)
            if not chunk:
                continue
            if self.raw:
                self.console.write_bytes(chunk)
                continue
            decoded = self.rx_decoder.decode(chunk)
            for step in self.rx_transformations:
                decoded = step.rx(decoded)
            self.console.write(decoded)
    except serial.SerialException:
        self.alive = False
        self.console.cancel()
        raise       # XXX handle instead of re-raise?
def connect(self, port='COM22', speed=115200):
    """Open the serial port, reset the controller and enter programming mode.

    Any connection already held in ``self.serial`` is closed first.  After
    opening, DTR is pulsed (comment in the original: "Reset the controller"),
    ``self.seq`` is set to 1 and two messages are sent with
    ``self.sendMessage``.

    Args:
        port: Port name; converted with ``str()`` before use.
        speed: Baud rate.

    Raises:
        ispBase.IspError: if the port cannot be opened, or if the reply to
            the second message is not ``[0x10, 0x00]``.
    """
    if self.serial is not None:
        self.close()
    try:
        self.serial = Serial(str(port), speed, timeout=1, writeTimeout=10000)
    except SerialException:
        raise ispBase.IspError("Failed to open serial port")
    except Exception:
        # Not a bare "except:", so KeyboardInterrupt/SystemExit still
        # propagate; str(port) keeps the message safe for non-string ports.
        raise ispBase.IspError("Unexpected error while connecting to serial port:" + str(port) + ":" + str(sys.exc_info()[0]))
    self.seq = 1
    #Reset the controller
    self.serial.setDTR(1)
    time.sleep(0.1)
    self.serial.setDTR(0)
    time.sleep(0.2)
    self.sendMessage([1])
    if self.sendMessage([0x10, 0xc8, 0x64, 0x19, 0x20, 0x00, 0x53, 0x03, 0xac, 0x53, 0x00, 0x00]) != [0x10, 0x00]:
        self.close()
        raise ispBase.IspError("Failed to enter programming mode")
def write(self, data):
    """Write some data to the transport.

    When the write buffer is empty, one direct write to the serial port is
    attempted first; whatever the port did not accept is appended to
    ``self._write_buffer`` after ``self._ensure_writer()`` has been called.
    Calls made once the transport is closing are ignored.
    """
    if self._closing:
        return

    buffer_empty = self.get_write_buffer_size() == 0
    if buffer_empty:
        # Fast path: try to hand the bytes to the port right away.
        try:
            sent = self._serial.write(data)
        except serial.SerialException as exc:
            self._fatal_error(exc, 'Fatal write error on serial transport')
            return
        if sent == len(data):
            return  # nothing left to buffer
        assert sent > 0
        data = data[sent:]
        self._ensure_writer()

    self._write_buffer.append(data)
    self._maybe_pause_protocol()
def dump_port_settings(self):
    """Print a summary of the current port configuration to sys.stderr.

    Covers the basic line settings, the RTS/DTR/BREAK outputs, the modem
    status inputs (silently skipped when reading them raises
    ``serial.SerialException``), flow control, encodings, EOL mode and the
    active filters.
    """
    emit = sys.stderr.write
    port = self.serial

    def state(flag):
        return 'active' if flag else 'inactive'

    emit("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(p=port))
    emit('--- RTS: {:8} DTR: {:8} BREAK: {:8}\n'.format(
        state(port.rts), state(port.dtr), state(port.break_condition)))
    try:
        modem_lines = '--- CTS: {:8} DSR: {:8} RI: {:8} CD: {:8}\n'.format(
            state(port.cts), state(port.dsr), state(port.ri), state(port.cd))
        emit(modem_lines)
    except serial.SerialException:
        # On RFC 2217 ports this can happen when no modem state notification
        # has been received yet; the line is simply left out.
        pass
    emit('--- software flow control: {}\n'.format(state(port.xonxoff)))
    emit('--- hardware flow control: {}\n'.format(state(port.rtscts)))
    emit('--- serial input encoding: {}\n'.format(self.input_encoding))
    emit('--- serial output encoding: {}\n'.format(self.output_encoding))
    emit('--- EOL: {}\n'.format(self.eol.upper()))
    emit('--- filters: {}\n'.format(' '.join(self.filters)))
def reader(self):
    """Copy incoming serial data to the console until the reader is stopped.

    The loop runs while both ``self.alive`` and ``self._reader_alive`` are
    true.  In raw mode the bytes are forwarded unchanged; otherwise they are
    decoded with ``self.rx_decoder`` and passed through the ``rx`` method of
    every entry in ``self.rx_transformations``, in order.

    Raises:
        serial.SerialException: re-raised after ``self.alive`` is cleared
            and ``self.console.cancel()`` has been called.
    """
    try:
        while self.alive and self._reader_alive:
            # Take everything that is pending, or block for a single byte.
            pending = self.serial.in_waiting
            chunk = self.serial.read(pending or 1)
            if not chunk:
                continue
            if self.raw:
                self.console.write_bytes(chunk)
                continue
            decoded = self.rx_decoder.decode(chunk)
            for step in self.rx_transformations:
                decoded = step.rx(decoded)
            self.console.write(decoded)
    except serial.SerialException:
        self.alive = False
        self.console.cancel()
        raise       # XXX handle instead of re-raise?
def run(self):
    """Read single bytes from ``self.serial_file`` until ``self.exit_event`` is set.

    Each byte is converted to an ``Order`` while ``serial_lock`` is held;
    bytes that are not valid orders are skipped.  ``Order.RECEIVED``
    releases ``n_received_semaphore``, and every valid byte is passed to
    ``decodeOrder``.  The loop sleeps for ``rate`` seconds between reads.
    """
    while not self.exit_event.is_set():
        try:
            incoming = bytearray(self.serial_file.read(1))
        except serial.SerialException:
            incoming = None
        if not incoming:
            # Read failed or nothing arrived: back off and poll again.
            time.sleep(rate)
            continue
        first = incoming[0]
        with serial_lock:
            try:
                received = Order(first)
            except ValueError:
                # Unknown order byte: skip it without sleeping.
                continue
            if received == Order.RECEIVED:
                n_received_semaphore.release()
            decodeOrder(self.serial_file, first)
        time.sleep(rate)
    print("Listener Thread Exited")