def crypt_create(key_bytes,
                 is_encrypt_flag=None,
                 iv_bytes=None,
                 algorithm=ciphers_algorithms.AES,
                 mode=ciphers_modes.CTR):
    '''
    Create and return a crypto context for symmetric key encryption using the
    key key_bytes.

    Uses algorithm in mode with an IV of iv_bytes.
    If iv_bytes is None, an all-zero IV one cipher block long is used.

    If is_encrypt_flag is truthy an encryptor context is returned; any falsy
    value (including the default None) returns a decryptor context.
    AES CTR mode uses the same operations for encryption and decryption.
    '''
    algo = algorithm(bytes(key_bytes))
    if iv_bytes is None:
        # block_size is expressed in bits. Floor division keeps the pad
        # length an integer on Python 3 as well as Python 2.
        iv_bytes = get_zero_pad(algo.block_size // BITS_IN_BYTE)
    cipher = ciphers.Cipher(algo,
                            mode(bytes(iv_bytes)),
                            backend=backends.default_backend())
    if is_encrypt_flag:
        return cipher.encryptor()
    return cipher.decryptor()
# Example source code for the Python CTR class
def crypt_bytes_key(key_bytes,
                    data_bytes,
                    is_encrypt_flag=None,
                    iv_bytes=None,
                    algorithm=ciphers_algorithms.AES,
                    mode=ciphers_modes.CTR):
    '''
    Encrypt or decrypt data_bytes in one shot with the symmetric key
    key_bytes and return the resulting bytes.

    A context is built by crypt_create() from the given options and the data
    is then run through crypt_bytes_context(); see those functions for
    details.
    '''
    context_options = {
        'is_encrypt_flag': is_encrypt_flag,
        'iv_bytes': iv_bytes,
        'algorithm': algorithm,
        'mode': mode,
    }
    context = crypt_create(key_bytes, **context_options)
    # crypt_bytes_context() yields a pair; only the second item (the crypted
    # bytes) is handed back to the caller.
    _unused, output_bytes = crypt_bytes_context(context, data_bytes)
    return output_bytes
def _open_aes_ctr(key, nonce, ciphertext, expected_hmac, digest_method):
    """Verify the HMAC of ``ciphertext`` and, if it matches, decrypt it.

    ``key`` is split by ``_halve_key`` into a data key and an HMAC key. The
    HMAC of the ciphertext is computed with ``_get_hmac`` and compared with
    ``expected_hmac`` before any decryption takes place.

    Returns the AES-CTR decryption of ``ciphertext`` under the data key and
    ``nonce``.

    Raises ``IntegrityError`` when the computed and expected HMACs differ.
    """
    # Imported locally so the block does not rely on a module-level
    # ``import hmac``.
    from hmac import compare_digest

    data_key, hmac_key = _halve_key(key)
    computed_hmac = _get_hmac(hmac_key, ciphertext, digest_method)
    # Check the HMAC before we decrypt to verify ciphertext integrity.
    # compare_digest runs in constant time, so the comparison does not leak
    # how many leading bytes of a forged HMAC were correct.
    try:
        hmac_matches = compare_digest(computed_hmac, expected_hmac)
    except TypeError:
        # Operand types compare_digest rejects (e.g. str vs bytes): fall back
        # to plain equality so the outcome matches the previous behaviour.
        hmac_matches = computed_hmac == expected_hmac
    if not hmac_matches:
        # The old message carried a "%s" placeholder that was never filled.
        raise IntegrityError("Computed HMAC does not match stored HMAC")
    decryptor = Cipher(
        algorithms.AES(data_key),
        modes.CTR(nonce),
        backend=default_backend()
    ).decryptor()
    return decryptor.update(ciphertext) + decryptor.finalize()
def _seal_aes_ctr(plaintext, key, nonce, digest_method):
    """Encrypt ``plaintext`` with AES-CTR and compute an HMAC of the result.

    ``key`` is split by ``_halve_key`` into a data key (used for AES) and an
    HMAC key. ``plaintext`` is UTF-8 encoded before encryption.

    Returns a ``(ciphertext, hmac)`` tuple where the HMAC is computed over
    the ciphertext by ``_get_hmac``.
    """
    data_key, hmac_key = _halve_key(key)
    cipher = Cipher(
        algorithms.AES(data_key),
        modes.CTR(nonce),
        backend=default_backend()
    )
    encryptor = cipher.encryptor()
    encoded_plaintext = plaintext.encode("utf-8")
    ciphertext = encryptor.update(encoded_plaintext) + encryptor.finalize()
    # The tag covers the ciphertext, not the plaintext.
    tag = _get_hmac(hmac_key, ciphertext, digest_method)
    return ciphertext, tag
def encrypt(data, password, padding=0):
    """Encrypt ``data`` with a key derived from ``password``.

    A 32-byte key is derived from the password with PBKDF2-HMAC-SHA256
    (100000 iterations) and used for AES in CTR mode with a random 16-byte
    nonce.

    ``padding`` is the number of spare bytes available. If it is negative a
    message is printed and the process exits. Otherwise ``padding - 16``
    random bytes (never fewer than zero) are appended after the ciphertext.

    Returns ``nonce + ciphertext + random padding``.
    """
    if padding < 0:
        print("Image too small to encode the file. "
              "You can store 1 byte per pixel.")
        exit()
    password = bytes(password)
    # Use key stretching to generate a secure key.
    # NOTE(review): the password doubles as the salt, which defeats the
    # purpose of salting. decode() derives its key the same way, so changing
    # this here alone would make existing output undecryptable.
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=password,
        iterations=100000,
        backend=default_backend())
    key = kdf.derive(password)
    nonce = os.urandom(16)
    cipher = Cipher(algorithms.AES(key),
                    modes.CTR(nonce), backend=default_backend())
    enc = cipher.encryptor()
    ct = enc.update(data) + enc.finalize()
    # Add padding if needed. Clamp at zero: os.urandom() raises ValueError
    # for a negative length, which used to happen for any padding below 16,
    # including the default of 0.
    ct += os.urandom(max(padding - 16, 0))
    # Prepend the nonce to allow decryption later (the nonce does not need to
    # be kept secret and is indistinguishable from random noise).
    return bytes(nonce) + ct
def __init__(self, key, counter=0):
    """Build the AES-CTR cipher object for ``key``.

    The initial counter block is ``counter`` shifted left by 64 bits and
    packed with ``long_to_bytes(..., 16)``. The cipher uses the backend
    stored in ``self.__backend__``.
    """
    aes_algorithm = algorithms.AES(key)
    ctr_mode = modes.CTR(long_to_bytes(counter << 64, 16))
    self._cipher = Cipher(aes_algorithm, ctr_mode, self.__backend__)
def aes256ctr(iv, key, data):
    """
    Run ``data`` through AES in CTR mode using the supplied IV and key.

    CTR mode applies the same keystream in both directions, so this function
    serves for decryption as well as encryption.

    :param iv: The IV as a byte list.
    :param key: The key as a byte list.
    :param data: The data to be processed as a byte list.
    :return: The processed data as a byte list.
    """
    ctr_cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=default_backend())
    context = ctr_cipher.encryptor()
    body = context.update(data)
    return body + context.finalize()
def aes_ctr(key, ctr, data):
    """Return ``data`` passed through an AES-CTR decryptor.

    ``key`` is the AES key and ``ctr`` the initial counter block handed to
    ``modes.CTR``.
    """
    ctr_cipher = Cipher(
        algorithms.AES(key),
        modes.CTR(ctr),
        backend=default_backend(),
    )
    context = ctr_cipher.decryptor()
    head = context.update(data)
    return head + context.finalize()
def aes_ctr_buff(key, ctr, data, buff):
    """Pass ``data`` through an AES-CTR decryptor, writing into ``buff``.

    The output is written into the caller-supplied buffer via
    ``update_into``; ``finalize()`` is not called on the context. Returns
    whatever ``update_into`` returns (per the cryptography API, the number
    of bytes written).
    """
    ctr_cipher = Cipher(
        algorithms.AES(key),
        modes.CTR(ctr),
        backend=default_backend(),
    )
    context = ctr_cipher.decryptor()
    written = context.update_into(data, buff)
    return written
def __init__(self, key=None, blocklen=16):
    """Set up an AES-CTR cipher with a fixed IV.

    When ``key`` is falsy a random key of ``blocklen`` bytes is drawn from
    ``urandom``. The IV is ``blocklen`` ASCII space bytes. The block length
    and the cipher object are stored on the instance.
    """
    # Imported lazily, inside the constructor, as in the original code.
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
    from cryptography.hazmat.backends import default_backend

    if not key:
        key = urandom(blocklen)
    fixed_iv = b' ' * blocklen
    self.blocklen = blocklen
    self.cipher = Cipher(
        algorithms.AES(key),
        modes.CTR(fixed_iv),
        backend=default_backend(),
    )
def __init__(self,
             remote: Node,
             privkey: datatypes.PrivateKey,
             reader: asyncio.StreamReader,
             writer: asyncio.StreamWriter,
             aes_secret: bytes,
             mac_secret: bytes,
             egress_mac: sha3.keccak_256,
             ingress_mac: sha3.keccak_256,
             chaindb: BaseChainDB,
             network_id: int,
             received_msg_callback: Optional[_ReceivedMsgCallbackType] = None
             ) -> None:
    """Store the connection state and set up the frame ciphers.

    Keeps the remote node, private key, stream reader/writer, chain DB,
    network id, optional received-message callback and the two MAC objects
    on the instance. Builds an AES-CTR encryptor/decryptor pair from
    ``aes_secret`` and an AES-ECB encryptor ``update`` callable from
    ``mac_secret``.
    """
    self._finished = asyncio.Event()
    self._pending_replies = {}  # type: Dict[int, Callable[[protocol._DecodedMsgType], None]]
    self.remote, self.privkey = remote, privkey
    self.reader, self.writer = reader, writer
    # Constructed at this point, after the stream attributes are in place,
    # exactly as in the original statement order.
    self.base_protocol = P2PProtocol(self)
    self.chaindb, self.network_id = chaindb, network_id
    self.received_msg_callback = received_msg_callback
    # Sub protocols enabled for this peer; filled in once the initial hello
    # msg has been received.
    self.enabled_sub_protocols = []  # type: List[protocol.Protocol]
    self.egress_mac, self.ingress_mac = egress_mac, ingress_mac

    # FIXME: Yes, the encryption is insecure, see: https://github.com/ethereum/devp2p/issues/32
    zero_iv = b"\x00" * 16
    frame_cipher = Cipher(algorithms.AES(aes_secret), modes.CTR(zero_iv), default_backend())
    self.aes_enc = frame_cipher.encryptor()
    self.aes_dec = frame_cipher.decryptor()

    mac_cipher = Cipher(algorithms.AES(mac_secret), modes.ECB(), default_backend())
    mac_encryptor = mac_cipher.encryptor()
    self.mac_enc = mac_encryptor.update
def decode(image, password=""):
    """Recover the payload hidden in ``image`` and save it to disk.

    Every pixel is passed through ``decode_from_pixel`` (row by row) and the
    results are concatenated. If ``password`` is non-empty, the first 16
    bytes are taken as the nonce and the remainder is decrypted with AES-CTR
    under a PBKDF2-HMAC-SHA256 key derived from the password.

    The header (magic number, payload size, format string) is then unpacked.
    If the magic number does not match ``Header.magicnum`` a message is
    printed and the process exits. Otherwise the payload is written to
    ``output<extsep><format>`` in the current directory.
    """
    im = Image.open(image)
    px = im.load()
    # Decode the contents of the hidden data. One join avoids re-copying the
    # growing string once per pixel.
    data = "".join(decode_from_pixel(px[j, i])
                   for i in range(im.height)
                   for j in range(im.width))
    # Optional decryption step
    if len(password) > 0:
        nonce = data[:16]
        # Use key stretching to generate a secure key. The password is also
        # used as the salt here, matching what encrypt() does.
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=bytes(password),
            iterations=100000,
            backend=default_backend())
        key = kdf.derive(bytes(password))
        cipher = Cipher(algorithms.AES(key),
                        modes.CTR(nonce), backend=default_backend())
        dec = cipher.decryptor()
        data = dec.update(data[16:]) + dec.finalize()
    # Create the header for reading
    header = Header()
    header_format = "4s" + "I" + str(Header.MAX_FORMAT_LENGTH) + "s"
    headerdata = struct.unpack(header_format,
                               data[:4+4+Header.MAX_FORMAT_LENGTH])
    header.magicnum = headerdata[0]
    header.size = headerdata[1]
    header.fformat = headerdata[2].strip("\x00")
    # Verify integrity of recovered data
    if header.magicnum != Header.magicnum:
        print("There is no data to recover, quitting")
        exit()
    # NOTE(review): the header unpacked above spans 4+4+MAX_FORMAT_LENGTH
    # bytes, but the payload slice starts at 4+MAX_FORMAT_LENGTH. Confirm
    # against the encoder before changing this offset.
    payload_start = 4 + Header.MAX_FORMAT_LENGTH
    data = data[payload_start:payload_start + header.size]
    out_name = "output" + os.extsep + header.fformat
    print("Saving decoded output as {}".format(out_name))
    with open(out_name, 'wb') as outf:
        outf.write(data)
def export(self, basePath, extractModules = False, secretSector = None):
    """Write this section to ``<basePath>/section<num>.bin``.

    Behaviour depends on ``self.guessedType``:

    * ``"Kernel11 modules"`` with ``extractModules``: every module found in
      the section is also written to ``<basePath>/modules/<name>.cxi``. A
      module's size is the little-endian 32-bit value at offset 0x104
      multiplied by 0x200, and its name is the NUL-terminated ASCII string
      at offset 0x200 (at most 8 bytes).
    * Types starting with ``"K9L"`` when ``secretSector`` is given: the
      payload starting at 0x800 is AES-CTR decrypted before being written
      (and passed to ``exportP9`` when ``extractModules`` is set). The key
      is built by ``keyscrambler`` from a keyX (AES-ECB decrypted with a
      key taken from ``secretSector``) and the keyY stored in the section.
    * ``"Kernel9"``: the raw section is passed to ``exportP9`` when
      ``extractModules`` is set.
    * Anything else: the raw section is written unchanged.
    """
    data = self.sectionData

    if self.guessedType == "Kernel11 modules" and extractModules:
        modulesDir = os.path.join(basePath, "modules")
        if not os.path.isdir(modulesDir):
            os.mkdir(modulesDir)

        pos = 0
        while pos < self.size:
            size = unpack_from("<I", self.sectionData, pos + 0x104)[0] * 0x200
            if size == 0:
                # A zero-sized entry would never advance pos; stop rather
                # than loop forever.
                break
            name = self.sectionData[pos + 0x200: pos + 0x208].decode("ascii")
            # Cut at the first NUL. split() keeps the full name when all 8
            # bytes are used, whereas slicing with find() == -1 dropped the
            # last character.
            name = "{0}.cxi".format(name.split('\x00', 1)[0])
            with open(os.path.join(modulesDir, name), "wb+") as f:
                f.write(self.sectionData[pos : pos + size])
            pos += size

    elif self.guessedType.startswith("K9L") and secretSector is not None:
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

        # The fourth character of the guessed type selects where the
        # encrypted keyX lives and which half of the secret sector holds
        # the key used to decrypt it.
        encKeyX = self.sectionData[:0x10] if self.guessedType[3] == '0' else self.sectionData[0x60 : 0x70]
        key0x11 = secretSector[:0x10] if self.guessedType[3] != '2' else secretSector[0x10 : 0x20]

        de = Cipher(algorithms.AES(key0x11), modes.ECB(), backend=default_backend()).decryptor()
        keyX = de.update(encKeyX) + de.finalize()

        keyY = self.sectionData[0x10 : 0x20]
        ctr = self.sectionData[0x20 : 0x30]
        key = unhexlify("{0:032X}".format(keyscrambler(int(hexlify(keyX), 16), int(hexlify(keyY), 16))))

        # The payload size is stored as NUL-terminated ASCII decimal digits.
        sizeDec = self.sectionData[0x30 : 0x38].decode("ascii")
        size = int(sizeDec.split('\x00', 1)[0])

        # Only decrypt when the declared payload fits inside the section;
        # otherwise the section is exported as-is.
        if 0x800 + size <= self.size:
            de = Cipher(algorithms.AES(key), modes.CTR(ctr), backend=default_backend()).decryptor()
            data = b''.join((self.sectionData[:0x800], de.update(self.sectionData[0x800 : 0x800 + size]), de.finalize(), self.sectionData[0x800+size:]))
        if extractModules:
            exportP9(basePath, data)

    elif self.guessedType == "Kernel9":
        if extractModules:
            exportP9(basePath, self.sectionData)

    # Every branch ends by dumping the (possibly decrypted) section.
    with open(os.path.join(basePath, "section{0}.bin".format(self.num)), "wb+") as f:
        f.write(data)