def get_payload(self):
    """Build the payload from the time, uptime and downtime attributes.

    Each value is recorded in ``self.payload_fields`` as a
    ``(description, value)`` tuple, then packed with the bitstring format
    token ``"64"`` and passed through ``little_endian``.

    Returns the three packed fields concatenated in the order
    time, uptime, downtime.
    """
    described_values = (
        ("Current Time", self.time),
        ("Uptime (ns)", self.uptime),
        ("Last Downtime Duration (ns) (5 second error)", self.downtime),
    )
    for entry in described_values:
        self.payload_fields.append(entry)
    return b"".join(
        little_endian(bitstring.pack("64", value))
        for _, value in described_values
    )
# Example source code for Python's pack()
def get_payload(self):
    """Build the payload from the group, label and updated_at attributes.

    The three values are recorded in ``self.payload_fields`` first. The
    payload is then assembled as:

    * group: every element of ``self.group`` packed with format ``"8"``;
    * label: ``ord()`` of every character packed with format ``"8"``,
      zero-padded to 32 entries and cut to 32 entries if longer;
    * updated_at: packed with format ``"64"``.

    Returns the concatenation group + label + updated_at.
    """
    label_field_len = 32

    self.payload_fields.append(("Group ", self.group))
    self.payload_fields.append(("Label ", self.label))
    self.payload_fields.append(("Updated At ", self.updated_at))

    group = b"".join(little_endian(bitstring.pack("8", b)) for b in self.group)

    # The label field is padded to a fixed width, so an over-long label must
    # be truncated too; otherwise updated_at would be pushed out of position.
    label_chars = self.label[:label_field_len]
    label = b"".join(
        little_endian(bitstring.pack("8", ord(c))) for c in label_chars
    )
    label += b"".join(
        little_endian(bitstring.pack("8", 0))
        for _ in range(label_field_len - len(label_chars))
    )

    updated_at = little_endian(bitstring.pack("64", self.updated_at))
    return group + label + updated_at
def get_payload(self):
    """Build a payload of exactly 64 packed entries from ``self.byte_array``.

    ``self.byte_array`` is recorded in ``self.payload_fields``, then each of
    its elements is packed with format ``"8"`` and passed through
    ``little_endian``. A result shorter than 64 bytes is extended with
    packed zeros; a longer one is cut down to its first 64 bytes.
    """
    field_len = 64
    self.payload_fields.append(("Byte Array", self.byte_array))

    packed = b"".join(
        little_endian(bitstring.pack("8", b)) for b in self.byte_array
    )
    shortfall = field_len - len(packed)
    if shortfall > 0:
        zero = little_endian(bitstring.pack("8", 0))
        packed += zero * shortfall
    # Slicing is a no-op when the data already fits the field.
    return packed[:field_len]
def get_payload(self):
    """Build the payload from ``self.byte_array`` without padding.

    ``self.byte_array`` is recorded in ``self.payload_fields``; each of its
    elements is packed with format ``"8"`` and passed through
    ``little_endian``. Returns the packed elements joined in order.
    """
    self.payload_fields.append(("Byte Array", self.byte_array))
    chunks = [little_endian(bitstring.pack("8", b)) for b in self.byte_array]
    return b"".join(chunks)
##### LIGHT MESSAGES #####
def get_payload(self):
    """Build the payload from the reserved, color and duration attributes.

    ``self.reserved`` is packed with format ``"8"``, every element of
    ``self.color`` with ``"16"`` and ``self.duration`` with ``"32"``; each
    packed value goes through ``little_endian``.

    Returns reserved + color + duration.
    """
    parts = [little_endian(bitstring.pack("8", self.reserved))]
    parts.extend(
        little_endian(bitstring.pack("16", component))
        for component in self.color
    )
    parts.append(little_endian(bitstring.pack("32", self.duration)))
    return b"".join(parts)
def get_payload(self):
    """Build the payload for the waveform-style attributes.

    Fields, in output order, with the bitstring format used for each:
    reserved (``"8"``), transient (``"uint:8"``), every element of color
    (``"16"``), period (``"uint:32"``), cycles (``"float:32"``),
    duty_cycle (``"int:16"``) and waveform (``"uint:8"``). Each packed
    value goes through ``little_endian``.
    """
    def encode(fmt, value):
        # Pack one value and convert it with little_endian.
        return little_endian(bitstring.pack(fmt, value))

    pieces = [
        encode("8", self.reserved),
        encode("uint:8", self.transient),
    ]
    pieces.extend(encode("16", component) for component in self.color)
    pieces.append(encode("uint:32", self.period))
    pieces.append(encode("float:32", self.cycles))
    pieces.append(encode("int:16", self.duty_cycle))
    pieces.append(encode("uint:8", self.waveform))
    return b"".join(pieces)
def get_payload(self):
    """Build the payload from the power_level and duration attributes.

    ``self.power_level`` is packed with format ``"16"`` and
    ``self.duration`` with ``"32"``; both go through ``little_endian``.

    Returns power_level + duration.
    """
    level_bytes = little_endian(bitstring.pack("16", self.power_level))
    duration_bytes = little_endian(bitstring.pack("32", self.duration))
    return b"".join((level_bytes, duration_bytes))
def get_payload(self):
    """Build the payload from the power_level attribute.

    ``self.power_level`` is recorded in ``self.payload_fields``, then
    packed with format ``"16"`` and passed through ``little_endian``.
    """
    level = self.power_level
    self.payload_fields.append(("Power Level", level))
    return little_endian(bitstring.pack("16", self.power_level))
##### INFRARED MESSAGES #####
def get_payload(self):
    """Build the payload from the infrared_brightness attribute.

    ``self.infrared_brightness`` is recorded in ``self.payload_fields``,
    then packed with format ``"16"`` and passed through ``little_endian``.
    """
    brightness = self.infrared_brightness
    self.payload_fields.append(("Infrared Brightness", brightness))
    return little_endian(bitstring.pack("16", self.infrared_brightness))
def get_payload(self):
    """Build the payload from the infrared_brightness attribute.

    The value is packed with format ``"16"`` and passed through
    ``little_endian``. Nothing is added to ``self.payload_fields`` here.
    """
    packed_bits = bitstring.pack("16", self.infrared_brightness)
    return little_endian(packed_bits)
##### MULTIZONE MESSAGES #####
def get_payload(self):
    """Build the payload from the count, index and color attributes.

    All three values are recorded in ``self.payload_fields``. ``count`` and
    ``index`` are each packed with format ``"8"``. ``self.color`` is
    iterated as a sequence of colors, and every field of every color is
    packed with format ``"16"``. Each packed value goes through
    ``little_endian``.

    Returns count + index followed by all color fields in order.
    """
    for entry in (
        ("Count", self.count),
        ("Index", self.index),
        ("Color (HSBK)", self.color),
    ):
        self.payload_fields.append(entry)

    header = (
        little_endian(bitstring.pack("8", self.count))
        + little_endian(bitstring.pack("8", self.index))
    )
    zone_bytes = b"".join(
        little_endian(bitstring.pack("16", field))
        for zone_color in self.color
        for field in zone_color
    )
    return header + zone_bytes
def get_payload(self):
    """Build the payload for a start/end index range with color settings.

    Fields, in output order, with the bitstring format used for each:
    start_index (``"8"``), end_index (``"8"``), every element of color
    (``"16"``), duration (``"32"``) and apply (``"8"``). Each packed value
    goes through ``little_endian``.
    """
    parts = [
        little_endian(bitstring.pack("8", self.start_index)),
        little_endian(bitstring.pack("8", self.end_index)),
    ]
    parts.extend(
        little_endian(bitstring.pack("16", component))
        for component in self.color
    )
    parts.append(little_endian(bitstring.pack("32", self.duration)))
    parts.append(little_endian(bitstring.pack("8", self.apply)))
    return b"".join(parts)
def get_payload(self):
    """Build the payload from the start_index and end_index attributes.

    Both values are packed with format ``"8"`` and passed through
    ``little_endian``. Returns start_index + end_index.
    """
    return b"".join(
        little_endian(bitstring.pack("8", bound))
        for bound in (self.start_index, self.end_index)
    )
def get_payload(self):
    """Return the payload produced from an empty bitstring format.

    ``bitstring.pack`` is called with an empty format string and no values,
    and the result is passed through ``little_endian``.
    """
    no_fields = bitstring.pack("")
    return little_endian(no_fields)
def get_frame(self):
    """Build the frame section from size, flag and source-id attributes.

    The first three entries of ``self.frame_format`` supply the bitstring
    formats for, respectively:

    * ``self.size``;
    * the flags, packed together from ``self.origin``, ``self.tagged``,
      ``self.addressable`` and ``self.protocol``;
    * ``self.source_id``.

    Each packed group goes through ``little_endian``. Returns
    size + flags + source_id.
    """
    formats = self.frame_format
    size_bytes = little_endian(bitstring.pack(formats[0], self.size))
    flag_bytes = little_endian(
        bitstring.pack(
            formats[1],
            self.origin,
            self.tagged,
            self.addressable,
            self.protocol,
        )
    )
    source_bytes = little_endian(bitstring.pack(formats[2], self.source_id))
    return b"".join((size_bytes, flag_bytes, source_bytes))
def get_frame_addr(self):
    """Build the frame-address section.

    The first four entries of ``self.frame_addr_format`` supply the
    bitstring formats for, respectively:

    * the target address, converted with ``convert_MAC_to_int``;
    * ``self.reserved``;
    * the response flags, packed together from ``self.reserved``,
      ``self.ack_requested`` and ``self.response_requested``;
    * ``self.seq_num``.

    Each packed group goes through ``little_endian``. Returns
    mac_addr + reserved + response_flags + seq_num.
    """
    formats = self.frame_addr_format
    target_as_int = convert_MAC_to_int(self.target_addr)
    sections = (
        little_endian(bitstring.pack(formats[0], target_as_int)),
        little_endian(bitstring.pack(formats[1], self.reserved)),
        little_endian(
            bitstring.pack(
                formats[2],
                self.reserved,
                self.ack_requested,
                self.response_requested,
            )
        ),
        little_endian(bitstring.pack(formats[3], self.seq_num)),
    )
    return b"".join(sections)
def little_endian(bs):
    """Return the whole bytes of ``bs``, least-significant byte first.

    ``len(bs)`` is treated as a bit count and ``bs.uintbe`` as the value of
    those bits as an unsigned integer. One output byte is produced for each
    complete group of 8 bits, starting with the lowest 8 bits of the value.

    Args:
        bs: Object supporting ``len()`` and exposing ``uintbe``.

    Returns:
        A bytes object of ``len(bs) // 8`` bytes; ``b""`` when ``bs`` holds
        fewer than 8 bits.
    """
    byte_count = len(bs) // 8
    if byte_count == 0:
        # Leave bs.uintbe untouched when there is nothing to emit, so that a
        # zero-length input still yields an empty result.
        return b""

    # Read the integer once instead of once per output byte.
    value = int(bs.uintbe)
    return bytes(
        bytearray(
            (value >> shift) & 0xff for shift in range(0, byte_count * 8, 8)
        )
    )
def get_payload(self):
    """Build the payload from the service and port attributes.

    Both values are recorded in ``self.payload_fields``. ``self.service``
    is packed with format ``"8"`` and ``self.port`` with ``"32"``; both go
    through ``little_endian``.

    Returns service + port.
    """
    for entry in (("Service", self.service), ("Port", self.port)):
        self.payload_fields.append(entry)
    service_bytes = little_endian(bitstring.pack("8", self.service))
    port_bytes = little_endian(bitstring.pack("32", self.port))
    return b"".join((service_bytes, port_bytes))
def get_payload(self):
    """Build the payload from the signal, tx, rx and reserved1 attributes.

    All four values are recorded in ``self.payload_fields``. ``signal``,
    ``tx`` and ``rx`` are each packed with format ``"32"`` and
    ``reserved1`` with ``"16"``; each goes through ``little_endian``.

    Returns signal + tx + rx + reserved1.
    """
    for entry in (
        ("Signal (mW)", self.signal),
        ("TX (bytes since on)", self.tx),
        ("RX (bytes since on)", self.rx),
        ("Reserved", self.reserved1),
    ):
        self.payload_fields.append(entry)

    counters = b"".join(
        little_endian(bitstring.pack("32", value))
        for value in (self.signal, self.tx, self.rx)
    )
    return counters + little_endian(bitstring.pack("16", self.reserved1))
def get_payload(self):
    """Build the payload from the build, reserved1 and version attributes.

    All three values are recorded in ``self.payload_fields``. ``build`` and
    ``reserved1`` are each packed with format ``"64"`` and ``version`` with
    ``"32"``; each goes through ``little_endian``.

    Returns build + reserved1 + version.
    """
    for entry in (
        ("Timestamp of Build", self.build),
        ("Reserved", self.reserved1),
        ("Version", self.version),
    ):
        self.payload_fields.append(entry)

    parts = [
        little_endian(bitstring.pack("64", self.build)),
        little_endian(bitstring.pack("64", self.reserved1)),
        little_endian(bitstring.pack("32", self.version)),
    ]
    return b"".join(parts)