def __init__(self, device):
self.ser = None
self.ser = serial.Serial(device,
baudrate=921600, # 1152000,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE,
timeout=0.1,
xonxoff=False,
rtscts=False,
dsrdtr=False)
self._connect = False
self._lock()
self._speed_up()
self.reset_input_buffer = self.ser.reset_input_buffer
self.reset_output_buffer = self.ser.reset_output_buffer
self.write = self.ser.write
self.read = self.ser.read
self.flush = self.ser.flush
python类PARITY_EVEN的实例源码
def get_dsmr_connection_parameters():
""" Returns the communication settings required for the DSMR version set. """
DSMR_VERSION_MAPPING = {
DataloggerSettings.DSMR_VERSION_2: {
'baudrate': 9600,
'bytesize': serial.SEVENBITS,
'parity': serial.PARITY_EVEN,
'crc': False,
},
DataloggerSettings.DSMR_VERSION_4_PLUS: {
'baudrate': 115200,
'bytesize': serial.EIGHTBITS,
'parity': serial.PARITY_NONE,
'crc': True,
},
}
datalogger_settings = DataloggerSettings.get_solo()
connection_parameters = DSMR_VERSION_MAPPING[datalogger_settings.dsmr_version]
connection_parameters['com_port'] = datalogger_settings.com_port
return connection_parameters
def __init__(self,
path,
baudrate=9600,
timeout=1,
open_on_create=True,
debugging=False):
super(TeliumNativeSerial, self).__init__(
path,
baudrate=baudrate,
bytesize=SEVENBITS,
parity=PARITY_EVEN,
stopbits=STOPBITS_ONE,
timeout=timeout,
open_on_create=open_on_create,
debugging=debugging)
def __init__(self, portname, slaveaddress=30):
minimalmodbus.Instrument.__init__(self, portname, slaveaddress)
self.serial.baudrate = 19200
self.serial.parity = serial.PARITY_EVEN
self.serial.timeout = 1.0 #timeout to 1000ms because I discovered roundtrip times as high as 898.5 ms
self.mode = minimalmodbus.MODE_RTU
def connect(
self, baudrate=None, parity=None, databits=None, stopbits=None):
"""Open a serial port for communication with barcode reader device
Connection settings are taken from three possible locations in the
following order:
- method arguments
- object properties set directly or in constructor or
detect_connection_settings()
- default device settings according to device documentation
If connection settings (baud rate, parity, etc.) are neither provided
as arguments nor previously set as object properties, for example by
a search with the detect_connection_settings() method, the default
values specified in the device documentation are used. For example,
page 3-1 of the MS3 user manual specifies:
- baud rate: 9600
- parity: even
- data bits: 7
- stop bits: 1
- flow control: none
"""
baudrate = baudrate or self.baudrate or 9600
parity = parity or self.parity or serial.PARITY_EVEN
bytesize = databits or self.databits or serial.SEVENBITS
stopbits = stopbits or self.stopbits or serial.STOPBITS_ONE
self.port = serial.Serial(
self.portname,
baudrate=baudrate,
parity=parity,
bytesize=bytesize,
stopbits=stopbits,
timeout=1,
xonxoff=False,
rtscts=False,
dsrdtr=False,
)
self._config = self.read_config()
def __init__(self, device='/dev/ttyUSB0', baudrate=9600, rst='-rts', debug=False):
self._sl = serial.Serial(
port = device,
parity = serial.PARITY_EVEN,
bytesize = serial.EIGHTBITS,
stopbits = serial.STOPBITS_TWO,
timeout = 1,
xonxoff = 0,
rtscts = 0,
baudrate = baudrate,
)
self._rst_pin = rst
self._debug = debug
def test_dsmr_version_2(self):
""" Test connection parameters for DSMR v2. """
datalogger_settings = DataloggerSettings.get_solo()
datalogger_settings.dsmr_version = DataloggerSettings.DSMR_VERSION_2
datalogger_settings.save()
self.assertEqual(DataloggerSettings.get_solo().dsmr_version, DataloggerSettings.DSMR_VERSION_2)
connection_parameters = dsmr_datalogger.services.get_dsmr_connection_parameters()
self.assertEqual(connection_parameters['baudrate'], 9600)
self.assertEqual(connection_parameters['bytesize'], serial.SEVENBITS)
self.assertEqual(connection_parameters['parity'], serial.PARITY_EVEN)
def test_ParitySetting(self):
for parity in (serial.PARITY_NONE, serial.PARITY_EVEN, serial.PARITY_ODD):
self.s.parity = parity
# test get method
self.assertEqual(self.s.parity, parity)
# test internals
self.assertEqual(self.s._parity, parity)
# test illegal values
for illegal_value in (0, 57, 'a', None):
self.assertRaises(ValueError, setattr, self.s, 'parity', illegal_value)
def __init__(self,
port: str,
baudrate: int,
bytesize: int,
stopbits: float,
parity: str,
timeout: float,
xonxoff: bool,
rtscts: bool):
"""Converts data from JSON format to serial.Serial format."""
self._port = port
self._baudrate = baudrate
self._bytesize = {
5: serial.FIVEBITS,
6: serial.SIXBITS,
7: serial.SEVENBITS,
8: serial.EIGHTBITS
}[bytesize]
self._stopbits = {
1: serial.STOPBITS_ONE,
1.5: serial.STOPBITS_ONE_POINT_FIVE,
2: serial.STOPBITS_TWO
}[stopbits]
self._parity = {
'none': serial.PARITY_NONE,
'even': serial.PARITY_EVEN,
'odd': serial.PARITY_ODD,
'mark': serial.PARITY_MARK,
'space': serial.PARITY_SPACE
}[parity]
self._timeout = timeout
self._xonxoff = xonxoff
self._rtscts = rtscts
def test_ParitySetting(self):
for parity in (serial.PARITY_NONE, serial.PARITY_EVEN, serial.PARITY_ODD):
self.s.parity = parity
# test get method
self.failUnlessEqual(self.s.parity, parity)
# test internals
self.failUnlessEqual(self.s._parity, parity)
# test illegal values
for illegal_value in (0, 57, 'a', None):
self.failUnlessRaises(ValueError, self.s.setParity, illegal_value)
def init_port(self):
try:
self._serialPort = serial.Serial(self._COMPort, 57600, timeout=0.5, write_timeout=1, writeTimeout=1, parity=serial.PARITY_EVEN, rtscts=1)
if (self.get_info().find("V0100")) != -1:
self.dprint(1, "Port found: " + self._COMPort)
return 1
else:
self.dprint(1, "Device not found: " + self._COMPort)
self._serialPort.close()
return 0
except Exception as e:
self.dprint(0, 'Error opening port: ' + self._COMPort + str(e))
return 0
def dev_write(self, data):
self.dprint(2, "CMD: " + data + " try: " + str(self._dev_write_try))
try:
self._serialPort.write(data.encode("ISO-8859-1") + b"\r")
except Exception as e:
self.dprint(2, "USBTin ERROR: can't write...")
if self._dev_write_try < 6:
self.dprint(1, "USBTin restart")
try:
self._serialPort.close()
time.sleep(1)
self._serialPort = serial.Serial(self._COMPort, 57600, timeout=0.5, write_timeout=1, writeTimeout=1, parity=serial.PARITY_EVEN, rtscts=1)
time.sleep(1)
self.do_start({})
self._dev_write_try += 1
self.dev_write(data)
except Exception as e2:
self._dev_write_try = 0
self.dev_write("USBTIn ERROR: can't reopen - \n\t" + str(e) + "\n\t" + str(e2))
self.set_error_text("USBTIn ERROR: can't reopen - \n\t" + str(e) + "\n\t" + str(e2))
traceback.print_exc()
else:
self._dev_write_try = 0
self.dev_write("USBTIn ERROR")
self.set_error_text('USBTIn ERROR I/O')
return ""
def init_port(self):
try:
self._serialPort = serial.Serial(self._COMPort, 57600, timeout=0.5, parity=serial.PARITY_EVEN, rtscts=1)
if (self.get_info().find("CANBus Triple")) != -1:
self.dprint(1, "Port found: " + self._COMPort)
return 1
else:
self.dprint(1, "Device not found: " + self._COMPort)
self._serialPort.close()
return 0
except:
self.dprint(0, 'Error opening port: ' + self._COMPort)
return 0
def openSession(self, portname):
self.serialport = serial.Serial(port=portname,
parity=serial.PARITY_EVEN,
bytesize=serial.EIGHTBITS,
stopbits=serial.STOPBITS_TWO,
timeout=1,
xonxoff=0,
rtscts=0,
baudrate=9600)
if (not self.serialport):
return 1
# reset it!
#self.serialport.setRTS(0)
self.serialport.setRTS(1)
self.serialport.setDTR(1)
time.sleep(0.01) # 10ms?
self.serialport.flushInput()
#self.serialport.setRTS(1)
self.serialport.setRTS(0)
self.serialport.setDTR(0)
ts = self.serialport.read()
if ts == None:
return 2 # no card?
if ord(ts) != 0x3B:
return 3 # bad ATR byte
# ok got 0x3B
print "TS: 0x%x Direct convention" % ord(ts)
t0 = chr(0x3B)
while ord(t0) == 0x3b:
t0 = self.serialport.read()
if t0 == None:
return 2
print "T0: 0x%x" % ord(t0)
# read interface bytes
if (ord(t0) & 0x10):
print "TAi = %x" % ord(self.serialport.read())
if (ord(t0) & 0x20):
print "TBi = %x" % ord(self.serialport.read())
if (ord(t0) & 0x40):
print "TCi = %x" % ord(self.serialport.read())
if (ord(t0) & 0x80):
tdi = self.serialport.read()
print "TDi = %x" % ord(tdi)
for i in range(0, ord(t0) & 0xF):
x = self.serialport.read()
print "Historical: %x" % ord(x)
while 1:
x = self.serialport.read()
if (x == ""):
break
print "read: %x" % ord(x)
return 0
def openSerial(self):
#Set up the relationship between what the user enters and what the API calls for
bytedic = {'5':serial.FIVEBITS,
'6':serial.SIXBITS,
'7':serial.SEVENBITS,
'8':serial.EIGHTBITS}
bytesize = bytedic[str(self.root.variables['databits'])]
paritydict = {'None':serial.PARITY_NONE,
'Even':serial.PARITY_EVEN,
'Odd' :serial.PARITY_ODD,
'Mark':serial.PARITY_MARK,
'Space':serial.PARITY_SPACE}
parity=paritydict[self.root.variables['parity']]
stopbitsdict = {'1':serial.STOPBITS_ONE,
'2':serial.STOPBITS_TWO}
stopbits = stopbitsdict[str(self.root.variables['stopbits'])]
#Open the serial port given the settings, store under the root
if os.name == 'nt':
port = self.root.variables['COMport'][0:5].strip()
self.root.ser = serial.Serial(\
port=port,\
baudrate=str(self.root.variables['baud']),\
bytesize=bytesize, parity=parity, stopbits=stopbits, timeout=0.5)
else:
first_space = self.root.variables['COMport'].index(' ')
port = self.root.variables['COMport'][0:first_space].strip()
# Parameters necessary due to https://github.com/pyserial/pyserial/issues/59
self.root.ser = serial.Serial(\
port=port,\
baudrate=str(self.root.variables['baud']),\
bytesize=bytesize, parity=parity, stopbits=stopbits, timeout=0.5, rtscts=True, dsrdtr=True)
io.DEFAULT_BUFFER_SIZE = 5000
#Purge the buffer of any previous data
if float(serial.VERSION[0:3]) < 3:
#If we're executing with pySerial 2.x
serial.Serial.flushInput(self.root.ser)
serial.Serial.flushOutput(self.root.ser)
else:
#Otherwise we're using pySerial 3.x
serial.Serial.reset_input_buffer(self.root.ser)
serial.Serial.reset_output_buffer(self.root.ser)