def aes_key_unwrap(wrapping_key, wrapped_key, backend):
    """Unwrap ``wrapped_key`` with ``wrapping_key`` (RFC 3394, section 2.2.2).

    Args:
        wrapping_key: AES key of 16, 24 or 32 bytes.
        wrapped_key: Wrapped data; at least 24 bytes and a multiple of 8.
        backend: Backend object handed to ``Cipher``.

    Returns:
        The unwrapped key bytes (``len(wrapped_key) - 8`` bytes).

    Raises:
        ValueError: If either argument has an invalid length.
        InvalidUnwrap: If the recovered check block does not equal the
            expected ``aiv`` constant.
    """
    if len(wrapped_key) < 24:
        raise ValueError("Must be at least 24 bytes")
    if len(wrapped_key) % 8 != 0:
        raise ValueError("The wrapped key must be a multiple of 8 bytes")
    if len(wrapping_key) not in (16, 24, 32):
        raise ValueError("The wrapping key must be a valid AES key length")

    # Implement RFC 3394 Key Unwrap - 2.2.2 (index method)
    decryptor = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
    aiv = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
    # Split into 64-bit blocks: the first one is the check register ``a``,
    # the remaining ``n`` blocks hold the key material.
    r = [wrapped_key[i:i + 8] for i in range(0, len(wrapped_key), 8)]
    a = r.pop(0)
    n = len(r)
    for j in reversed(range(6)):
        for i in reversed(range(n)):
            # pack/unpack are safe as these are always 64-bit chunks
            atr = struct.pack(
                ">Q", struct.unpack(">Q", a)[0] ^ ((n * j) + i + 1)
            ) + r[i]
            # every decryption operation is a discrete 16 byte chunk so
            # it is safe to reuse the decryptor for the entire operation
            b = decryptor.update(atr)
            a = b[:8]
            r[i] = b[-8:]

    # Finalize outside the assert: assert statements are removed under
    # ``python -O``, which would otherwise skip the finalize() call entirely.
    leftover = decryptor.finalize()
    assert leftover == b""

    # NOTE(review): bytes_eq is presumably a constant-time comparison - confirm.
    if not bytes_eq(a, aiv):
        raise InvalidUnwrap()
    return b"".join(r)
# Example source code using the Python ECB class
def aes_key_wrap(wrapping_key, key_to_wrap, backend):
    """Wrap ``key_to_wrap`` under ``wrapping_key`` (RFC 3394, section 2.2.1).

    Args:
        wrapping_key: AES key of 16, 24 or 32 bytes.
        key_to_wrap: Key material; at least 16 bytes and a multiple of 8.
        backend: Backend object handed to ``Cipher``.

    Returns:
        The wrapped key: the final 8-byte check register followed by the
        transformed blocks (``len(key_to_wrap) + 8`` bytes).

    Raises:
        ValueError: If either argument has an invalid length.
    """
    if len(wrapping_key) not in (16, 24, 32):
        raise ValueError("The wrapping key must be a valid AES key length")
    if len(key_to_wrap) < 16:
        raise ValueError("The key to wrap must be at least 16 bytes")
    if len(key_to_wrap) % 8 != 0:
        raise ValueError("The key to wrap must be a multiple of 8 bytes")

    # RFC 3394 Key Wrap - 2.2.1 (index method)
    encryptor = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    a = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
    r = [key_to_wrap[i:i + 8] for i in range(0, len(key_to_wrap), 8)]
    n = len(r)
    for j in range(6):
        for i in range(n):
            # every encryption operation is a discrete 16 byte chunk (because
            # AES has a 128-bit block size) and since we're using ECB it is
            # safe to reuse the encryptor for the entire operation
            b = encryptor.update(a + r[i])
            # pack/unpack are safe as these are always 64-bit chunks
            a = struct.pack(
                ">Q", struct.unpack(">Q", b[:8])[0] ^ ((n * j) + i + 1)
            )
            r[i] = b[-8:]

    # Finalize outside the assert: assert statements are removed under
    # ``python -O``, which would otherwise skip the finalize() call entirely.
    leftover = encryptor.finalize()
    assert leftover == b""
    return a + b"".join(r)
def aes_key_unwrap(wrapping_key, wrapped_key, backend):
    """Unwrap ``wrapped_key`` with ``wrapping_key`` (RFC 3394, section 2.2.2).

    Args:
        wrapping_key: AES key of 16, 24 or 32 bytes.
        wrapped_key: Wrapped data; at least 24 bytes and a multiple of 8.
        backend: Backend object handed to ``Cipher``.

    Returns:
        The unwrapped key bytes (``len(wrapped_key) - 8`` bytes).

    Raises:
        ValueError: If either argument has an invalid length.
        InvalidUnwrap: If the recovered check block does not equal the
            expected ``aiv`` constant.
    """
    if len(wrapped_key) < 24:
        raise ValueError("Must be at least 24 bytes")
    if len(wrapped_key) % 8 != 0:
        raise ValueError("The wrapped key must be a multiple of 8 bytes")
    if len(wrapping_key) not in (16, 24, 32):
        raise ValueError("The wrapping key must be a valid AES key length")

    # Implement RFC 3394 Key Unwrap - 2.2.2 (index method)
    decryptor = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
    aiv = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
    # Split into 64-bit blocks: the first one is the check register ``a``,
    # the remaining ``n`` blocks hold the key material.
    r = [wrapped_key[i:i + 8] for i in range(0, len(wrapped_key), 8)]
    a = r.pop(0)
    n = len(r)
    for j in reversed(range(6)):
        for i in reversed(range(n)):
            # pack/unpack are safe as these are always 64-bit chunks
            atr = struct.pack(
                ">Q", struct.unpack(">Q", a)[0] ^ ((n * j) + i + 1)
            ) + r[i]
            # every decryption operation is a discrete 16 byte chunk so
            # it is safe to reuse the decryptor for the entire operation
            b = decryptor.update(atr)
            a = b[:8]
            r[i] = b[-8:]

    # Finalize outside the assert: assert statements are removed under
    # ``python -O``, which would otherwise skip the finalize() call entirely.
    leftover = decryptor.finalize()
    assert leftover == b""

    # NOTE(review): bytes_eq is presumably a constant-time comparison - confirm.
    if not bytes_eq(a, aiv):
        raise InvalidUnwrap()
    return b"".join(r)
def __init__(self, key):
    """Build an AES-ECB ``Cipher`` for ``key`` and keep it on the instance."""
    algorithm = algorithms.AES(key)
    mode = modes.ECB()
    self._cipher = Cipher(algorithm, mode, backend=default_backend())
def aes_ecb_enc(key, data):
    """Encrypt ``data`` with AES in ECB mode under ``key``.

    No padding is applied here; ``data`` is handed to the cipher unchanged.

    Args:
        key: AES key bytes.
        data: Bytes to encrypt.

    Returns:
        The ciphertext bytes produced by the cipher context.
    """
    backend = default_backend()
    cipher = Cipher(algorithms.AES(key), modes.ECB(), backend=backend)
    # Use the encryptor context: a decryptor here would make this function
    # behave exactly like aes_ecb_dec and never actually encrypt.
    encryptor = cipher.encryptor()
    return encryptor.update(data) + encryptor.finalize()
def aes_ecb_dec(key, data):
    """Decrypt ``data`` with AES in ECB mode under ``key`` and return the bytes."""
    ecb_backend = default_backend()
    ctx = Cipher(
        algorithms.AES(key), modes.ECB(), backend=ecb_backend
    ).decryptor()
    # Collect the streamed output first, then whatever finalize() returns.
    head = ctx.update(data)
    tail = ctx.finalize()
    return head + tail
def __init__(self,
             remote: Node,
             privkey: datatypes.PrivateKey,
             reader: asyncio.StreamReader,
             writer: asyncio.StreamWriter,
             aes_secret: bytes,
             mac_secret: bytes,
             egress_mac: sha3.keccak_256,
             ingress_mac: sha3.keccak_256,
             chaindb: BaseChainDB,
             network_id: int,
             received_msg_callback: Optional[_ReceivedMsgCallbackType] = None
             ) -> None:
    """Store the connection state and build the AES contexts.

    The stream reader/writer, remote node, key material and MAC states are
    kept on the instance. ``aes_secret`` drives a CTR-mode cipher from which
    one encryptor and one decryptor are created; ``mac_secret`` drives an
    ECB-mode cipher whose encryptor ``update`` method is kept as ``mac_enc``.
    """
    self._finished = asyncio.Event()
    self._pending_replies = {}  # type: Dict[int, Callable[[protocol._DecodedMsgType], None]]

    # Connection endpoints and identity.
    self.remote, self.privkey = remote, privkey
    self.reader, self.writer = reader, writer

    # The base protocol is created here, before the attributes below exist,
    # so the assignment order is kept as-is.
    self.base_protocol = P2PProtocol(self)
    self.chaindb, self.network_id = chaindb, network_id
    self.received_msg_callback = received_msg_callback

    # Sub protocols enabled for this peer; filled in once the initial hello
    # msg has been received.
    self.enabled_sub_protocols = []  # type: List[protocol.Protocol]

    self.egress_mac, self.ingress_mac = egress_mac, ingress_mac

    # FIXME: Yes, the encryption is insecure, see: https://github.com/ethereum/devp2p/issues/32
    zero_iv = bytes(16)
    stream_cipher = Cipher(algorithms.AES(aes_secret), modes.CTR(zero_iv), default_backend())
    self.aes_enc = stream_cipher.encryptor()
    self.aes_dec = stream_cipher.decryptor()

    block_cipher = Cipher(algorithms.AES(mac_secret), modes.ECB(), default_backend())
    # Only the bound update() method of the encryptor is kept.
    self.mac_enc = block_cipher.encryptor().update
def export(self, basePath, extractModules = False, secretSector = None):
    """Write this section to ``<basePath>/section<num>.bin``.

    Extra work is done first depending on ``self.guessedType``:

    * ``"Kernel11 modules"`` with ``extractModules`` set: every embedded
      module is also written to ``<basePath>/modules/<name>.cxi``.
    * a type starting with ``"K9L"`` with a ``secretSector``: the region
      after the first 0x800 bytes is AES-CTR decrypted (when the declared
      size fits inside the section) and the decrypted image is what gets
      written.
    * ``"Kernel9"``: the raw section is passed to ``exportP9`` when
      ``extractModules`` is set.

    Args:
        basePath: Existing directory that receives the output files.
        extractModules: Also extract embedded modules where supported.
        secretSector: Bytes holding the key material used to decrypt the
            key stored in a ``K9L`` section, or ``None`` to skip decryption.
    """
    data = self.sectionData

    if self.guessedType == "Kernel11 modules" and extractModules:
        modulesDir = os.path.join(basePath, "modules")
        if not os.path.isdir(modulesDir):
            os.mkdir(modulesDir)

        pos = 0
        while pos < self.size:
            # The module length is stored at +0x104 in units of 0x200 bytes.
            size = unpack_from("<I", self.sectionData, pos + 0x104)[0] * 0x200
            if size == 0:
                # A zero-sized entry would never advance ``pos``; stop here
                # instead of looping forever.
                break
            name = self.sectionData[pos + 0x200: pos + 0x208].decode("ascii")
            # partition() keeps all 8 characters when the field has no NUL;
            # slicing with find() would use -1 and drop the last character.
            name = "{0}.cxi".format(name.partition('\x00')[0])
            with open(os.path.join(modulesDir, name), "wb+") as f:
                f.write(self.sectionData[pos : pos + size])
            pos += size

    elif self.guessedType.startswith("K9L") and secretSector is not None:
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

        # Which header field / secret-sector half is used depends on the
        # fourth character of the guessed type.
        encKeyX = self.sectionData[:0x10] if self.guessedType[3] == '0' else self.sectionData[0x60 : 0x70]
        key0x11 = secretSector[:0x10] if self.guessedType[3] != '2' else secretSector[0x10 : 0x20]

        de = Cipher(algorithms.AES(key0x11), modes.ECB(), backend=default_backend()).decryptor()
        keyX = de.update(encKeyX) + de.finalize()
        keyY = self.sectionData[0x10 : 0x20]
        ctr = self.sectionData[0x20 : 0x30]
        key = unhexlify("{0:032X}".format(keyscrambler(int(hexlify(keyX), 16), int(hexlify(keyY), 16))))

        # The payload size is stored as ASCII decimal digits, NUL padded.
        # partition() avoids losing the last digit when no NUL is present.
        sizeDec = self.sectionData[0x30 : 0x38].decode("ascii")
        size = int(sizeDec.partition('\x00')[0])

        if 0x800 + size <= self.size:
            de = Cipher(algorithms.AES(key), modes.CTR(ctr), backend=default_backend()).decryptor()
            data = b''.join((self.sectionData[:0x800], de.update(self.sectionData[0x800 : 0x800 + size]), de.finalize(), self.sectionData[0x800+size:]))
            # NOTE(review): exportP9 is only invoked on the decrypted image
            # here - confirm this matches the intended nesting.
            if extractModules:
                exportP9(basePath, data)

    elif self.guessedType == "Kernel9":
        if extractModules:
            exportP9(basePath, self.sectionData)

    # Every branch ends by dumping the (possibly decrypted) section itself.
    with open(os.path.join(basePath, "section{0}.bin".format(self.num)), "wb+") as f:
        f.write(data)