def __checkType__(self, serial):
byte = serial.read().decode("utf-8")
print(byte)
if byte == "1":
print("??? ???? ?? : " + serial.port)
serial.write(b'0')
self.threads["Sensor"] = SensorThread(serial)
return self.threads["Sensor"]
elif byte == "2" :
print("??? ???? ?? : " + serial.port)
serial.write(b'0')
self.threads["Control"] = ControlThread(serial)
return self.threads["Control"]
else :
print("? ? ?? ???? : " + serial.port)
serial.write(b'1')
return None
#?? ???
python类write()的实例源码
def write(self, value):
print(value)
if(value == 0):
serial.write('0')
elif(value == 1):
serial.write('1')
elif(value == 2):
serial.write('2')
elif(value == 3):
serial.write('3')
elif(value == 4):
serial.write('4')
# Disconnects the Arduino
def showReady(self):
serial.write('6')
def write(self, data):
"""\
Output the given byte string over the serial port. Can block if the
connection is blocked. May raise SerialException if the connection is
closed.
"""
if not self.is_open:
raise portNotOpenError
with self._write_lock:
try:
self._socket.sendall(to_bytes(data).replace(IAC, IAC_DOUBLED))
except socket.error as e:
raise SerialException("connection failed (socket error): %s" % (e,))
return len(data)
def _internal_raw_write(self, data):
"""internal socket write with no data escaping. used to send telnet stuff."""
with self._write_lock:
self._socket.sendall(data)
def telnetSendOption(self, action, option):
"""Send DO, DONT, WILL, WONT."""
self.connection.write(to_bytes([IAC, action, option]))
def rfc2217SendSubnegotiation(self, option, value=b''):
"""Subnegotiation of RFC 2217 parameters."""
value = value.replace(IAC, IAC_DOUBLED)
self.connection.write(to_bytes([IAC, SB, COM_PORT_OPTION, option] + list(value) + [IAC, SE]))
# - check modem lines, needs to be called periodically from user to
# establish polling
def write(self, data):
"""\
Output the given byte string over the serial port. Can block if the
connection is blocked. May raise SerialException if the connection is
closed.
"""
if not self.is_open:
raise portNotOpenError
with self._write_lock:
try:
self._socket.sendall(to_bytes(data).replace(IAC, IAC_DOUBLED))
except socket.error as e:
raise SerialException("connection failed (socket error): %s" % (e,))
return len(data)
def _internal_raw_write(self, data):
"""internal socket write with no data escaping. used to send telnet stuff."""
with self._write_lock:
self._socket.sendall(data)
def telnetSendOption(self, action, option):
"""Send DO, DONT, WILL, WONT."""
self.connection.write(to_bytes([IAC, action, option]))
def rfc2217SendSubnegotiation(self, option, value=b''):
"""Subnegotiation of RFC 2217 parameters."""
value = value.replace(IAC, IAC_DOUBLED)
self.connection.write(to_bytes([IAC, SB, COM_PORT_OPTION, option] + list(value) + [IAC, SE]))
# - check modem lines, needs to be called periodically from user to
# establish polling
def write(self, data):
"""\
Output the given byte string over the serial port. Can block if the
connection is blocked. May raise SerialException if the connection is
closed.
"""
if not self.is_open:
raise portNotOpenError
with self._write_lock:
try:
self._socket.sendall(to_bytes(data).replace(IAC, IAC_DOUBLED))
except socket.error as e:
raise SerialException("connection failed (socket error): {}".format(e))
return len(data)
def _internal_raw_write(self, data):
"""internal socket write with no data escaping. used to send telnet stuff."""
with self._write_lock:
self._socket.sendall(data)
def telnet_send_option(self, action, option):
"""Send DO, DONT, WILL, WONT."""
self.connection.write(to_bytes([IAC, action, option]))
def rfc2217_send_subnegotiation(self, option, value=b''):
"""Subnegotiation of RFC 2217 parameters."""
value = value.replace(IAC, IAC_DOUBLED)
self.connection.write(to_bytes([IAC, SB, COM_PORT_OPTION, option] + list(value) + [IAC, SE]))
# - check modem lines, needs to be called periodically from user to
# establish polling
def write(self, data):
"""\
Output the given byte string over the serial port. Can block if the
connection is blocked. May raise SerialException if the connection is
closed.
"""
if not self.is_open:
raise portNotOpenError
with self._write_lock:
try:
self._socket.sendall(to_bytes(data).replace(IAC, IAC_DOUBLED))
except socket.error as e:
raise SerialException("connection failed (socket error): {}".format(e))
return len(data)
def _internal_raw_write(self, data):
"""internal socket write with no data escaping. used to send telnet stuff."""
with self._write_lock:
self._socket.sendall(data)
def telnet_send_option(self, action, option):
"""Send DO, DONT, WILL, WONT."""
self.connection.write(IAC + action + option)
def rfc2217_send_subnegotiation(self, option, value=b''):
"""Subnegotiation of RFC 2217 parameters."""
value = value.replace(IAC, IAC_DOUBLED)
self.connection.write(IAC + SB + COM_PORT_OPTION + option + value + IAC + SE)
# - check modem lines, needs to be called periodically from user to
# establish polling
def write(self, data):
"""\
Output the given byte string over the serial port. Can block if the
connection is blocked. May raise SerialException if the connection is
closed.
"""
if not self.is_open:
raise portNotOpenError
with self._write_lock:
try:
self._socket.sendall(to_bytes(data).replace(IAC, IAC_DOUBLED))
except socket.error as e:
raise SerialException("connection failed (socket error): %s" % (e,))
return len(data)
def _internal_raw_write(self, data):
"""internal socket write with no data escaping. used to send telnet stuff."""
with self._write_lock:
self._socket.sendall(data)
def telnetSendOption(self, action, option):
"""Send DO, DONT, WILL, WONT."""
self.connection.write(to_bytes([IAC, action, option]))
def rfc2217SendSubnegotiation(self, option, value=b''):
"""Subnegotiation of RFC 2217 parameters."""
value = value.replace(IAC, IAC_DOUBLED)
self.connection.write(to_bytes([IAC, SB, COM_PORT_OPTION, option] + list(value) + [IAC, SE]))
# - check modem lines, needs to be called periodically from user to
# establish polling
def write(self, data):
"""\
Output the given byte string over the serial port. Can block if the
connection is blocked. May raise SerialException if the connection is
closed.
"""
if not self.is_open:
raise portNotOpenError
with self._write_lock:
try:
self._socket.sendall(to_bytes(data).replace(IAC, IAC_DOUBLED))
except socket.error as e:
raise SerialException("connection failed (socket error): {}".format(e))
return len(data)
def _internal_raw_write(self, data):
"""internal socket write with no data escaping. used to send telnet stuff."""
with self._write_lock:
self._socket.sendall(data)
def telnet_send_option(self, action, option):
"""Send DO, DONT, WILL, WONT."""
self.connection.write(to_bytes([IAC, action, option]))
def rfc2217_send_subnegotiation(self, option, value=b''):
"""Subnegotiation of RFC 2217 parameters."""
value = value.replace(IAC, IAC_DOUBLED)
self.connection.write(to_bytes([IAC, SB, COM_PORT_OPTION, option] + list(value) + [IAC, SE]))
# - check modem lines, needs to be called periodically from user to
# establish polling
def write(self, data):
"""\
Output the given byte string over the serial port. Can block if the
connection is blocked. May raise SerialException if the connection is
closed.
"""
if not self.is_open:
raise portNotOpenError
with self._write_lock:
try:
self._socket.sendall(to_bytes(data).replace(IAC, IAC_DOUBLED))
except socket.error as e:
raise SerialException("connection failed (socket error): {}".format(e))
return len(data)
def _internal_raw_write(self, data):
"""internal socket write with no data escaping. used to send telnet stuff."""
with self._write_lock:
self._socket.sendall(data)
def telnet_send_option(self, action, option):
"""Send DO, DONT, WILL, WONT."""
self.connection.write(to_bytes([IAC, action, option]))