python类SerialException()的实例源码

open_bci_v3.py 文件源码 项目:esys-pbi 作者: fsxfreak 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def find_port(self):
    # Finds the serial port names
    if sys.platform.startswith('win'):
      ports = ['COM%s' % (i+1) for i in range(256)]
    elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
      ports = glob.glob('/dev/ttyUSB*')
    elif sys.platform.startswith('darwin'):
      ports = glob.glob('/dev/tty.usbserial*')
    else:
      raise EnvironmentError('Error finding ports on your operating system')
    openbci_port = ''
    for port in ports:
      try:
        s = serial.Serial(port= port, baudrate = self.baudrate, timeout=self.timeout)
        s.write(b'v')
        openbci_serial = self.openbci_id(s)
        s.close()
        if openbci_serial:
          openbci_port = port;
      except (OSError, serial.SerialException):
        pass
    if openbci_port == '':
      raise OSError('Cannot find OpenBCI port')
    else:
      return openbci_port
miniterm.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def reader(self):
        """loop and copy serial->console"""
        try:
            while self.alive and self._reader_alive:
                # read all that is there or wait for one byte
                data = self.serial.read(self.serial.in_waiting or 1)
                if data:
                    if self.raw:
                        self.console.write_bytes(data)
                    else:
                        text = self.rx_decoder.decode(data)
                        for transformation in self.rx_transformations:
                            text = transformation.rx(text)
                        self.console.write(text)
        except serial.SerialException:
            self.alive = False
            # XXX would be nice if the writer could be interrupted at this
            #     point... to exit completely
            raise
protocol_alt.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'class':
                class_name = values[0]
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (''.join([parts.netloc, parts.path]), getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
protocol_spy.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'file':
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])
protocol_alt.py 文件源码 项目:microperi 作者: c0d3st0rm 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'class':
                class_name = values[0]
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (''.join([parts.netloc, parts.path]), getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
protocol_spy.py 文件源码 项目:microperi 作者: c0d3st0rm 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'file':
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])
serial_driver.py 文件源码 项目:BiblioPixel2 作者: ManiacalLabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setDeviceID(dev, id):
        if id < 0 or id > 255:
            raise ValueError("ID must be an unsigned byte!")

        try:
            com = serial.Serial(dev, timeout=5)

            packet = DriverSerial._generateHeader(CMDTYPE.SETID, 1)
            packet.append(id)
            com.write(packet)

            resp = com.read(1)
            if len(resp) == 0:
                DriverSerial._comError()
            else:
                if ord(resp) != RETURN_CODES.SUCCESS:
                    DriverSerial._printError(ord(resp))

        except serial.SerialException:
            log.error("Problem connecting to serial device.")
            raise IOError("Problem connecting to serial device.")
miniterm.py 文件源码 项目:gcodeplot 作者: arpruss 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def dump_port_settings(self):
        """Write current settings to sys.stderr"""
        sys.stderr.write("\n--- Settings: {p.name}  {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(
            p=self.serial))
        sys.stderr.write('--- RTS: {:8}  DTR: {:8}  BREAK: {:8}\n'.format(
            ('active' if self.serial.rts else 'inactive'),
            ('active' if self.serial.dtr else 'inactive'),
            ('active' if self.serial.break_condition else 'inactive')))
        try:
            sys.stderr.write('--- CTS: {:8}  DSR: {:8}  RI: {:8}  CD: {:8}\n'.format(
                ('active' if self.serial.cts else 'inactive'),
                ('active' if self.serial.dsr else 'inactive'),
                ('active' if self.serial.ri else 'inactive'),
                ('active' if self.serial.cd else 'inactive')))
        except serial.SerialException:
            # on RFC 2217 ports, it can happen if no modem state notification was
            # yet received. ignore this error.
            pass
        sys.stderr.write('--- software flow control: {}\n'.format('active' if self.serial.xonxoff else 'inactive'))
        sys.stderr.write('--- hardware flow control: {}\n'.format('active' if self.serial.rtscts else 'inactive'))
        sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
        sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
        sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper()))
        sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
miniterm.py 文件源码 项目:gcodeplot 作者: arpruss 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def reader(self):
        """loop and copy serial->console"""
        try:
            while self.alive and self._reader_alive:
                # read all that is there or wait for one byte
                data = self.serial.read(self.serial.in_waiting or 1)
                if data:
                    if self.raw:
                        self.console.write_bytes(data)
                    else:
                        text = self.rx_decoder.decode(data)
                        for transformation in self.rx_transformations:
                            text = transformation.rx(text)
                        self.console.write(text)
        except serial.SerialException:
            self.alive = False
            self.console.cancel()
            raise       # XXX handle instead of re-raise?
miniterm.py 文件源码 项目:bitio 作者: whaleygeek 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def dump_port_settings(self):
        """Write current settings to sys.stderr"""
        sys.stderr.write("\n--- Settings: {p.name}  {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(
            p=self.serial))
        sys.stderr.write('--- RTS: {:8}  DTR: {:8}  BREAK: {:8}\n'.format(
            ('active' if self.serial.rts else 'inactive'),
            ('active' if self.serial.dtr else 'inactive'),
            ('active' if self.serial.break_condition else 'inactive')))
        try:
            sys.stderr.write('--- CTS: {:8}  DSR: {:8}  RI: {:8}  CD: {:8}\n'.format(
                ('active' if self.serial.cts else 'inactive'),
                ('active' if self.serial.dsr else 'inactive'),
                ('active' if self.serial.ri else 'inactive'),
                ('active' if self.serial.cd else 'inactive')))
        except serial.SerialException:
            # on RFC 2217 ports, it can happen if no modem state notification was
            # yet received. ignore this error.
            pass
        sys.stderr.write('--- software flow control: {}\n'.format('active' if self.serial.xonxoff else 'inactive'))
        sys.stderr.write('--- hardware flow control: {}\n'.format('active' if self.serial.rtscts else 'inactive'))
        sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
        sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
        sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper()))
        sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
miniterm.py 文件源码 项目:bitio 作者: whaleygeek 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def reader(self):
        """loop and copy serial->console"""
        try:
            while self.alive and self._reader_alive:
                # read all that is there or wait for one byte
                data = self.serial.read(self.serial.in_waiting or 1)
                if data:
                    if self.raw:
                        self.console.write_bytes(data)
                    else:
                        text = self.rx_decoder.decode(data)
                        for transformation in self.rx_transformations:
                            text = transformation.rx(text)
                        self.console.write(text)
        except serial.SerialException:
            self.alive = False
            self.console.cancel()
            raise       # XXX handle instead of re-raise?
serial_interface.py 文件源码 项目:sequelspeare 作者: raidancampbell 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def serial_ports():
        if sys.platform.startswith('win'):  # windows COM port enumerations
            ports = ['COM%s' % (i + 1) for i in range(256)]
        elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):  # linux and cygwin support
            # this excludes your current terminal "/dev/tty"
            ports = glob.glob('/dev/tty[A-Za-z]*')
        elif sys.platform.startswith('darwin'):  # OSX support
            ports = glob.glob('/dev/tty.*')
        else:
            raise EnvironmentError('Unsupported platform')

        result = []
        for port in ports:  # for every possible port name
            try:
                s = serial.Serial(port)  # open it and close it
                s.close()
                result.append(port)  # if nothing went wrong, the port exists on the system
            except (OSError, serial.SerialException):  # if something went wrong, the port does not exist
                pass
        return result  # return the list of ports that currently actually exist on the system
protocol_alt.py 文件源码 项目:microbit-serial 作者: martinohanlon 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'class':
                class_name = values[0]
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (''.join([parts.netloc, parts.path]), getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
protocol_spy.py 文件源码 项目:microbit-serial 作者: martinohanlon 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'file':
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])
miniterm.py 文件源码 项目:microbit-serial 作者: martinohanlon 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def reader(self):
        """loop and copy serial->console"""
        try:
            while self.alive and self._reader_alive:
                # read all that is there or wait for one byte
                data = self.serial.read(self.serial.in_waiting or 1)
                if data:
                    if self.raw:
                        self.console.write_bytes(data)
                    else:
                        text = self.rx_decoder.decode(data)
                        for transformation in self.rx_transformations:
                            text = transformation.rx(text)
                        self.console.write(text)
        except serial.SerialException:
            self.alive = False
            # XXX would be nice if the writer could be interrupted at this
            #     point... to exit completely
            raise
nfc_.py 文件源码 项目:nfc_py 作者: X286 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self):
        if sys.platform.startswith('win'):
             ports = ['COM%s' % (i + 1) for i in range(256)]
        elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
            # this excludes your current terminal "/dev/tty"
            ports = glob.glob('/dev/tty[A-Za-z]*')
        elif sys.platform.startswith('darwin'):
            ports = glob.glob('/dev/tty.*')
        else:
            raise EnvironmentError('Unsupported platform')
        self.result = []
        for port in ports:
            try:
                s = serial.Serial(port)
                s.close()
                self.result.append(port)
            except (OSError, serial.SerialException):
                pass
SerialInterface.py 文件源码 项目:robotics1project 作者: pchorak 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def connect(self, port_name='/dev/ttyACM0', baud_rate=9600):
        self.serial_connection = None
        try:
            self.serial_connection = serial.Serial(
                port=port_name,
                baudrate=baud_rate
            )
        except serial.SerialException as e:
            print "Could not connect", e

        time.sleep(2)  # no idea why but robot does not move if speed commands are sent
        # directly after opening serial

        while self.current_status is None:
            print "Waiting for status message"
            time.sleep(1)
        print "received first status message with position", self.current_status.position
find_ports.py 文件源码 项目:threespace_ros 作者: AssistiveRoboticsUNH 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def findPorts():
    result=[]
    temp_list = glob.glob('/dev/ttyA[A-Za-z]*')
    for a_port in temp_list:
        try:
            s = serial.Serial(a_port)
            s.close()
            result.append(a_port)
        except serial.SerialException:
            pass
    temp_list = glob.glob('/dev/ttyUSB[A-Za-z]*')
    for a_port in temp_list:
        try:
            s = serial.Serial(a_port)
            s.close()
            result.append(a_port)
        except serial.SerialException:
            pass
    return result
moteino.py 文件源码 项目:moteinopy 作者: Steinarr134 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run(self):
        logger.debug("Serial listening thread started")
        incoming = ''
        while True:
            try:
                incoming = self.Listen2.readline().rstrip()  # use [:-1]?
            except serial.SerialException as e:
                logger.debug("Serial exception occured: " + str(e))
                if not self.Stop:
                    logger.warning("serial exception ocurred: " + str(e))
                    break
            if self.Stop:
                break
            else:
                if is_hex_string(incoming):
                    Send2ParentThread(self.Network, incoming).start()
                else:
                    logger.debug("Serial port said: " + str(incoming))
        logger.info("Serial listening thread shutting down")
gps_hardware_tx_rx.py 文件源码 项目:RaspberryPi-projects 作者: gary-dalton 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def connect_serial_gps():
    """
    Check if the device is available. If its not, disconnect any currently running gpsd using killall.
    Check availability again. If still not available, raise an error. When device is available, connect
    to the L80GPS object.
    If device does not become available after killall on gpsd, another process such as kismet_server may be
    forcing it to remain connected.
    :return: L80GPS object
    """
    device = get_device_gps()
    if not device_available(device):
        cmd = 'sudo killall gpsd'
        os.system(cmd)
        #rpi_utils.run_program(cmd)
        time.sleep(1)
        if not device_available(device):
            raise serial.SerialException('Device busy ' + device + '. Check your processes.')
    gps = l80gps.L80GPS()
    return gps
__init__.py 文件源码 项目:pymindwave 作者: frans-fuerst 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _setup_connection(self, device: str, speed: int):
        ''' todo: doc '''
        try:
            self._conn = serial.Serial(device, speed, timeout=2)
        except serial.SerialException as ex:
            if ex.errno == 2:
                raise device_error("there's no device '%s'" % device)
            raise

        self.disconnect()
        self._conn.flush()

        # todo: refactor
        _conn_settings = self._conn.getSettingsDict()
        for _ in range(2):
            _conn_settings['rtscts'] = not _conn_settings['rtscts']
            self._conn.applySettingsDict(_conn_settings)

#        connection._assert_token(connection._read_byte(self._conn), _token.SYNC)
        self._handler_thread = threading.Thread(target=self._handler_thread_fn, daemon=True)
        self._handler_thread.start()
        LOG.info('connection initialized')
open_bci_v3.py 文件源码 项目:ECG_Respiratory_Monitor 作者: gabrielibagon 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def find_port(self):
    try:
      temp_port_list = serial.tools.list_ports.comports()
    except OSError:
      raise OSError('Serial port not found! Try entering your port manually.')
    ports = [i[0] for i in temp_port_list][::-1]

    openbci_port = ''
    for port in ports:
      try:
        s = serial.Serial(port= port, baudrate = self.baudrate, timeout=self.timeout)
        s.write(b'v')
        openbci_serial = self.openbci_id(s)
        s.close()
        if openbci_serial:
          openbci_port = port;
      except (OSError, serial.SerialException):
        pass
    if openbci_port == '':
      raise OSError('Cannot find OpenBCI port')
    else:
      return openbci_port
gb580.py 文件源码 项目:CycloTrain 作者: spcmnspff99 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _connectSerial(self):
        """connect via serial interface"""
        #DeviceID = {"Globalsat 580p"          :   "0483:5740",
        #            "Timex Cycle Trainer"     :   "0484:5741"
        #            }
        DeviceID = {"Globalsat 580p"          :   "0483:5740"}
        # search for the port first based on device ids
        if self.port is None:
            for key, value in DeviceID.iteritems():
                ports = list(list_ports.grep(value))
                if len(ports) > 0:
                    self.port = ports[0][0]
                    self.logger.debug("USB virtual serial port found on " + self.port)
                    break
        # didnt find anything fall back to config.ini
        if self.port is None:
            self.port = self.config.get("serial", "comport") 
            self.logger.debug("Virtual serial port not found. Reverting to config.ini: " + self.port)
        try:
            self.serial = serial.Serial(port=self.port, baudrate=self.config.get("serial", "baudrate"), timeout=self.config.getint("serial", "timeout"), xonxoff=0, rtscts=1)
            self.logger.debug("serial connection on " + self.serial.portstr)
        except serial.SerialException:
            self.logger.critical("error establishing serial connection")
            raise GB500SerialException
miniterm.py 文件源码 项目:ddt4all 作者: cedricp 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def dump_port_settings(self):
        """Write current settings to sys.stderr"""
        sys.stderr.write("\n--- Settings: {p.name}  {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(
            p=self.serial))
        sys.stderr.write('--- RTS: {:8}  DTR: {:8}  BREAK: {:8}\n'.format(
            ('active' if self.serial.rts else 'inactive'),
            ('active' if self.serial.dtr else 'inactive'),
            ('active' if self.serial.break_condition else 'inactive')))
        try:
            sys.stderr.write('--- CTS: {:8}  DSR: {:8}  RI: {:8}  CD: {:8}\n'.format(
                ('active' if self.serial.cts else 'inactive'),
                ('active' if self.serial.dsr else 'inactive'),
                ('active' if self.serial.ri else 'inactive'),
                ('active' if self.serial.cd else 'inactive')))
        except serial.SerialException:
            # on RFC 2217 ports, it can happen if no modem state notification was
            # yet received. ignore this error.
            pass
        sys.stderr.write('--- software flow control: {}\n'.format('active' if self.serial.xonxoff else 'inactive'))
        sys.stderr.write('--- hardware flow control: {}\n'.format('active' if self.serial.rtscts else 'inactive'))
        sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
        sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
        sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper()))
        sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
miniterm.py 文件源码 项目:ddt4all 作者: cedricp 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def reader(self):
        """loop and copy serial->console"""
        try:
            while self.alive and self._reader_alive:
                # read all that is there or wait for one byte
                data = self.serial.read(self.serial.in_waiting or 1)
                if data:
                    if self.raw:
                        self.console.write_bytes(data)
                    else:
                        text = self.rx_decoder.decode(data)
                        for transformation in self.rx_transformations:
                            text = transformation.rx(text)
                        self.console.write(text)
        except serial.SerialException:
            self.alive = False
            self.console.cancel()
            raise       # XXX handle instead of re-raise?
stk500v2.py 文件源码 项目:SuperOcto 作者: mcecchi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def connect(self, port = 'COM22', speed = 115200):
        if self.serial != None:
            self.close()
        try:
            self.serial = Serial(str(port), speed, timeout=1, writeTimeout=10000)
        except SerialException as e:
            raise ispBase.IspError("Failed to open serial port")
        except:
            raise ispBase.IspError("Unexpected error while connecting to serial port:" + port + ":" + str(sys.exc_info()[0]))
        self.seq = 1

        #Reset the controller
        self.serial.setDTR(1)
        time.sleep(0.1)
        self.serial.setDTR(0)
        time.sleep(0.2)

        self.sendMessage([1])
        if self.sendMessage([0x10, 0xc8, 0x64, 0x19, 0x20, 0x00, 0x53, 0x03, 0xac, 0x53, 0x00, 0x00]) != [0x10, 0x00]:
            self.close()
            raise ispBase.IspError("Failed to enter programming mode")
aio.py 文件源码 项目:mt7687-serial-uploader 作者: will127534 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def write(self, data):
        """Write some data to the transport.

        This method does not block; it buffers the data and arranges
        for it to be sent out asynchronously.  Writes made after the
        transport has been closed will be ignored."""
        if self._closing:
            return

        if self.get_write_buffer_size() == 0:
            # Attempt to send it right away first
            try:
                n = self._serial.write(data)
            except serial.SerialException as exc:
                self._fatal_error(exc, 'Fatal write error on serial transport')
                return
            if n == len(data):
                return  # Whole request satisfied
            assert n > 0
            data = data[n:]
            self._ensure_writer()

        self._write_buffer.append(data)
        self._maybe_pause_protocol()
miniterm.py 文件源码 项目:mt7687-serial-uploader 作者: will127534 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def dump_port_settings(self):
        """Write current settings to sys.stderr"""
        sys.stderr.write("\n--- Settings: {p.name}  {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(
            p=self.serial))
        sys.stderr.write('--- RTS: {:8}  DTR: {:8}  BREAK: {:8}\n'.format(
            ('active' if self.serial.rts else 'inactive'),
            ('active' if self.serial.dtr else 'inactive'),
            ('active' if self.serial.break_condition else 'inactive')))
        try:
            sys.stderr.write('--- CTS: {:8}  DSR: {:8}  RI: {:8}  CD: {:8}\n'.format(
                ('active' if self.serial.cts else 'inactive'),
                ('active' if self.serial.dsr else 'inactive'),
                ('active' if self.serial.ri else 'inactive'),
                ('active' if self.serial.cd else 'inactive')))
        except serial.SerialException:
            # on RFC 2217 ports, it can happen if no modem state notification was
            # yet received. ignore this error.
            pass
        sys.stderr.write('--- software flow control: {}\n'.format('active' if self.serial.xonxoff else 'inactive'))
        sys.stderr.write('--- hardware flow control: {}\n'.format('active' if self.serial.rtscts else 'inactive'))
        sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
        sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
        sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper()))
        sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
miniterm.py 文件源码 项目:mt7687-serial-uploader 作者: will127534 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def reader(self):
        """loop and copy serial->console"""
        try:
            while self.alive and self._reader_alive:
                # read all that is there or wait for one byte
                data = self.serial.read(self.serial.in_waiting or 1)
                if data:
                    if self.raw:
                        self.console.write_bytes(data)
                    else:
                        text = self.rx_decoder.decode(data)
                        for transformation in self.rx_transformations:
                            text = transformation.rx(text)
                        self.console.write(text)
        except serial.SerialException:
            self.alive = False
            self.console.cancel()
            raise       # XXX handle instead of re-raise?
common.py 文件源码 项目:RacingRobot 作者: sergionr2 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run(self):
        while not self.exit_event.is_set():
            try:
                bytes_array = bytearray(self.serial_file.read(1))
            except serial.SerialException:
                time.sleep(rate)
                continue
            if not bytes_array:
                time.sleep(rate)
                continue
            byte = bytes_array[0]
            with serial_lock:
                try:
                    order = Order(byte)
                except ValueError:
                    continue
                if order == Order.RECEIVED:
                    n_received_semaphore.release()
                decodeOrder(self.serial_file, byte)
            time.sleep(rate)
        print("Listener Thread Exited")


问题


面经


文章

微信
公众号

扫码关注公众号