def __init__(self,port=None,logFileName=False):
print("\n### TENMA MULTIMETER INTERFACE ###")
devList=self.device_list()
self.port=port
self.logFileName=logFileName
if self.logFileName:
self.log_init()
if port is None:
self.port=self.guessPort
if not self.port in devList:
print("!!! %s isn't in the list of serial ports !!!"%self.port)
print("Preparing to use",self.port)
self.ser = serial.Serial()
self.ser.port = self.port
self.ser.baudrate = 19230
self.ser.bytesize = serial.SEVENBITS
self.ser.parity = serial.PARITY_ODD
self.ser.stopbits = serial.STOPBITS_ONE
self.ser.timeout = 3
self.ser.xonxoff = False
self.ser.rtscts = False
self.ser.dsrdtr = False
self.ser.writeTimeout = 3
python类PARITY_ODD的实例源码
def test_init_sets_the_correct_attrs(self):
"""__init__ sets the fields that get_settings reads"""
for setting, value in (
('baudrate', 57600),
('timeout', 7),
('write_timeout', 12),
('inter_byte_timeout', 15),
('stopbits', serial.STOPBITS_TWO),
('bytesize', serial.SEVENBITS),
('parity', serial.PARITY_ODD),
('xonxoff', True),
('rtscts', True),
('dsrdtr', True)):
kwargs = {'do_not_open': True, setting: value}
ser = serial.serial_for_url(PORT, **kwargs)
d = ser.get_settings()
self.assertEqual(getattr(ser, setting), value)
self.assertEqual(d[setting], value)
def log(options):
# configure the serial connections (the parameters differs on the device you are connecting to)
ser = serial.Serial(
port=options.device,
baudrate=9600,
parity=serial.PARITY_ODD,
stopbits=serial.STOPBITS_TWO,
bytesize=serial.SEVENBITS
)
prog = Progress()
with open(options.outfile, 'w') as the_file:
the_file.write('timestamp;value\n')
while True:
# \r\n is for device terminators set to CR LF
ser.write((':FETCh?\r\n'))
# wait one second before reading output.
time.sleep(options.interval)
out = ''
while ser.inWaiting() > 0:
out += ser.read(1)
if out != '':
out = out.rstrip()
res = "%s;%s\n" % (time.time(), float(out))
the_file.write(res)
the_file.flush()
prog.show()
def __init__(self, serial_name):
self.port = serial_name
self.baudrate = 9600 # Default baud rate
self.timeout = 0.1 # Default timeout, seconds
self.parity = serial.PARITY_ODD # Default parity
self.stopbits = serial.STOPBITS_ONE # Default stop bits
self.bytesize = serial.EIGHTBITS
self.ser = serial.Serial(self.port, self.baudrate, self.bytesize, self.parity, self.stopbits, timeout=self.timeout) # serial port
self.max_voltage = 42.0 # Volts
self.max_current = 6.0 # Amps
self.max_power = 100.0 # Watts
def __init__(self, serial_name):
self.port = serial_name
self.baudrate = 57600 # Default baud rate
self.timeout = 0.1 # Default timeout, seconds
self.parity = serial.PARITY_ODD # Default parity
self.stopbits = serial.STOPBITS_ONE # Default stop bits
self.bytesize = serial.EIGHTBITS
self.ser = serial.Serial(self.port, self.baudrate, self.bytesize, self.parity, self.stopbits, timeout=self.timeout) # serial port
self.max_voltage = 360.0 # Volts
self.max_current = 10.0 # Amps
self.max_power = 1000.0 # Watts
def __init__(self,addr='COM1',ndacs=8,polarity=('BIP','BIP'),verb=True,timeout = 2,reset=False):
#Directly using pyserial interface and skipping pyvisa
self.serialport = serial.Serial(addr,baudrate=115200,bytesize=serial.EIGHTBITS, parity=serial.PARITY_ODD, stopbits=serial.STOPBITS_ONE,timeout=timeout)
if ndacs!=8 and ndacs!=16:
print('DAC WARNING, non-standard number of dacs. Should be 8 or 16 but %d was given' % ndacs)
self.ndacs = ndacs
self.verb = verb
self.SetPolarity(polarity)
if reset:
self.RampAllZero(tt=20.)
return
self.lastmessage = ()
#Function to set polarity. This just informs the driver what polarities are in use so it can correctly set the voltages.
#The driver cannot physically set the polarity. The real polarity of the DACs can only be set form the hardware switches.
def __init__(self, tty, timeout=2):
self.conn = serial.Serial(tty,
baudrate=19200,
bytesize=serial.SEVENBITS,
parity=serial.PARITY_ODD,
stopbits=serial.STOPBITS_ONE,
timeout=timeout,
xonxoff=1,
rtscts=0,
dsrdtr=0
)
# switch on power for hardware RS232
# transmitter via handshake-signals
self.conn.dtr = True
self.conn.rts = False
ThreadlessPulseSpO2Plotter.py 文件源码
项目:CMS50DPlus-PulseOx
作者: jgerschler
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def connect(self):
if self.conn is None:
self.conn = serial.Serial(port = self.port,
baudrate = 19200,
parity = serial.PARITY_ODD,
stopbits = serial.STOPBITS_ONE,
bytesize = serial.EIGHTBITS,
timeout = 5,
xonxoff = 1)
elif not self.is_connected():
self.conn.open()
UNFINISHED CMS50DPlusPulseSpO2.py 文件源码
项目:CMS50DPlus-PulseOx
作者: jgerschler
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def connect(self):
if self.conn is None:
self.conn = serial.Serial(port = self.port,
baudrate = 19200,
parity = serial.PARITY_ODD,
stopbits = serial.STOPBITS_ONE,
bytesize = serial.EIGHTBITS,
timeout = 5,
xonxoff = 1)
elif not self.is_connected():
self.conn.open()
def connect(self):
if self.conn is None:
self.conn = serial.Serial(port = self.port,
baudrate = 19200,
parity = serial.PARITY_ODD,
stopbits = serial.STOPBITS_ONE,
bytesize = serial.EIGHTBITS,
timeout = 5,
xonxoff = 1)
elif not self.is_connected():
self.conn.open()
def connect(self):
if self.conn is None:
self.conn = serial.Serial(port = self.port,
baudrate = 19200,
parity = serial.PARITY_ODD,
stopbits = serial.STOPBITS_ONE,
bytesize = serial.EIGHTBITS,
timeout = 5,
xonxoff = 1)
elif not self.is_connected():
self.conn.open()
def __init__(self, port):
self.ser = serial.Serial(
port=port,
baudrate=baud_rate,
parity=serial.PARITY_ODD,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS
)
self.ser.write(b'\xF6')
res = self.ser.read(2)
if res == b'e\xfb':
click.secho('nejemojo is go', fg='green')
else:
click.secho('bad mojo: {}'.format(res), fg='red')
def test_ParitySetting(self):
for parity in (serial.PARITY_NONE, serial.PARITY_EVEN, serial.PARITY_ODD):
self.s.parity = parity
# test get method
self.assertEqual(self.s.parity, parity)
# test internals
self.assertEqual(self.s._parity, parity)
# test illegal values
for illegal_value in (0, 57, 'a', None):
self.assertRaises(ValueError, setattr, self.s, 'parity', illegal_value)
def __init__(self,
port: str,
baudrate: int,
bytesize: int,
stopbits: float,
parity: str,
timeout: float,
xonxoff: bool,
rtscts: bool):
"""Converts data from JSON format to serial.Serial format."""
self._port = port
self._baudrate = baudrate
self._bytesize = {
5: serial.FIVEBITS,
6: serial.SIXBITS,
7: serial.SEVENBITS,
8: serial.EIGHTBITS
}[bytesize]
self._stopbits = {
1: serial.STOPBITS_ONE,
1.5: serial.STOPBITS_ONE_POINT_FIVE,
2: serial.STOPBITS_TWO
}[stopbits]
self._parity = {
'none': serial.PARITY_NONE,
'even': serial.PARITY_EVEN,
'odd': serial.PARITY_ODD,
'mark': serial.PARITY_MARK,
'space': serial.PARITY_SPACE
}[parity]
self._timeout = timeout
self._xonxoff = xonxoff
self._rtscts = rtscts
def test_ParitySetting(self):
for parity in (serial.PARITY_NONE, serial.PARITY_EVEN, serial.PARITY_ODD):
self.s.parity = parity
# test get method
self.failUnlessEqual(self.s.parity, parity)
# test internals
self.failUnlessEqual(self.s._parity, parity)
# test illegal values
for illegal_value in (0, 57, 'a', None):
self.failUnlessRaises(ValueError, self.s.setParity, illegal_value)
def serial_hook(self, comm_instance, port, baudrate, connection_timeout, *args, **kwargs):
if port == 'VIRTUAL':
return None
if port is None or port == 'AUTO':
# no known port, try auto detection
comm_instance._changeState(comm_instance.STATE_DETECT_SERIAL)
serial_obj = comm_instance._detectPort(False)
if serial_obj is None:
comm_instance._log("Failed to autodetect serial port")
comm_instance._errorValue = 'Failed to autodetect serial port.'
comm_instance._changeState(comm_instance.STATE_ERROR)
eventManager().fire(Events.ERROR, {"error": comm_instance.getErrorString()})
return None
else:
# connect to regular serial port
comm_instance._log("Connecting to: %s" % port)
if baudrate == 0:
serial_obj = serial.Serial(str(port), 115200, timeout=connection_timeout, writeTimeout=10000, parity=serial.PARITY_ODD)
else:
serial_obj = serial.Serial(str(port), baudrate, timeout=connection_timeout, writeTimeout=10000, parity=serial.PARITY_ODD)
serial_obj.close()
serial_obj.parity = serial.PARITY_NONE
serial_obj.open()
self._logger.info("########################################")
self._logger.info("Restarting Marlin")
#Reset the controller
serial_obj.setDTR(1)
time.sleep(0.1)
serial_obj.setDTR(0)
time.sleep(0.2)
self._logger.info("Marlin Reset")
#Flush input and output
self._logger.info("Flushing Input and Output!")
serial_obj.flushOutput()
serial_obj.flushInput()
self._logger.info("Finished Flushing")
#write something to the serial line to get rid of any bad characters in the buffer
self._logger.info("Writing M105")
first_write = "M105\n"
serial_obj.write(first_write)
self._logger.info("########################################")
return serial_obj
def serial_hook(self, comm_instance, port, baudrate, connection_timeout, *args, **kwargs):
if port is None or port == 'AUTO':
# no known port, try auto detection
comm_instance._changeState(comm_instance.STATE_DETECT_SERIAL)
serial_obj = comm_instance._detectPort(False)
if serial_obj is None:
comm_instance._log("Failed to autodetect serial port")
comm_instance._errorValue = 'Failed to autodetect serial port.'
comm_instance._changeState(comm_instance.STATE_ERROR)
eventManager().fire(Events.ERROR, {"error": comm_instance.getErrorString()})
return None
else:
# connect to regular serial port
comm_instance._log("Connecting to: %s" % port)
if baudrate == 0:
serial_obj = serial.Serial(str(port), 115200, timeout=connection_timeout, writeTimeout=10000, parity=serial.PARITY_ODD)
else:
serial_obj = serial.Serial(str(port), baudrate, timeout=connection_timeout, writeTimeout=10000, parity=serial.PARITY_ODD)
serial_obj.close()
serial_obj.parity = serial.PARITY_NONE
serial_obj.open()
self._logger.info("########################################")
self._logger.info("Restarting Marlin")
#Reset the controller
serial_obj.setDTR(1)
time.sleep(0.1)
serial_obj.setDTR(0)
time.sleep(0.2)
self._logger.info("Marlin Reset")
#Flush input and output
self._logger.info("Flushing Input and Output!")
serial_obj.flushOutput()
serial_obj.flushInput()
self._logger.info("Finished Flushing")
#write something to the serial line to get rid of any bad characters in the buffer
self._logger.info("Writing M105")
first_write = "M105\n"
serial_obj.write(first_write)
self._logger.info("########################################")
return serial_obj
def openSerial(self):
#Set up the relationship between what the user enters and what the API calls for
bytedic = {'5':serial.FIVEBITS,
'6':serial.SIXBITS,
'7':serial.SEVENBITS,
'8':serial.EIGHTBITS}
bytesize = bytedic[str(self.root.variables['databits'])]
paritydict = {'None':serial.PARITY_NONE,
'Even':serial.PARITY_EVEN,
'Odd' :serial.PARITY_ODD,
'Mark':serial.PARITY_MARK,
'Space':serial.PARITY_SPACE}
parity=paritydict[self.root.variables['parity']]
stopbitsdict = {'1':serial.STOPBITS_ONE,
'2':serial.STOPBITS_TWO}
stopbits = stopbitsdict[str(self.root.variables['stopbits'])]
#Open the serial port given the settings, store under the root
if os.name == 'nt':
port = self.root.variables['COMport'][0:5].strip()
self.root.ser = serial.Serial(\
port=port,\
baudrate=str(self.root.variables['baud']),\
bytesize=bytesize, parity=parity, stopbits=stopbits, timeout=0.5)
else:
first_space = self.root.variables['COMport'].index(' ')
port = self.root.variables['COMport'][0:first_space].strip()
# Parameters necessary due to https://github.com/pyserial/pyserial/issues/59
self.root.ser = serial.Serial(\
port=port,\
baudrate=str(self.root.variables['baud']),\
bytesize=bytesize, parity=parity, stopbits=stopbits, timeout=0.5, rtscts=True, dsrdtr=True)
io.DEFAULT_BUFFER_SIZE = 5000
#Purge the buffer of any previous data
if float(serial.VERSION[0:3]) < 3:
#If we're executing with pySerial 2.x
serial.Serial.flushInput(self.root.ser)
serial.Serial.flushOutput(self.root.ser)
else:
#Otherwise we're using pySerial 3.x
serial.Serial.reset_input_buffer(self.root.ser)
serial.Serial.reset_output_buffer(self.root.ser)
def predict_result():
ser = serial.Serial( # ????????????
port='COM6',
baudrate=1200,
parity=serial.PARITY_ODD,
stopbits=serial.STOPBITS_TWO,
bytesize=serial.SEVENBITS
)
serial_data = []
plt.xlim(0, 100)
plt.ylim(300, 700)
plt.title('GSR')
plt.ion()
i = 0
j = 0
id = 0
while True:
line = ser.readline()
line = int(line)
serial_data.append(line)
if i > 100:
plt.xlim(i - 100, i)
plt.plot(serial_data)
i += 1
j += 1
if j >= 50:
clf = joblib.load('model\\happy_model.m')
select = joblib.load('model\\vector_select.m')
vector = getattr.get_vector(serial_data)
new_vector = select.transform(vector)
print(new_vector)
result = clf.predict(new_vector)
if result[0] == '2':
clf = joblib.load('model\\sad_model.m')
result = clf.predict(new_vector)
j = 0
plt.plot([i, i], [300, 700], 'r--')
if result[0] == '1':
plt.annotate('happy', xy=(i, 600), xytext=(i - 10, 600), arrowprops=dict(facecolor='red', shrink=0.05))
res = 1
database.insert(id, res)
elif result[0] == '2':
plt.annotate('normal', xy=(i, 600), xytext=(i - 10, 600), arrowprops=dict(facecolor='blue', shrink=0.05))
res = 0
database.insert(id, res)
else:
plt.annotate('sad', xy=(i, 600), xytext=(i - 10, 600),arrowprops=dict(facecolor='black', shrink=0.05))
res = 2
database.insert(id, res)
print(result)
id += 1
plt.pause(0.001)