def read_fan_rpm(ser, lock):
"""Reads the current rpm of each fan.
Returns:
- If success: A list with rpm data for each fan
- If failure to read data: An empty list
"""
# List to hold fan rpm data to be returned
fans = []
with lock:
for fan in [0x01, 0x02, 0x03, 0x04, 0x05, 0x06]:
try:
# Define bytes to be sent to the Grid for reading rpm for a specific fan
# Format is two bytes:
# 8A <fan id>
serial_data = [0x8A, fan]
ser.reset_output_buffer()
# TODO: Check bytes written
bytes_written = ser.write(serial.to_bytes(serial_data))
# Wait before checking response
time.sleep(WAIT_GRID)
# Expected response is 5 bytes
# Example response: C0 00 00 03 00 = 0x0300 = 768 rpm (two bytes unsigned)
response = ser.read(size=5)
# Check if the Grid responded with any data
if response:
# Check for correct response, first three bytes should be C0 00 00
if response[0] == int("0xC0", 16) and response[1] == response[2] == int("0x00", 16):
# Convert rpm from 2-bytes unsigned value to decimal
rpm = response[3] * 256 + response[4]
fans.append(rpm)
# An incorrect response was received, return an empty list
else:
return []
# In case no response (0 bytes) received, return an empty list
else:
return []
except Exception as e:
helper.show_error("Could not read rpm for fan " + str(fan) + ".\n\n"
"Please check serial port settings.\n\n"
"Exception:\n" + str(e) + "\n\n"
"The application will now exit.")
print(str(e))
sys.exit(0)
# Fan
return fans
评论列表
文章目录