def __init__(self, data_q, error_q, port, port_baud,
             port_stopbits=serial.STOPBITS_ONE,
             port_parity=serial.PARITY_NONE, port_timeout=0.01,
             listener=False, virtual=False, infer_limit=15,
             encoding="UTF-8", delimiter=r"\s"):
    """Initialize the thread and collect the serial-port settings.

    Nothing is opened here: ``self.serial_port`` starts out as ``None`` and
    the connection settings are only gathered into ``self.serial_arg``.

    Args:
        data_q: stored unchanged as ``self.data_q`` (presumably the queue
            that receives data -- confirm against the consumer).
        error_q: stored unchanged as ``self.error_q`` (presumably the queue
            that receives errors -- confirm against the consumer).
        port: port identifier; stored as ``self.port``, placed in the
            serial settings, and used as the key into the module-level
            ``rxdelims`` mapping.
        port_baud: baud rate; converted with ``int``.
        port_stopbits: stop bits; converted with ``float``.
        port_parity: parity setting; passed through unchanged.
        port_timeout: timeout; converted with ``float``.
        listener: stored unchanged as ``self.listener``.
        virtual: when truthy, a message is logged and the ``dsrdtr`` and
            ``rtscts`` settings are both switched on.
        infer_limit: when not ``None``, a ``FormatInferrer`` is built from
            it; otherwise ``self.inferrer`` is ``None``.
        encoding: stored unchanged as ``self.encoding``.
        delimiter: regular-expression pattern; kept as ``self.delimiter``
            and compiled into ``rxdelims[port]``.
    """
    import re
    global rxdelims

    threading.Thread.__init__(self)

    self.port = port
    # Placeholder only; the constructor never creates a port object.
    self.serial_port = None

    # Presumably the keyword arguments for ``serial.Serial`` -- confirm at
    # the place where the port is actually opened.
    settings = {
        "port": port,
        "baudrate": int(port_baud),
        "stopbits": float(port_stopbits),
        "parity": port_parity,
        "timeout": float(port_timeout),
    }
    self.serial_arg = settings
    if virtual:
        msg.std("Running in virtual serial port mode.", 2)
        # `settings` is the same dict object as `self.serial_arg`.
        settings.update(dsrdtr=True, rtscts=True)

    self.data_q, self.error_q = data_q, error_q
    self.listener = listener
    self.encoding = encoding

    self.delimiter = delimiter
    # The compiled pattern lives in the module-level table, keyed by port.
    rxdelims[port] = re.compile(delimiter)

    self.sensors = {}
    # bool: when True, the sensors list was constructed manually using a
    # configuration file; otherwise, it was inferred from the first few
    # lines of data from the serial port.
    self._manual_sensors = False

    self.inferrer = (
        FormatInferrer(infer_limit) if infer_limit is not None else None
    )

    # The event is created already set.
    running = threading.Event()
    running.set()
    self.alive = running
# (web-page residue, not code) Comment list
# (web-page residue, not code) Article table of contents