def decrypt(ciphertext, tag, secret_key, initialisation_vector):
    """
    Decrypt AES-GCM cipher text and return the resulting plaintext.

    The cipher is built from the secret key, the initialisation vector and
    the GCM authentication tag, then the whole cipher text is fed through a
    single decryptor.

    :param bytes ciphertext: the cipher text to be decrypted
    :param bytes tag: the GCM authentication tag
    :param bytes secret_key: the key to be used for decryption
    :param bytes initialisation_vector: the initialisation vector
    :return: the decrypted plaintext
    :rtype: bytes
    """
    aes = algorithms.AES(secret_key)
    gcm = modes.GCM(initialization_vector=initialisation_vector, tag=tag)
    context = Cipher(
        algorithm=aes, mode=gcm, backend=default_backend()
    ).decryptor()
    plaintext = context.update(ciphertext)
    # finalize() is called after all data has been fed; its output is
    # appended to whatever update() already produced.
    plaintext += context.finalize()
    return plaintext
python类AES的实例源码
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _wrap_core(wrapping_key, a, r, backend):
    """Run the six RFC 3394 key-wrap rounds (section 2.2.1, index method).

    :param wrapping_key: AES key used to build the ECB encryptor
    :param bytes a: initial 64-bit integrity register
    :param list r: list of 64-bit blocks to wrap; updated in place
    :param backend: backend object handed to ``Cipher``
    :return: the final ``a`` followed by the concatenated blocks of ``r``
    :rtype: bytes
    """
    # RFC 3394 Key Wrap - 2.2.1 (index method)
    encryptor = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    n = len(r)
    for j in range(6):
        for i in range(n):
            # every encryption operation is a discrete 16 byte chunk (because
            # AES has a 128-bit block size) and since we're using ECB it is
            # safe to reuse the encryptor for the entire operation
            b = encryptor.update(a + r[i])
            # pack/unpack are safe as these are always 64-bit chunks
            a = struct.pack(
                ">Q", struct.unpack(">Q", b[:8])[0] ^ ((n * j) + i + 1)
            )
            r[i] = b[-8:]

    # finalize() must run even when assertions are disabled (``python -O``
    # strips assert statements), so call it outside the assert and only
    # check its result there.
    leftover = encryptor.finalize()
    assert leftover == b""

    return a + b"".join(r)
def _unwrap_core(wrapping_key, a, r, backend):
    """Run the six RFC 3394 key-unwrap rounds (section 2.2.2, index method).

    :param wrapping_key: AES key used to build the ECB decryptor
    :param bytes a: 64-bit integrity register taken from the wrapped data
    :param list r: list of 64-bit wrapped blocks; updated in place
    :param backend: backend object handed to ``Cipher``
    :return: tuple ``(a, r)`` holding the recovered register and the
        unwrapped blocks (the same list object that was passed in)
    """
    # Implement RFC 3394 Key Unwrap - 2.2.2 (index method)
    decryptor = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
    n = len(r)
    for j in reversed(range(6)):
        for i in reversed(range(n)):
            # pack/unpack are safe as these are always 64-bit chunks
            atr = struct.pack(
                ">Q", struct.unpack(">Q", a)[0] ^ ((n * j) + i + 1)
            ) + r[i]
            # every decryption operation is a discrete 16 byte chunk so
            # it is safe to reuse the decryptor for the entire operation
            b = decryptor.update(atr)
            a = b[:8]
            r[i] = b[-8:]

    # finalize() must run even when assertions are disabled (``python -O``
    # strips assert statements), so call it outside the assert and only
    # check its result there.
    leftover = decryptor.finalize()
    assert leftover == b""

    return a, r
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
pushbullet.py 文件源码
项目:Intruder-detector-with-Raspberry-Pi-and-Pushbullet
作者: DeligenceTechnologies
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def _encrypt_data(self, data):
assert self._encryption_key
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
iv = os.urandom(12)
encryptor = Cipher(
algorithms.AES(self._encryption_key),
modes.GCM(iv),
backend=default_backend()
).encryptor()
ciphertext = encryptor.update(json.dumps(data).encode("UTF-8")) + encryptor.finalize()
ciphertext = b"1" + encryptor.tag + iv + ciphertext
return standard_b64encode(ciphertext).decode("ASCII")
pushbullet.py 文件源码
项目:Intruder-detector-with-Raspberry-Pi-and-Pushbullet
作者: DeligenceTechnologies
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def _encrypt_data(self, data):
assert self._encryption_key
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
iv = os.urandom(12)
encryptor = Cipher(
algorithms.AES(self._encryption_key),
modes.GCM(iv),
backend=default_backend()
).encryptor()
ciphertext = encryptor.update(json.dumps(data).encode("UTF-8")) + encryptor.finalize()
ciphertext = b"1" + encryptor.tag + iv + ciphertext
return standard_b64encode(ciphertext).decode("ASCII")
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def stretch(passw, iv1):
    """Stretch a password by hashing it with ``iv1`` 8192 times.

    The running digest starts as ``iv1`` followed by 16 zero bytes. On each
    round a fresh SHA-256 hash is fed the current digest and then the
    UTF-16-LE encoded password; its output becomes the next digest.

    :param str passw: the password
    :param bytes iv1: the external IV mixed into the first round
    :return: the digest produced by the final round
    :rtype: bytes
    """
    # hash the external iv and the password 8192 times
    digest = iv1 + (16 * b"\x00")

    # The encoded password is the same on every round, so encode it once
    # instead of 8192 times.
    passw_bytes = bytes(passw, "utf_16_le")

    for _ in range(8192):
        pass_hash = hashes.Hash(hashes.SHA256(), backend=default_backend())
        pass_hash.update(digest)
        pass_hash.update(passw_bytes)
        digest = pass_hash.finalize()

    return digest
# encrypting function
# arguments:
# infile: plaintext file path
# outfile: ciphertext file path
# passw: encryption password
# bufferSize: encryption buffer size, must be a multiple of
# AES block size (16)
# using a larger buffer speeds up things when dealing
# with big files
def __init__(self, broker="http://127.0.0.1:19820/api", encoding="utf-8", enc_key=None, enc_iv=None):
    """Set up the client's endpoint, optional AES-CBC cipher and HTTP session.

    Transport encryption is enabled only when both ``enc_key`` and
    ``enc_iv`` are supplied; if either is missing, the key, IV and cipher
    attributes are all set to ``None``.

    :param str broker: URL stored as the endpoint
    :param str encoding: text encoding stored on the instance
    :param enc_key: AES key for transport encryption, or ``None``
    :param enc_iv: CBC initialisation vector, or ``None``
    """
    super().__init__()
    self._endpoint = broker
    # Honour the caller's encoding; previously the argument was ignored and
    # "utf-8" was always stored.
    self._encoding = encoding

    if enc_key is None or enc_iv is None:
        self._transport_enc = False
        self._transport_enc_key = None
        self._transport_enc_iv = None
        self._cipher = None
    else:
        self._transport_enc = True
        self._transport_enc_key = enc_key
        self._transport_enc_iv = enc_iv
        backend = default_backend()
        self._cipher = Cipher(
            algorithms.AES(enc_key), modes.CBC(enc_iv), backend=backend
        )

    self._session = requests.Session()

    # Maps event names to the bound handler methods of this instance.
    self._event_dict = {
        'logon': self.on_login,
        'logoff': self.on_logout,
        'ping': self.on_ping,
        'query_data': self.on_query_data,
        'send_order': self.on_insert_order,
        'cancel_order': self.on_cancel_order_event,
        'get_quote': self.on_get_quote,
    }
    self.client_id = ''
    self.account_id = ''
def aes_key_wrap(wrapping_key, key_to_wrap, backend):
    """Wrap ``key_to_wrap`` under ``wrapping_key`` (RFC 3394, section 2.2.1).

    :param bytes wrapping_key: AES key of 16, 24 or 32 bytes
    :param bytes key_to_wrap: key material, at least 16 bytes and a
        multiple of 8 bytes long
    :param backend: backend object handed to ``Cipher``
    :return: the 8-byte integrity register followed by the wrapped blocks
    :rtype: bytes
    :raises ValueError: if either length check fails
    """
    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    if len(key_to_wrap) < 16:
        raise ValueError("The key to wrap must be at least 16 bytes")

    if len(key_to_wrap) % 8 != 0:
        raise ValueError("The key to wrap must be a multiple of 8 bytes")

    # RFC 3394 Key Wrap - 2.2.1 (index method)
    encryptor = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    a = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
    r = [key_to_wrap[i:i + 8] for i in range(0, len(key_to_wrap), 8)]
    n = len(r)
    for j in range(6):
        for i in range(n):
            # every encryption operation is a discrete 16 byte chunk (because
            # AES has a 128-bit block size) and since we're using ECB it is
            # safe to reuse the encryptor for the entire operation
            b = encryptor.update(a + r[i])
            # pack/unpack are safe as these are always 64-bit chunks
            a = struct.pack(
                ">Q", struct.unpack(">Q", b[:8])[0] ^ ((n * j) + i + 1)
            )
            r[i] = b[-8:]

    # finalize() must run even when assertions are disabled (``python -O``
    # strips assert statements), so call it outside the assert and only
    # check its result there.
    leftover = encryptor.finalize()
    assert leftover == b""

    return a + b"".join(r)
def aes_key_unwrap(wrapping_key, wrapped_key, backend):
    """Unwrap ``wrapped_key`` with ``wrapping_key`` (RFC 3394, section 2.2.2).

    :param bytes wrapping_key: AES key of 16, 24 or 32 bytes
    :param bytes wrapped_key: wrapped data, at least 24 bytes and a
        multiple of 8 bytes long
    :param backend: backend object handed to ``Cipher``
    :return: the concatenated unwrapped blocks
    :rtype: bytes
    :raises ValueError: if either length check fails
    :raises InvalidUnwrap: if the recovered integrity register does not
        equal the expected eight ``0xa6`` bytes
    """
    if len(wrapped_key) < 24:
        raise ValueError("Must be at least 24 bytes")

    if len(wrapped_key) % 8 != 0:
        raise ValueError("The wrapped key must be a multiple of 8 bytes")

    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    # Implement RFC 3394 Key Unwrap - 2.2.2 (index method)
    decryptor = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
    aiv = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"

    r = [wrapped_key[i:i + 8] for i in range(0, len(wrapped_key), 8)]
    # The first 8-byte block is the integrity register, the rest is payload.
    a = r.pop(0)
    n = len(r)
    for j in reversed(range(6)):
        for i in reversed(range(n)):
            # pack/unpack are safe as these are always 64-bit chunks
            atr = struct.pack(
                ">Q", struct.unpack(">Q", a)[0] ^ ((n * j) + i + 1)
            ) + r[i]
            # every decryption operation is a discrete 16 byte chunk so
            # it is safe to reuse the decryptor for the entire operation
            b = decryptor.update(atr)
            a = b[:8]
            r[i] = b[-8:]

    # finalize() must run even when assertions are disabled (``python -O``
    # strips assert statements), so call it outside the assert and only
    # check its result there.
    leftover = decryptor.finalize()
    assert leftover == b""

    if not bytes_eq(a, aiv):
        raise InvalidUnwrap()

    return b"".join(r)
def aes_key_wrap(wrapping_key, key_to_wrap, backend):
    """Wrap ``key_to_wrap`` under ``wrapping_key`` (RFC 3394, section 2.2.1).

    :param bytes wrapping_key: AES key of 16, 24 or 32 bytes
    :param bytes key_to_wrap: key material, at least 16 bytes and a
        multiple of 8 bytes long
    :param backend: backend object handed to ``Cipher``
    :return: the 8-byte integrity register followed by the wrapped blocks
    :rtype: bytes
    :raises ValueError: if either length check fails
    """
    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    if len(key_to_wrap) < 16:
        raise ValueError("The key to wrap must be at least 16 bytes")

    if len(key_to_wrap) % 8 != 0:
        raise ValueError("The key to wrap must be a multiple of 8 bytes")

    # RFC 3394 Key Wrap - 2.2.1 (index method)
    encryptor = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    a = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
    r = [key_to_wrap[i:i + 8] for i in range(0, len(key_to_wrap), 8)]
    n = len(r)
    for j in range(6):
        for i in range(n):
            # every encryption operation is a discrete 16 byte chunk (because
            # AES has a 128-bit block size) and since we're using ECB it is
            # safe to reuse the encryptor for the entire operation
            b = encryptor.update(a + r[i])
            # pack/unpack are safe as these are always 64-bit chunks
            a = struct.pack(
                ">Q", struct.unpack(">Q", b[:8])[0] ^ ((n * j) + i + 1)
            )
            r[i] = b[-8:]

    # finalize() must run even when assertions are disabled (``python -O``
    # strips assert statements), so call it outside the assert and only
    # check its result there.
    leftover = encryptor.finalize()
    assert leftover == b""

    return a + b"".join(r)
def aes_key_unwrap(wrapping_key, wrapped_key, backend):
    """Unwrap ``wrapped_key`` with ``wrapping_key`` (RFC 3394, section 2.2.2).

    :param bytes wrapping_key: AES key of 16, 24 or 32 bytes
    :param bytes wrapped_key: wrapped data, at least 24 bytes and a
        multiple of 8 bytes long
    :param backend: backend object handed to ``Cipher``
    :return: the concatenated unwrapped blocks
    :rtype: bytes
    :raises ValueError: if either length check fails
    :raises InvalidUnwrap: if the recovered integrity register does not
        equal the expected eight ``0xa6`` bytes
    """
    if len(wrapped_key) < 24:
        raise ValueError("Must be at least 24 bytes")

    if len(wrapped_key) % 8 != 0:
        raise ValueError("The wrapped key must be a multiple of 8 bytes")

    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    # Implement RFC 3394 Key Unwrap - 2.2.2 (index method)
    decryptor = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
    aiv = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"

    r = [wrapped_key[i:i + 8] for i in range(0, len(wrapped_key), 8)]
    # The first 8-byte block is the integrity register, the rest is payload.
    a = r.pop(0)
    n = len(r)
    for j in reversed(range(6)):
        for i in reversed(range(n)):
            # pack/unpack are safe as these are always 64-bit chunks
            atr = struct.pack(
                ">Q", struct.unpack(">Q", a)[0] ^ ((n * j) + i + 1)
            ) + r[i]
            # every decryption operation is a discrete 16 byte chunk so
            # it is safe to reuse the decryptor for the entire operation
            b = decryptor.update(atr)
            a = b[:8]
            r[i] = b[-8:]

    # finalize() must run even when assertions are disabled (``python -O``
    # strips assert statements), so call it outside the assert and only
    # check its result there.
    leftover = decryptor.finalize()
    assert leftover == b""

    if not bytes_eq(a, aiv):
        raise InvalidUnwrap()

    return b"".join(r)
def decrypt_ztoken(ztoken):
    """Decrypt the given ztoken, used by apple.

    Tokens of 32 characters or fewer are returned unchanged. Otherwise the
    first 64 hex characters are AES-ECB decrypted with an all-zero 16-byte
    key and the result is decoded to ``str``.
    """
    if len(ztoken) <= 32:
        return ztoken

    zero_key = bytes.fromhex('00000000000000000000000000000000')
    ecb = Cipher(
        algorithms.AES(zero_key), modes.ECB(), backend=default_backend()
    ).decryptor()

    encrypted = bytes.fromhex(ztoken[:64])
    plain = ecb.update(encrypted) + ecb.finalize()
    return plain.decode()
def decrypt(self, ctext):
    """Decrypt ``ctext`` with AES-CBC and strip the PKCS7 padding.

    The first ``AES_BLOCK_SIZE`` bytes of ``ctext`` are used as the IV and
    the remainder as the cipher text, decrypted under ``self.cipher_key``.

    :param bytes ctext: IV followed by the cipher text
    :return: the unpadded plaintext
    """
    iv, body = ctext[:AES_BLOCK_SIZE], ctext[AES_BLOCK_SIZE:]

    aes_cbc = Cipher(
        algorithms.AES(self.cipher_key),
        modes.CBC(iv),
        backend=default_backend(),
    )
    # PKCS7 takes its block size in bits, hence the multiplication by 8.
    pkcs7 = padding.PKCS7(AES_BLOCK_SIZE * 8).unpadder()

    context = aes_cbc.decryptor()
    padded = context.update(body) + context.finalize()
    return pkcs7.update(padded) + pkcs7.finalize()
def encrypt(self, clear_text):
    """Encrypt ``clear_text`` with AES-CBC under a fresh random IV.

    The input is PKCS7-padded and encrypted under ``self.cipher_key``; the
    ``AES_BLOCK_SIZE``-byte IV is prepended to the cipher text.

    :param bytes clear_text: data to encrypt
    :return: IV followed by the cipher text
    """
    iv = os.urandom(AES_BLOCK_SIZE)
    aes_cbc = Cipher(
        algorithms.AES(self.cipher_key),
        modes.CBC(iv),
        backend=default_backend(),
    )
    # PKCS7 takes its block size in bits, hence the multiplication by 8.
    pkcs7 = padding.PKCS7(AES_BLOCK_SIZE * 8).padder()
    context = aes_cbc.encryptor()

    padded = pkcs7.update(clear_text) + pkcs7.finalize()
    body = context.update(padded)
    body += context.finalize()
    return iv + body
def aes_key_wrap(wrapping_key, key_to_wrap, backend):
    """Wrap ``key_to_wrap`` under ``wrapping_key`` (RFC 3394, section 2.2.1).

    :param bytes wrapping_key: AES key of 16, 24 or 32 bytes
    :param bytes key_to_wrap: key material, at least 16 bytes and a
        multiple of 8 bytes long
    :param backend: backend object handed to ``Cipher``
    :return: the 8-byte integrity register followed by the wrapped blocks
    :rtype: bytes
    :raises ValueError: if either length check fails
    """
    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    if len(key_to_wrap) < 16:
        raise ValueError("The key to wrap must be at least 16 bytes")

    if len(key_to_wrap) % 8 != 0:
        raise ValueError("The key to wrap must be a multiple of 8 bytes")

    # RFC 3394 Key Wrap - 2.2.1 (index method)
    encryptor = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    a = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
    r = [key_to_wrap[i:i + 8] for i in range(0, len(key_to_wrap), 8)]
    n = len(r)
    for j in range(6):
        for i in range(n):
            # every encryption operation is a discrete 16 byte chunk (because
            # AES has a 128-bit block size) and since we're using ECB it is
            # safe to reuse the encryptor for the entire operation
            b = encryptor.update(a + r[i])
            # pack/unpack are safe as these are always 64-bit chunks
            a = struct.pack(
                ">Q", struct.unpack(">Q", b[:8])[0] ^ ((n * j) + i + 1)
            )
            r[i] = b[-8:]

    # finalize() must run even when assertions are disabled (``python -O``
    # strips assert statements), so call it outside the assert and only
    # check its result there.
    leftover = encryptor.finalize()
    assert leftover == b""

    return a + b"".join(r)