def __init__(self):
    """Open the sensor's serial port.

    The port is opened on ``self.DEVICE`` at ``self.BAUD_RATE`` with 8 data
    bits, no parity, one stop bit and a 0.01 second read timeout.

    Raises:
        CannotLoadSensor: if the port cannot be opened for any reason.
    """
    self.ser = None
    try:
        self.ser = serial.Serial(
            self.DEVICE,
            self.BAUD_RATE,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=.01,
        )
    # The original had a Python-2-only ``except SerialException, err`` clause
    # followed by an identical ``except Exception`` clause. SerialException is
    # an Exception subclass, so a single handler using the portable ``as``
    # syntax covers both.
    except Exception as err:
        raise CannotLoadSensor("Cannot load sensor: %s" % str(err))
# Example source code using Python's PARITY_NONE
def setupSerial(devName, baudrate):
    """Open and configure a serial port.

    Args:
        devName: name/path of the serial device to open.
        baudrate: baud rate to open the port with.

    Returns:
        The opened ``serial.Serial`` object, configured with 0.05 second
        read and write timeouts and with both buffers flushed.

    Raises:
        RuntimeError: if no serial object was created.
    """
    serialDev = serial.Serial(port=devName, baudrate=baudrate,
                              parity=serial.PARITY_NONE,
                              stopbits=serial.STOPBITS_ONE)
    if serialDev is None:
        # Fixed: the message previously referenced the misspelled name
        # ``devDame``, which raised NameError instead of RuntimeError.
        raise RuntimeError('[%s]: Serial port %s not found!\n' % (rospy.get_name(), devName))
    serialDev.write_timeout = .05
    serialDev.timeout = 0.05
    # Start from empty buffers.
    serialDev.flushOutput()
    serialDev.flushInput()
    return serialDev
def setupSerial(devName, baudrate):
    """Open and configure a serial port.

    Args:
        devName: name/path of the serial device to open.
        baudrate: baud rate to open the port with.

    Returns:
        The opened ``serial.Serial`` object, configured with 0.05 second
        read and write timeouts and with both buffers flushed.

    Raises:
        RuntimeError: if no serial object was created.
    """
    serialDev = serial.Serial(port=devName, baudrate=baudrate,
                              parity=serial.PARITY_NONE,
                              stopbits=serial.STOPBITS_ONE)
    if serialDev is None:
        # Fixed: the message previously referenced the misspelled name
        # ``devDame``, which raised NameError instead of RuntimeError.
        raise RuntimeError('[%s]: Serial port %s not found!\n' % (rospy.get_name(), devName))
    serialDev.write_timeout = .05
    serialDev.timeout = 0.05
    # Start from empty buffers.
    serialDev.flushOutput()
    serialDev.flushInput()
    return serialDev
def test():
    """Manual smoke test: drive a REPL over a serial port.

    Opens the hard-coded port, switches the REPL to raw mode, sends three
    commands and prints the responses to the two ``print`` commands.
    """
    PORT = '/dev/tty.usbmodem1412'
    BAUD = 115200

    def get_serial():
        """Return an open, non-blocking 8N1 serial port on PORT at BAUD."""
        s = serial.Serial(PORT)
        s.baudrate = BAUD
        s.parity = serial.PARITY_NONE
        # Fixed: pyserial's attribute is ``bytesize``; ``databits`` only
        # created an unused attribute on the object.
        s.bytesize = serial.EIGHTBITS
        s.stopbits = serial.STOPBITS_ONE
        s.timeout = 0  # non blocking mode
        # Close and reopen the port after changing the settings.
        s.close()
        s.port = PORT
        s.open()
        return s

    s = get_serial()
    try:
        repl = REPL(s)
        repl.to_raw()

        repl.send_command("print('hello')")
        print(repl.wait_response())

        repl.send_command("a=1")
        repl.wait_response()

        repl.send_command("print(a)")
        print(repl.wait_response())
    finally:
        # Release the port even if one of the REPL calls raises.
        s.close()
def get_serial():
    """Return an open, non-blocking 8N1 serial port.

    Uses the ``PORT`` and ``BAUD`` names from the enclosing scope. The port
    is opened, configured, then closed and reopened before being returned.
    """
    s = serial.Serial(PORT)
    s.baudrate = BAUD
    s.parity = serial.PARITY_NONE
    # Fixed: pyserial's attribute is ``bytesize``; ``databits`` only created
    # an unused attribute on the object.
    s.bytesize = serial.EIGHTBITS
    s.stopbits = serial.STOPBITS_ONE
    s.timeout = 0  # non blocking mode
    s.close()
    s.port = PORT
    s.open()
    return s
def __init__(self):
    """Open the first serial port found on the machine and create a Sampler.

    Prints a warning when more than one port is present. When no port is
    found, prints an error and exits the process with status -1.
    """
    existing_ports = SerialInterface.serial_ports()
    if len(existing_ports) > 1:
        print('WARN! more than one serial port is active on the computer. Blindly using first one...')
    # Fixed: ``len(...) is 0`` compared identity with an int literal, which
    # only works by implementation accident and triggers a SyntaxWarning on
    # modern Python.
    if not existing_ports:
        print('ERR! no serial ports found on the computer! Exiting...')
        exit(-1)
    print('using port: ' + existing_ports[0])
    port_name = existing_ports[0]
    self.ser = serial.Serial(port=port_name, baudrate=9600, bytesize=BYTE_SIZE,
                             parity=serial.PARITY_NONE, stopbits=STOP_BITS,
                             write_timeout=10, timeout=10)
    self.sampler = Sampler()
def __init__(self, serial_name):
    """Store the serial settings, open the port and set the output limits.

    Args:
        serial_name: name of the serial port to open.
    """
    # Serial link settings (9600 baud, 8N1, 1 second timeout).
    self.port = serial_name
    self.baudrate = 9600
    self.timeout = 1
    self.parity = serial.PARITY_NONE
    self.stopbits = serial.STOPBITS_ONE
    self.bytesize = serial.EIGHTBITS

    # Passing a port name makes pyserial open the port right away.
    self.ser = serial.Serial(
        port=self.port,
        baudrate=self.baudrate,
        bytesize=self.bytesize,
        parity=self.parity,
        stopbits=self.stopbits,
        timeout=self.timeout,
    )

    # Limits in volts, amps and watts respectively.
    self.max_voltage = 80.0
    self.max_current = 10.0
    self.max_power = 320.0
def __init__(self, serial_name):
    """Initialize the generic power supply and open its serial port.

    Subclasses can define non-default values for their specific device.

    Args:
        serial_name: string, the serial port name (e.g. COM2).
    """
    self.name = "Generic Power Supply"
    self.port = serial_name

    # Default baud rate. Possible values:
    #   50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, 9600,
    #   19200, 38400, 57600, 115200, 230400, 460800, 500000, 576000, 921600,
    #   1000000, 1152000, 1500000, 2000000, 2500000, 3000000, 3500000, 4000000
    self.baudrate = 9600

    # Default timeout, in seconds.
    self.timeout = 1

    # Default parity. Possible values:
    #   PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE
    self.parity = serial.PARITY_NONE

    # Default stop bits. Possible values:
    #   STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO
    self.stopbits = serial.STOPBITS_ONE

    # Default byte size. Possible values:
    #   FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS
    self.bytesize = serial.EIGHTBITS

    # Passing a port name makes pyserial open the port right away.
    self.ser = serial.Serial(
        port=self.port,
        baudrate=self.baudrate,
        bytesize=self.bytesize,
        parity=self.parity,
        stopbits=self.stopbits,
        timeout=self.timeout,
    )

    # Limits in volts, amps and watts respectively.
    self.max_voltage = 0.0
    self.max_current = 0.0
    self.max_power = 0.0
def connect(self):
    """
    Open the tty connection to the sensor.

    Any link that is already set is disconnected first.
    """
    has_link = self.link is not None
    if has_link:
        self.disconnect()

    # 9600 baud, 8N1, DSR/DTR flow control, 5 second read timeout and a
    # 0.1 second inter-character timeout.
    link_options = dict(
        bytesize=serial.EIGHTBITS,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        dsrdtr=True,
        timeout=5,
        interCharTimeout=0.1,
    )
    self.link = serial.Serial(self.port, 9600, **link_options)
def GoodFETSerialPort(**kwargs):
    """Return a serial port opened with default values the caller may override.

    The port name comes from the ``GOODFET`` environment variable, falling
    back to ``/dev/ttyUSB0`` when the variable is unset or empty. Any keyword
    argument replaces the matching default before the port is created.
    """
    import serial

    device = os.environ.get('GOODFET')
    if not device:
        device = "/dev/ttyUSB0"

    defaults = {
        'port': device,
        'baudrate': 115200,
        'parity': serial.PARITY_NONE,
        'timeout': 2,
    }
    # Caller-supplied settings take precedence over the defaults.
    settings = dict(defaults, **kwargs)
    return serial.Serial(**settings)
def main(args):
    """Forward UDP datagrams to the serial port via ``proc_input``.

    Binds a UDP socket on ``UDPport`` (all interfaces), opens the serial
    port ``serport`` and passes every received datagram to ``proc_input``
    until an empty datagram arrives or the socket times out.

    Args:
        args: unused.

    Returns:
        0 when the loop ends.
    """
    global sock
    global ser
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("0.0.0.0", UDPport))
    ser = serial.Serial(serport, baudrate, timeout=0,
                        parity=serial.PARITY_NONE,
                        stopbits=serial.STOPBITS_ONE, bytesize=8,
                        rtscts=False, xonxoff=False)
    # The first datagram is awaited before the start-up messages are printed.
    data = sock.recv(10000)
    # Fixed: Python 2 print statements replaced by single-argument print()
    # calls, which behave identically on Python 2 and Python 3.
    print("start loop, listen-port %u" % UDPport)
    print("sending to %s" % serport)
    try:
        while data:
            # Debug aid: print("UDP data %s" % repr(data)[:55])
            proc_input(data)
            data = sock.recv(10000)
    except socket.timeout:
        sock.close()
    return 0
# Process one line of input: truncate to 5 bits, reorder the RGB components, send everything to the tty.
def __init__(self, device='/dev/ttyAMA0', baud=115200,
             parity_=serial.PARITY_NONE, stop_bits=serial.STOPBITS_ONE, byte_size=serial.EIGHTBITS, time_out=0):
    """Open the UART connection.

    Args:
        device: serial device path.
        baud: baud rate.
        parity_: parity setting passed to the serial port.
        stop_bits: number of stop bits.
        byte_size: number of data bits.
        time_out: read timeout in seconds; 0 means no timeout is set
            (pyserial's default, blocking reads).
    """
    # ACK header for received data.
    self.Header = bytearray([0x00, 0x00, 0xff, 0x00, 0xff, 0x00, 0x00, 0x00, 0xff])
    # Only set a timeout when one was requested; ``None`` is pyserial's
    # default, which matches the original call that omitted the argument.
    read_timeout = time_out if time_out != 0 else None
    # Fixed: ``parity_`` was accepted but never forwarded, so a caller's
    # parity choice was silently ignored.
    self.connectUART = serial.Serial(port=device, baudrate=baud, parity=parity_,
                                     stopbits=stop_bits, bytesize=byte_size,
                                     timeout=read_timeout)
# manually close connection
def __init__(self, device='/dev/ttyAMA0', baud=115200,
             parity_=serial.PARITY_NONE, stop_bits=serial.STOPBITS_ONE, byte_size=serial.EIGHTBITS, time_out=0):
    """Pass all serial settings, unchanged and in order, to the parent initializer."""
    parent_init = super(PN532, self).__init__
    parent_init(device, baud, parity_, stop_bits, byte_size, time_out)
def __init__(self, port='/dev/serial0'):
    """Configure a 115200 baud 8N1 serial link, open it and zero the channel data.

    Args:
        port: serial device path.
    """
    # Created without a port so nothing is opened until open_serial() runs.
    self.ser = serial.Serial()
    link = self.ser
    for attr, value in (
        ("port", port),
        ("baudrate", 115200),
        ("bytesize", serial.EIGHTBITS),
        ("parity", serial.PARITY_NONE),
        ("stopbits", serial.STOPBITS_ONE),
    ):
        setattr(link, attr, value)
    self.open_serial()
    # One zero-initialized slot per channel.
    self.rc_data = [0 for _ in range(N_CHAN)]
def __init__(self, port="/dev/ttyACM0", bitrate=115200,
             rx_protocol=msp.RX_MSP, firmware=msp.FIRMWARE_BF):
    """Validate the protocol and firmware choices, then open the serial link.

    Args:
        port: serial device path.
        bitrate: baud rate for the link.
        rx_protocol: one of ``msp.RX_OPTIONS``.
        firmware: one of ``msp.FIRMWARE_OPTIONS``.

    Raises:
        AssertionError: if ``rx_protocol`` or ``firmware`` is unsupported.
    """
    assert rx_protocol in msp.RX_OPTIONS, "unsupported rx protocol indicated"
    self.rx_protocol = rx_protocol
    # Count is the length of the payload format entry minus its first element.
    rc_format = MSP_PAYLOAD_FMT[msp.MSP_RC][rx_protocol]
    self.rx_protocol_ch_count = len(rc_format[1:])

    assert firmware in msp.FIRMWARE_OPTIONS, "unsupported firmware indicated"
    self.firmware = firmware
    motor_format = MSP_PAYLOAD_FMT[msp.MSP_MOTOR][firmware]
    self.firmware_motor_count = len(motor_format[1:])

    # Created without a port so nothing is opened until open_serial() runs.
    self.ser = serial.Serial()
    link = self.ser
    for attr, value in (
        ("port", port),
        ("baudrate", bitrate),
        ("bytesize", serial.EIGHTBITS),
        ("parity", serial.PARITY_NONE),
        ("stopbits", serial.STOPBITS_ONE),
        ("timeout", None),
        ("xonxoff", False),
        ("rtscts", False),
        ("dsrdtr", False),
        ("writeTimeout", 2),
    ):
        setattr(link, attr, value)
    self.open_serial()
def MakeSerialDevice(port="/dev/ttyUSB0"):
    """Return a 115200 baud 8N1 serial port with a 5 second read timeout.

    Args:
        port: serial device path.
    """
    settings = {
        "port": port,
        "baudrate": 115200,
        "parity": serial.PARITY_NONE,
        "stopbits": serial.STOPBITS_ONE,
        "bytesize": serial.EIGHTBITS,
        # Reads block for up to 5 seconds.
        "timeout": 5,
    }
    # No explicit open() call is made; the object is returned as constructed.
    return serial.Serial(**settings)
def __init__(self,
             serialdevice,
             configuration,
             power_control=DTR_ON,
             scan_interval=0):
    """Set up the data collector from the given parameters.

    Args:
        serialdevice: serial port to open.
        configuration: mapping holding the record length, start block,
            byte order, multiplier, timeout and baud rate entries.
        power_control: power control mode, stored as given.
        scan_interval: when greater than 0, ``self.refresh`` is started in
            a daemon background thread.
    """
    # Values taken from the configuration mapping.
    self.record_length = configuration[RECORD_LENGTH]
    self.start_sequence = configuration[STARTBLOCK]
    self.byte_order = configuration[BYTE_ORDER]
    self.multiplier = configuration[MULTIPLIER]
    self.timeout = configuration[TIMEOUT]

    # Caller-supplied options and initial state.
    self.scan_interval = scan_interval
    self.listeners = []
    self.power_control = power_control
    self.sensordata = {}
    self.config = configuration
    self.data = None
    self.last_poll = None
    self.start_func = None
    self.stop_func = None

    port_settings = dict(
        port=serialdevice,
        baudrate=configuration[BAUD_RATE],
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,
    )
    self.ser = serial.Serial(**port_settings)

    # Update data using a background thread.
    if self.scan_interval > 0:
        worker = threading.Thread(target=self.refresh, args=())
        worker.daemon = True
        worker.start()
def read_mh_z19_with_temperature(serial_device):
    """Read the CO2 PPM concentration and temperature from a MH-Z19 sensor.

    Sends the read command and collects the response one byte at a time
    until ``MHZ19_SIZE`` bytes have arrived or 2 seconds have elapsed.

    Args:
        serial_device: serial port the sensor is attached to.

    Returns:
        A ``(co2, temperature)`` tuple, where ``co2`` is
        ``byte[2] * 256 + byte[3]`` and ``temperature`` is ``byte[4] - 40``,
        or ``None`` when the read times out.
    """
    logger = logging.getLogger(__name__)

    ser = serial.Serial(port=serial_device,
                        baudrate=9600,
                        parity=serial.PARITY_NONE,
                        stopbits=serial.STOPBITS_ONE,
                        bytesize=serial.EIGHTBITS)
    # Fixed: the port was never closed, leaking it on every call on both
    # the timeout path and the success path.
    try:
        sbuf = bytearray()
        starttime = time.time()
        timeout = 2

        ser.write(MZH19_READ)

        while True:
            if time.time() - starttime > timeout:
                logger.error("read timeout after %s seconds, read %s bytes",
                             timeout, len(sbuf))
                return None

            if ser.inWaiting() > 0:
                sbuf += ser.read(1)
                if len(sbuf) == MHZ19_SIZE:
                    # TODO: check checksum
                    res = (sbuf[2]*256 + sbuf[3], sbuf[4] - 40)
                    logger.debug("Finished reading data %s", sbuf)
                    return res
            else:
                # Nothing buffered yet: wait briefly before polling again.
                time.sleep(.1)
                logger.debug("Serial waiting for data, buffer length=%s",
                             len(sbuf))
    finally:
        ser.close()
def __init__(self, protocol, deviceNameOrPortNumber, reactor,
    baudrate = 9600, bytesize = EIGHTBITS, parity = PARITY_NONE,
    stopbits = STOPBITS_ONE, xonxoff = 0, rtscts = 0):
    """Open the serial port and hook it into the reactor using win32 overlapped I/O.

    Args:
        protocol: protocol object; its ``makeConnection`` is called with
            this instance.
        deviceNameOrPortNumber: port identifier passed to ``serial.Serial``.
        reactor: reactor on which the read and write events are registered.
        baudrate, bytesize, parity, stopbits, xonxoff, rtscts: serial
            settings forwarded to ``serial.Serial``.
    """
    # Open the port with timeout=None.
    self._serial = serial.Serial(deviceNameOrPortNumber, baudrate=baudrate,
        bytesize=bytesize, parity=parity,
        stopbits=stopbits, timeout=None,
        xonxoff=xonxoff, rtscts=rtscts)
    # presumably discards anything already buffered; helpers defined elsewhere
    self.flushInput()
    self.flushOutput()
    self.reactor = reactor
    self.protocol = protocol
    # Bookkeeping state, all initialized to empty/zero.
    self.outQueue = []
    self.closed = 0
    self.closedNotifies = 0
    self.writeInProgress = 0

    # NOTE(review): repeats the assignment made a few lines above.
    self.protocol = protocol
    # One OVERLAPPED structure and event per direction. The read event is
    # created with bManualReset=1, the write event with bManualReset=0.
    self._overlappedRead = win32file.OVERLAPPED()
    self._overlappedRead.hEvent = win32event.CreateEvent(None, 1, 0, None)
    self._overlappedWrite = win32file.OVERLAPPED()
    self._overlappedWrite.hEvent = win32event.CreateEvent(None, 0, 0, None)

    # Register both events with the reactor, naming the handler methods on
    # this object.
    self.reactor.addEvent(self._overlappedRead.hEvent, self, 'serialReadEvent')
    self.reactor.addEvent(self._overlappedWrite.hEvent, self, 'serialWriteEvent')

    self.protocol.makeConnection(self)

    # The returned flags and comstat are not used afterwards in this method.
    flags, comstat = win32file.ClearCommError(self._serial.hComPort)
    # Issue the first overlapped read with a 1-byte buffer; the buffer is
    # kept in self.read_buf and rc is not used here.
    rc, self.read_buf = win32file.ReadFile(self._serial.hComPort,
        win32file.AllocateReadBuffer(1),
        self._overlappedRead)
def __init__(self, protocol, deviceNameOrPortNumber, reactor,
             baudrate=9600, bytesize=EIGHTBITS, parity=PARITY_NONE,
             stopbits=STOPBITS_ONE, timeout=0, xonxoff=0, rtscts=0):
    """Open the serial port, connect the protocol and start reading.

    Args:
        protocol: protocol object; its ``makeConnection`` is called with
            this instance.
        deviceNameOrPortNumber: port identifier passed to ``serial.Serial``.
        reactor: reactor handed to ``abstract.FileDescriptor.__init__`` and
            stored on the instance.
        baudrate, bytesize, parity, stopbits, timeout, xonxoff, rtscts:
            serial settings forwarded to ``serial.Serial``.
    """
    abstract.FileDescriptor.__init__(self, reactor)

    serial_settings = dict(
        baudrate=baudrate,
        bytesize=bytesize,
        parity=parity,
        stopbits=stopbits,
        timeout=timeout,
        xonxoff=xonxoff,
        rtscts=rtscts,
    )
    self._serial = serial.Serial(deviceNameOrPortNumber, **serial_settings)
    self.reactor = reactor

    # Flush both directions before the protocol is connected.
    self.flushInput()
    self.flushOutput()

    self.protocol = protocol
    self.protocol.makeConnection(self)
    self.startReading()