def test_readlines(self):
    """Write three LF-terminated lines and check readlines() returns each one."""
    payload = serial.to_bytes([0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a])
    self.s.write(payload)
    received = self.s.readlines()
    # One expected entry per written line: the digit byte followed by LF.
    expected = [serial.to_bytes([digit, 0x0a]) for digit in (0x31, 0x32, 0x33)]
    self.assertEqual(received, expected)
# Example source code using Python's to_bytes()
def test_xreadlines(self):
    """Check xreadlines() yields each written line (skipped for io based systems)."""
    # Objects without an xreadlines attribute are not exercised at all.
    if not hasattr(self.s, 'xreadlines'):
        return
    self.s.write(serial.to_bytes([0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a]))
    received = list(self.s.xreadlines())
    expected = [serial.to_bytes([digit, 0x0a]) for digit in (0x31, 0x32, 0x33)]
    self.assertEqual(received, expected)
def test_for_in(self):
    """Check that iterating over the port object yields the written lines."""
    self.s.write(serial.to_bytes([0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a]))
    # Collect whatever plain iteration over the object produces.
    collected = [chunk for chunk in self.s]
    expected = [serial.to_bytes([digit, 0x0a]) for digit in (0x31, 0x32, 0x33)]
    self.assertEqual(collected, expected)
def test_alternate_eol(self):
    """Test readline with alternative eol settings (skipped for io based systems).

    Writes data containing a lone CR and a lone LF before the final CR LF and
    checks that ``readline(eol=CR LF)`` returns everything up to and including
    that CR LF as a single line.
    """
    if hasattr(self.s, 'xreadlines'):  # test if it is our FileLike base class
        # Bytes literals: serial.to_bytes() does not accept text strings on
        # Python 3, while on Python 2 a b"..." literal is the same as "...".
        self.s.write(serial.to_bytes(b"no\rno\nyes\r\n"))
        self.assertEqual(
            self.s.readline(eol=serial.to_bytes(b"\r\n")),
            serial.to_bytes(b"no\rno\nyes\r\n"))
def hex_encode(data, errors='strict'):
    """Encode whitespace-separated hex values into bytes: '40 41 42' -> b'@AB'.

    Returns a ``(encoded_bytes, len(data))`` tuple. ``errors`` is accepted
    but not used by this function.
    """
    byte_values = []
    for token in data.split():
        byte_values.append(int(token, 16))
    encoded = serial.to_bytes(byte_values)
    return (encoded, len(data))
def hex_encode(data, errors='strict'):
    """Encode whitespace-separated hex values into bytes: '40 41 42' -> b'@AB'.

    Returns a ``(encoded_bytes, len(data))`` tuple. ``errors`` is accepted
    but not used by this function.
    """
    byte_values = []
    for token in data.split():
        byte_values.append(int(token, 16))
    encoded = serial.to_bytes(byte_values)
    return (encoded, len(data))
def encode(self, data, errors='strict'):
    """Encode whitespace-separated hex values into bytes: '40 41 42' -> b'@AB'.

    ``errors`` is accepted but not used by this method.
    """
    byte_values = []
    for token in data.split():
        byte_values.append(int(token, 16))
    return serial.to_bytes(byte_values)
def writer(self):
    """Loop forever and copy socket->serial.

    Receives up to 1024 bytes at a time from ``self.socket``, passes them
    through ``self.rfc2217.filter`` and writes the result to ``self.serial``.
    The loop ends when ``self.alive`` is false, when ``recv`` returns no data,
    or when a socket error is raised (which is logged). ``self.stop()`` is
    called once the loop has ended.
    """
    while self.alive:
        try:
            data = self.socket.recv(1024)
            if not data:
                break
            self.serial.write(serial.to_bytes(self.rfc2217.filter(data)))
        except socket.error as msg:  # "as" form works on Python 2.6+ and 3
            self.log.error('%s', msg)
            # probably got disconnected
            break
    self.stop()
def handle_serial_read(self):
    """Read pending data from the serial port into the serial-to-network buffer.

    Reads up to 1024 bytes from the serial port's file descriptor. When data
    arrives and a client socket is present, the data is appended to
    ``self.buffer_ser2net`` (escaped through ``self.rfc2217`` first when that
    is set). An empty read or any exception is passed on to
    ``self.handle_serial_error``.
    """
    try:
        data = os.read(self.serial.fileno(), 1024)
        if data:
            # store data in buffer if there is a client connected
            if self.socket is not None:
                # escape outgoing data when needed (Telnet IAC (0xff) character)
                if self.rfc2217:
                    data = serial.to_bytes(self.rfc2217.escape(data))
                self.buffer_ser2net += data
        else:
            # os.read returned nothing: report it as a serial error
            self.handle_serial_error()
    except Exception as msg:  # "as" form works on Python 2.6+ and 3
        self.handle_serial_error(msg)
def test_readline(self):
    """Test readline method.

    Writes three LF-terminated lines, reads them back one at a time and then
    expects a fourth readline() to return empty bytes.
    """
    # Bytes literals: serial.to_bytes() does not accept text strings on
    # Python 3, while on Python 2 a b"..." literal is the same as "...".
    self.s.write(serial.to_bytes(b"1\n2\n3\n"))
    # assertEqual replaces the deprecated failUnlessEqual alias.
    self.assertEqual(self.s.readline(), serial.to_bytes(b"1\n"))
    self.assertEqual(self.s.readline(), serial.to_bytes(b"2\n"))
    self.assertEqual(self.s.readline(), serial.to_bytes(b"3\n"))
    # this time we will get a timeout
    self.assertEqual(self.s.readline(), serial.to_bytes(b""))
def test_xreadlines(self):
    """Test xreadlines method (skipped for io based systems)."""
    if hasattr(self.s, 'xreadlines'):
        # Bytes literals: serial.to_bytes() does not accept text strings on
        # Python 3, while on Python 2 a b"..." literal is the same as "...".
        self.s.write(serial.to_bytes(b"1\n2\n3\n"))
        # assertEqual replaces the deprecated failUnlessEqual alias.
        self.assertEqual(
            list(self.s.xreadlines()),
            [serial.to_bytes(b"1\n"), serial.to_bytes(b"2\n"), serial.to_bytes(b"3\n")]
        )
def test_for_in(self):
    """Test for line in s."""
    # Bytes literals: serial.to_bytes() does not accept text strings on
    # Python 3, while on Python 2 a b"..." literal is the same as "...".
    self.s.write(serial.to_bytes(b"1\n2\n3\n"))
    lines = [line for line in self.s]
    # assertEqual replaces the deprecated failUnlessEqual alias.
    self.assertEqual(
        lines,
        [serial.to_bytes(b"1\n"), serial.to_bytes(b"2\n"), serial.to_bytes(b"3\n")]
    )
def test_alternate_eol(self):
    """Test readline with alternative eol settings (skipped for io based systems).

    Writes data containing a lone CR and a lone LF before the final CR LF and
    checks that ``readline(eol=CR LF)`` returns everything up to and including
    that CR LF as a single line.
    """
    if hasattr(self.s, 'xreadlines'):  # test if it is our FileLike base class
        # Bytes literals: serial.to_bytes() does not accept text strings on
        # Python 3, while on Python 2 a b"..." literal is the same as "...".
        self.s.write(serial.to_bytes(b"no\rno\nyes\r\n"))
        # assertEqual replaces the deprecated failUnlessEqual alias.
        self.assertEqual(
            self.s.readline(eol=serial.to_bytes(b"\r\n")),
            serial.to_bytes(b"no\rno\nyes\r\n"))
def yatta_txData(dataBuff):
    """Write dataBuff to the module-level ``uart`` when ``yatta_mode`` is 0.

    The data is converted with ``serial.to_bytes`` before being written.
    For any other ``yatta_mode`` value nothing is sent.
    """
    # Both module-level names are only read here, so no global statement is needed.
    if yatta_mode != 0:
        return
    payload = serial.to_bytes(dataBuff)
    uart.write(payload)
def initialize_grid(ser, lock):
    """Initialize the Grid by sending "0xC0"; the expected reply is "0x21".

    Returns:
        - True when the single response byte is 0x21
        - False when no byte or a different byte is received (an error is shown)

    Any exception is reported through helper.show_error, after which the
    function calls sys.exit(0).
    """
    expected_reply = 0x21
    try:
        with lock:
            # Start from empty input and output buffers
            ser.reset_input_buffer()
            ser.reset_output_buffer()

            # Send the one-byte initialization command
            ser.write(serial.to_bytes([0xC0]))

            # Give the Grid time to answer before reading a single byte
            time.sleep(WAIT_GRID)
            response = ser.read(size=1)

            # Nothing came back from the Grid
            if not response:
                helper.show_error("Problem initializing the Grid unit.\n\n"
                                  "Response 0x21 expected, no response received.\n\n"
                                  "Please check serial port " + ser.port +".\n")
                return False

            # Something came back, but not the expected byte
            if response[0] != expected_reply:
                helper.show_error("Problem initializing the Grid unit.\n\n"
                                  "Response 0x21 expected, got " + hex(ord(response)) + ".\n\n"
                                  "Please check serial port " + ser.port +".\n")
                return False

            print("Grid initialized")
            return True

    except Exception as e:
        helper.show_error("Problem initializing the Grid unit.\n\n"
                          "Exception:\n" + str(e) + "\n\n"
                          "The application will now exit.")
        sys.exit(0)
def set_fan(ser, fan, voltage, lock):
    """Set the voltage of one fan.

    Note:
        The Grid only supports voltages between 4.0V and 12.0V in 0.5V steps
        (e.g. 4.0, 7.5, 12.0). Configuring "0V" stops a fan.

    A fan outside 1-6 or a voltage outside the supported set raises KeyError.
    Any exception while talking to the serial port is reported through
    helper.show_error, after which the function calls sys.exit(0).
    """
    # Voltage -> [integer byte, decimal byte]; a half volt is encoded as 0x50.
    # 0 stops the fan (values below 4V are not supported by the Grid).
    speed_data = {0: [0x00, 0x00]}
    for whole_volts in range(4, 12):
        speed_data[float(whole_volts)] = [whole_volts, 0x00]
        speed_data[whole_volts + 0.5] = [whole_volts, 0x50]
    speed_data[12.0] = [0x0C, 0x00]

    # Fan number -> fan id byte (fans 1 to 6)
    fan_data = {number: number for number in range(1, 7)}

    # Command format is seven bytes:
    #   44 <fan id> C0 00 00 <voltage integer> <voltage decimal>
    # Example, "7.5V" for fan "1":
    #   44 01 C0 00 00 07 50
    fan_id = fan_data[fan]
    voltage_bytes = speed_data[voltage]
    serial_data = [0x44, fan_id, 0xC0, 0x00, 0x00, voltage_bytes[0], voltage_bytes[1]]

    try:
        with lock:
            ser.write(serial.to_bytes(serial_data))
            time.sleep(WAIT_GRID)

            # TODO: Check response
            # A one-byte response is expected; it is read but not inspected
            ser.read(size=1)

            print("Fan " + str(fan) + " updated")

    except Exception as e:
        helper.show_error("Could not set speed for fan " + str(fan) + ".\n\n"
                          "Please check settings for serial port " + str(ser.port) + ".\n\n"
                          "Exception:\n" + str(e) + "\n\n"
                          "The application will now exit.")
        sys.exit(0)
def read_fan_rpm(ser, lock):
    """Read the current rpm of each fan.

    Returns:
        - On success: a list with the rpm value of each of the six fans
        - On failure to read data (no response, short response or unexpected
          header): an empty list

    Any exception is reported through helper.show_error, after which the
    function calls sys.exit(0).
    """
    # List to hold fan rpm data to be returned
    fans = []

    with lock:
        for fan in [0x01, 0x02, 0x03, 0x04, 0x05, 0x06]:
            try:
                # Request format is two bytes: 8A <fan id>
                serial_data = [0x8A, fan]

                ser.reset_output_buffer()
                # TODO: Check bytes written
                ser.write(serial.to_bytes(serial_data))

                # Wait before checking response
                time.sleep(WAIT_GRID)

                # Expected response is 5 bytes
                # Example response: C0 00 00 03 00 = 0x0300 = 768 rpm (two bytes unsigned)
                response = ser.read(size=5)

                # No data, or fewer bytes than a full frame: indexing below
                # would fail, so treat it as a failed read.
                if len(response) != 5:
                    return []

                # The first three bytes must be C0 00 00
                if response[0] != 0xC0 or response[1] != 0x00 or response[2] != 0x00:
                    return []

                # Combine the two trailing bytes (high byte first) into the rpm value
                fans.append(response[3] * 256 + response[4])

            except Exception as e:
                helper.show_error("Could not read rpm for fan " + str(fan) + ".\n\n"
                                  "Please check serial port settings.\n\n"
                                  "Exception:\n" + str(e) + "\n\n"
                                  "The application will now exit.")
                print(str(e))
                sys.exit(0)

    return fans
def read_fan_voltage(ser, lock):
    """Read the current voltage of each fan.

    Returns:
        - On success: a list with the voltage of each of the six fans
        - On failure to read data (no response, short response or unexpected
          header): an empty list

    Any exception is reported through helper.show_error, after which the
    function calls sys.exit(0).
    """
    # List to hold fan voltage data to be returned
    fans = []

    with lock:
        for fan in [0x01, 0x02, 0x03, 0x04, 0x05, 0x06]:
            try:
                # Request format is two bytes: 84 <fan id>
                serial_data = [0x84, fan]

                ser.reset_output_buffer()
                ser.write(serial.to_bytes(serial_data))

                # Wait before checking response
                time.sleep(WAIT_GRID)

                # Expected response is 5 bytes; the last two carry the voltage,
                # e.g. trailing bytes 0B 01 = 11.01 volt
                response = ser.read(size=5)

                # In case no response (0 bytes) is returned from the Grid
                if not response:
                    print("Error reading fan voltage, no data returned")
                    return []

                # Fewer bytes than a full frame, or a header other than
                # C0 00 00, is an incorrect response.
                if (len(response) != 5 or response[0] != 0xC0
                        or response[1] != 0x00 or response[2] != 0x00):
                    print("Error reading fan voltage, incorrect response")
                    return []

                # Zero-pad the decimal byte so that 0B 01 becomes 11.01 rather
                # than 11.1. Assumes the last byte counts hundredths of a volt,
                # as in the example above - TODO confirm against the hardware.
                voltage = float("%d.%02d" % (response[3], response[4]))

                # Add the voltage for the current fan to the list
                fans.append(voltage)

            except Exception as e:
                helper.show_error("Could not read fan voltage.\n\n"
                                  "Please check serial port " + ser.port + ".\n\n"
                                  "Exception:\n" + str(e) + "\n\n"
                                  "The application will now exit.")
                print(str(e))
                sys.exit(0)

    return fans
def initialize_grid(ser, lock):
    """Initialize the Grid by sending "0xC0", expected response is "0x21"

    Returns:
        - True for successful initialization
        - False otherwise (no response, wrong response, or an exception
          while talking to the serial port); an error is shown in each case
    """
    try:
        with lock:
            # Flush input and output buffers
            ser.reset_input_buffer()
            ser.reset_output_buffer()

            # Write data to serial port to initialize the Grid
            ser.write(serial.to_bytes([0xC0]))

            # Wait before checking response
            time.sleep(WAIT_GRID)

            # Read response, one byte = 0x21 is expected for a successful initialization
            response = ser.read(size=1)

            # Check if the Grid responded with any data
            if response:
                # Check for correct response (should be 0x21)
                if response[0] == 0x21:
                    print("Grid initialized")
                    return True

                # Incorrect response received from the grid
                else:
                    helper.show_error("Problem initializing the Grid unit.\n\n"
                                      "Response 0x21 expected, got " + hex(ord(response)) + ".\n\n"
                                      "Please check serial port " + ser.port +".\n")
                    return False

            # In case no response (0 bytes) from the Grid
            else:
                helper.show_error("Problem initializing the Grid unit.\n\n"
                                  "Response 0x21 expected, no response received.\n\n"
                                  "Please check serial port " + ser.port +".\n")
                return False

    except Exception as e:
        # Report the failure instead of silently swallowing it, and return the
        # documented failure value rather than an implicit None.
        helper.show_error("Problem initializing the Grid unit.\n\n"
                          "Exception:\n" + str(e) + "\n")
        return False
def set_fan(ser, fan, voltage, lock):
    """Set the voltage of one fan.

    Note:
        The Grid only supports voltages between 4.0V and 12.0V in 0.5V steps
        (e.g. 4.0, 7.5, 12.0). Configuring "0V" stops a fan.

    A fan outside 1-6 or a voltage outside the supported set raises KeyError.
    Any exception while talking to the serial port is reported through
    helper.show_error, after which the function calls sys.exit(0).
    """
    # Fan number -> fan id byte
    fan_ids = {1: 0x01, 2: 0x02, 3: 0x03, 4: 0x04, 5: 0x05, 6: 0x06}

    # Voltage -> [integer byte, decimal byte]
    # 0 stops the fan (values below 4V are not supported by the Grid)
    voltage_codes = {
        0: [0x00, 0x00],
        4.0: [0x04, 0x00], 4.5: [0x04, 0x50],
        5.0: [0x05, 0x00], 5.5: [0x05, 0x50],
        6.0: [0x06, 0x00], 6.5: [0x06, 0x50],
        7.0: [0x07, 0x00], 7.5: [0x07, 0x50],
        8.0: [0x08, 0x00], 8.5: [0x08, 0x50],
        9.0: [0x09, 0x00], 9.5: [0x09, 0x50],
        10.0: [0x0A, 0x00], 10.5: [0x0A, 0x50],
        11.0: [0x0B, 0x00], 11.5: [0x0B, 0x50],
        12.0: [0x0C, 0x00],
    }

    # Command format is seven bytes:
    #   44 <fan id> C0 00 00 <voltage integer> <voltage decimal>
    # Example, "7.5V" for fan "1":
    #   44 01 C0 00 00 07 50
    command = [0x44, fan_ids[fan], 0xC0, 0x00, 0x00]
    command.extend(voltage_codes[voltage][:2])

    try:
        with lock:
            ser.write(serial.to_bytes(command))
            time.sleep(WAIT_GRID)

            # TODO: Check response
            # A one-byte response is expected; it is read but not inspected
            ser.read(size=1)

            print("Fan " + str(fan) + " updated")

    except Exception as e:
        helper.show_error("Could not set speed for fan " + str(fan) + ".\n\n"
                          "Please check settings for serial port " + str(ser.port) + ".\n\n"
                          "Exception:\n" + str(e) + "\n\n"
                          "The application will now exit.")
        sys.exit(0)