def read_beacon_payload(beacon_bytes):
    """Parse raw beacon bytes into a BeaconPayload namedtuple.

    The packet consists of a fixed-size header (version, beacon type code
    and inner payload length) followed by an optional inner payload. For
    version 1 beacons, a non-empty inner payload is decrypted, decompressed
    and BSON-decoded, in that order. When the decoded payload is non-empty,
    its "type" field takes precedence over the type code from the header.
    Any bytes beyond the payload length declared in the header are ignored.

    :param beacon_bytes: beacon payload (bytes).
    :return: BeaconPayload namedtuple built from the original bytes, the
        version, the beacon type looked up in BEACON_TYPE_VALUES, and the
        decoded inner payload (None when there is no inner payload).
    :raises InvalidBeaconingPacket: if the packet is shorter than the
        header, carries fewer payload bytes than the header declares, has
        an unknown version, has an inner payload that cannot be decrypted,
        decompressed or BSON-decoded, or names an unknown beacon type.
    """
    total_length = len(beacon_bytes)
    if total_length < BEACON_HEADER_LENGTH_V1:
        raise InvalidBeaconingPacket(
            "Beaconing packet must be at least %d bytes." % (
                BEACON_HEADER_LENGTH_V1))
    header = beacon_bytes[:BEACON_HEADER_LENGTH_V1]
    version, beacon_type_code, expected_payload_length = struct.unpack(
        BEACON_HEADER_FORMAT_V1, header)
    actual_payload_length = total_length - BEACON_HEADER_LENGTH_V1
    # Only a short payload is an error; trailing extra bytes are tolerated
    # and dropped by the slice below.
    if actual_payload_length < expected_payload_length:
        raise InvalidBeaconingPacket(
            "Invalid payload length: expected %d bytes, got %d bytes." % (
                expected_payload_length, actual_payload_length))
    if version != 1:
        raise InvalidBeaconingPacket(
            "Unknown beacon version: %d" % version)
    payload_end = BEACON_HEADER_LENGTH_V1 + expected_payload_length
    payload_bytes = beacon_bytes[BEACON_HEADER_LENGTH_V1:payload_end]
    payload = None
    # An empty inner payload means there is nothing to decrypt or decode.
    if payload_bytes:
        try:
            # NOTE(review): ttl is presumably a maximum token age in
            # seconds; confirm against fernet_decrypt_psk.
            decrypted_data = fernet_decrypt_psk(
                payload_bytes, ttl=60, raw=True)
        except InvalidToken as error:
            raise InvalidBeaconingPacket(
                "Failed to decrypt inner payload: check MAAS secret key."
            ) from error
        try:
            decompressed_data = decompress(decrypted_data)
        except OSError as error:
            raise InvalidBeaconingPacket(
                "Failed to decompress inner payload: %r" % decrypted_data
            ) from error
        try:
            payload = BSON.decode(decompressed_data)
        except BSONError as error:
            raise InvalidBeaconingPacket(
                "Inner beacon payload is not BSON: %r" % decompressed_data
            ) from error
    if payload:
        # The decoded inner payload overrides the type code in the header.
        try:
            beacon_type_code = payload["type"]
        except KeyError as error:
            raise InvalidBeaconingPacket(
                "Inner beacon payload is missing the 'type' field."
            ) from error
    # Report an unrecognised type code as an invalid packet rather than
    # letting a bare lookup error escape to the caller.
    try:
        beacon_type = BEACON_TYPE_VALUES[beacon_type_code]
    except LookupError as error:
        raise InvalidBeaconingPacket(
            "Unknown beacon type code: %r" % (beacon_type_code,)
        ) from error
    return BeaconPayload(beacon_bytes, version, beacon_type, payload)
评论列表
文章目录