def encrypt(message, receiver_public_key):
sender_private_key = ec.generate_private_key(ec.SECP256K1(), backend)
shared_key = sender_private_key.exchange(ec.ECDH(), receiver_public_key)
sender_public_key = sender_private_key.public_key()
point = sender_public_key.public_numbers().encode_point()
iv = '000000000000'
xkdf = x963kdf.X963KDF(
algorithm = hashes.SHA256(),
length = 32,
sharedinfo = '',
backend = backend
)
key = xkdf.derive(shared_key)
encryptor = Cipher(
algorithms.AES(key),
modes.GCM(iv),
backend = backend
).encryptor()
ciphertext = encryptor.update(message) + encryptor.finalize()
return point + encryptor.tag + ciphertext
python类GCM的实例源码
def decrypt(message, receiver_private_key):
point = message[0:65]
tag = message[65:81]
ciphertext = message[81:]
sender_public_numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256K1(), point)
sender_public_key = sender_public_numbers.public_key(backend)
shared_key = receiver_private_key.exchange(ec.ECDH(), sender_public_key)
iv = '000000000000'
xkdf = x963kdf.X963KDF(
algorithm = hashes.SHA256(),
length = 32,
sharedinfo = '',
backend = backend
)
key = xkdf.derive(shared_key)
decryptor = Cipher(
algorithms.AES(key),
modes.GCM(iv,tag),
backend = backend
).decryptor()
message = decryptor.update(ciphertext) + decryptor.finalize()
return message
def update(self, data):
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
# to SIGABRT if you call update with an empty byte string. This can be
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
# should be taken only when length is zero and mode is not GCM because
# AES GCM can return improper tag values if you don't call update
# with empty plaintext when authenticating AAD for ...reasons.
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
return b""
buf = self._backend._ffi.new("unsigned char[]",
len(data) + self._block_size - 1)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
self._backend.openssl_assert(res != 0)
return self._backend._ffi.buffer(buf)[:outlen[0]]
def update(self, data):
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
# to SIGABRT if you call update with an empty byte string. This can be
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
# should be taken only when length is zero and mode is not GCM because
# AES GCM can return improper tag values if you don't call update
# with empty plaintext when authenticating AAD for ...reasons.
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
return b""
buf = self._backend._ffi.new("unsigned char[]",
len(data) + self._block_size - 1)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
self._backend.openssl_assert(res != 0)
return self._backend._ffi.buffer(buf)[:outlen[0]]
def update(self, data):
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
# to SIGABRT if you call update with an empty byte string. This can be
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
# should be taken only when length is zero and mode is not GCM because
# AES GCM can return improper tag values if you don't call update
# with empty plaintext when authenticating AAD for ...reasons.
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
return b""
buf = self._backend._ffi.new("unsigned char[]",
len(data) + self._block_size - 1)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
self._backend.openssl_assert(res != 0)
return self._backend._ffi.buffer(buf)[:outlen[0]]
def update(self, data):
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
# to SIGABRT if you call update with an empty byte string. This can be
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
# should be taken only when length is zero and mode is not GCM because
# AES GCM can return improper tag values if you don't call update
# with empty plaintext when authenticating AAD for ...reasons.
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
return b""
buf = self._backend._ffi.new("unsigned char[]",
len(data) + self._block_size - 1)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
self._backend.openssl_assert(res != 0)
return self._backend._ffi.buffer(buf)[:outlen[0]]
def update(self, data):
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
# to SIGABRT if you call update with an empty byte string. This can be
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
# should be taken only when length is zero and mode is not GCM because
# AES GCM can return improper tag values if you don't call update
# with empty plaintext when authenticating AAD for ...reasons.
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
return b""
buf = self._backend._ffi.new("unsigned char[]",
len(data) + self._block_size - 1)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
assert res != 0
return self._backend._ffi.buffer(buf)[:outlen[0]]
def update(self, data):
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
# to SIGABRT if you call update with an empty byte string. This can be
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
# should be taken only when length is zero and mode is not GCM because
# AES GCM can return improper tag values if you don't call update
# with empty plaintext when authenticating AAD for ...reasons.
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
return b""
buf = self._backend._ffi.new("unsigned char[]",
len(data) + self._block_size - 1)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
self._backend.openssl_assert(res != 0)
return self._backend._ffi.buffer(buf)[:outlen[0]]
def update(self, data):
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
# to SIGABRT if you call update with an empty byte string. This can be
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
# should be taken only when length is zero and mode is not GCM because
# AES GCM can return improper tag values if you don't call update
# with empty plaintext when authenticating AAD for ...reasons.
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
return b""
buf = self._backend._ffi.new("unsigned char[]",
len(data) + self._block_size - 1)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
self._backend.openssl_assert(res != 0)
return self._backend._ffi.buffer(buf)[:outlen[0]]
def update(self, data):
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
# to SIGABRT if you call update with an empty byte string. This can be
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
# should be taken only when length is zero and mode is not GCM because
# AES GCM can return improper tag values if you don't call update
# with empty plaintext when authenticating AAD for ...reasons.
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
return b""
buf = self._backend._ffi.new("unsigned char[]",
len(data) + self._block_size - 1)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
assert res != 0
return self._backend._ffi.buffer(buf)[:outlen[0]]
def decryptStorage(path, key):
cipherkey = unhexlify(key)
with open(path, 'rb') as f:
iv = f.read(12)
tag = f.read(16)
cipher = Cipher(algorithms.AES(cipherkey), modes.GCM(iv, tag), backend=default_backend())
decryptor = cipher.decryptor()
data = ''
while True:
block = f.read(16)
# data are not authenticated yet
if block:
data = data + decryptor.update(block).decode()
else:
break
# throws exception when the tag is wrong
data = data + decryptor.finalize().decode()
return json.loads(data)
def decryptEntryValue(nonce, val):
cipherkey = unhexlify(nonce)
iv = val[:12]
tag = val[12:28]
cipher = Cipher(algorithms.AES(cipherkey), modes.GCM(iv, tag), backend=default_backend())
decryptor = cipher.decryptor()
data = ''
inputData = val[28:]
while True:
block = inputData[:16]
inputData = inputData[16:]
if block:
data = data + decryptor.update(block).decode()
else:
break
# throws exception when the tag is wrong
data = data + decryptor.finalize().decode()
return json.loads(data)
# Decrypt give entry nonce
def update(self, data):
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
# to SIGABRT if you call update with an empty byte string. This can be
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
# should be taken only when length is zero and mode is not GCM because
# AES GCM can return improper tag values if you don't call update
# with empty plaintext when authenticating AAD for ...reasons.
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
return b""
buf = self._backend._ffi.new("unsigned char[]",
len(data) + self._block_size - 1)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
self._backend.openssl_assert(res != 0)
return self._backend._ffi.buffer(buf)[:outlen[0]]
def update(self, data):
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
# to SIGABRT if you call update with an empty byte string. This can be
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
# should be taken only when length is zero and mode is not GCM because
# AES GCM can return improper tag values if you don't call update
# with empty plaintext when authenticating AAD for ...reasons.
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
return b""
buf = self._backend._ffi.new("unsigned char[]",
len(data) + self._block_size - 1)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
self._backend.openssl_assert(res != 0)
return self._backend._ffi.buffer(buf)[:outlen[0]]
def update(self, data):
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
# to SIGABRT if you call update with an empty byte string. This can be
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
# should be taken only when length is zero and mode is not GCM because
# AES GCM can return improper tag values if you don't call update
# with empty plaintext when authenticating AAD for ...reasons.
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
return b""
buf = self._backend._ffi.new("unsigned char[]",
len(data) + self._block_size - 1)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
self._backend.openssl_assert(res != 0)
return self._backend._ffi.buffer(buf)[:outlen[0]]
def update(self, data):
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
# to SIGABRT if you call update with an empty byte string. This can be
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
# should be taken only when length is zero and mode is not GCM because
# AES GCM can return improper tag values if you don't call update
# with empty plaintext when authenticating AAD for ...reasons.
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
return b""
buf = self._backend._ffi.new("unsigned char[]",
len(data) + self._block_size - 1)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
assert res != 0
return self._backend._ffi.buffer(buf)[:outlen[0]]
def encode_block(self, block_id, block_data):
assert (isinstance(block_id, bytes)
and len(block_id) == self.block_id_len)
enc = Encoder()
enc.encode_bytes(self.magic)
iv = os.urandom(self.iv_len)
enc.encode_bytes(iv)
c = Cipher(algorithms.AES(self.key), modes.GCM(iv),
backend=self.backend)
e = c.encryptor()
e.authenticate_additional_data(block_id)
s = e.update(block_data) + e.finalize()
assert len(e.tag) == self.tag_len
enc.encode_bytes(e.tag)
enc.encode_bytes(s)
return enc.value
def decode_block(self, block_id, block_data):
assert (isinstance(block_id, bytes)
and len(block_id) == self.block_id_len)
assert isinstance(block_data, bytes)
assert len(block_data) > (len(self.magic) + self.iv_len + self.tag_len)
dec = Decoder(block_data)
# check magic
assert dec.decode_bytes(len(self.magic)) == self.magic
# get iv
iv = dec.decode_bytes(self.iv_len)
# get tag
tag = dec.decode_bytes(self.tag_len)
c = Cipher(algorithms.AES(self.key), modes.GCM(iv, tag),
backend=self.backend)
d = c.decryptor()
d.authenticate_additional_data(block_id)
s = d.update(dec.decode_bytes_rest()) + d.finalize()
return s
def update(self, data):
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
# to SIGABRT if you call update with an empty byte string. This can be
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
# should be taken only when length is zero and mode is not GCM because
# AES GCM can return improper tag values if you don't call update
# with empty plaintext when authenticating AAD for ...reasons.
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
return b""
buf = self._backend._ffi.new("unsigned char[]",
len(data) + self._block_size - 1)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
self._backend.openssl_assert(res != 0)
return self._backend._ffi.buffer(buf)[:outlen[0]]
def encrypt(data, secret_key, initialisation_vector):
"""
Encrypts data using the given secret key and initialisation vector.
:param bytes data: the plaintext bytes to be encrypted
:param bytes secret_key: the key to be used for encryption
:param bytes initialisation_vector: the initialisation vector
:return: the cipher text and GCM authentication tag tuple
:rtype: (bytes, bytes)
"""
cipher = Cipher(algorithm=algorithms.AES(secret_key),
mode=modes.GCM(initialization_vector=initialisation_vector,
min_tag_length=16),
backend=default_backend())
encryptor = cipher.encryptor()
ciphertext = encryptor.update(data) + encryptor.finalize()
return ciphertext, encryptor.tag
def decrypt(ciphertext, tag, secret_key, initialisation_vector):
"""
Decrypts a cipher text using the given GCM authentication tag,
secret key and initialisation vector.
:param bytes ciphertext: the cipher text to be decrypted
:param bytes tag: the GCM authentication tag
:param bytes secret_key: the key to be used for encryption
:param bytes initialisation_vector: the initialisation vector
:return: the decrypted plaintext
:rtype: bytes
"""
cipher = Cipher(algorithm=algorithms.AES(secret_key),
mode=modes.GCM(initialization_vector=initialisation_vector,
tag=tag),
backend=default_backend())
decryptor = cipher.decryptor()
return decryptor.update(ciphertext) + decryptor.finalize()
def update(self, data):
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
# to SIGABRT if you call update with an empty byte string. This can be
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
# should be taken only when length is zero and mode is not GCM because
# AES GCM can return improper tag values if you don't call update
# with empty plaintext when authenticating AAD for ...reasons.
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
return b""
buf = self._backend._ffi.new("unsigned char[]",
len(data) + self._block_size - 1)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
self._backend.openssl_assert(res != 0)
return self._backend._ffi.buffer(buf)[:outlen[0]]
def update(self, data):
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
# to SIGABRT if you call update with an empty byte string. This can be
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
# should be taken only when length is zero and mode is not GCM because
# AES GCM can return improper tag values if you don't call update
# with empty plaintext when authenticating AAD for ...reasons.
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
return b""
buf = self._backend._ffi.new("unsigned char[]",
len(data) + self._block_size - 1)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
assert res != 0
return self._backend._ffi.buffer(buf)[:outlen[0]]
pushbullet.py 文件源码
项目:Intruder-detector-with-Raspberry-Pi-and-Pushbullet
作者: DeligenceTechnologies
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def _encrypt_data(self, data):
assert self._encryption_key
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
iv = os.urandom(12)
encryptor = Cipher(
algorithms.AES(self._encryption_key),
modes.GCM(iv),
backend=default_backend()
).encryptor()
ciphertext = encryptor.update(json.dumps(data).encode("UTF-8")) + encryptor.finalize()
ciphertext = b"1" + encryptor.tag + iv + ciphertext
return standard_b64encode(ciphertext).decode("ASCII")
pushbullet.py 文件源码
项目:Intruder-detector-with-Raspberry-Pi-and-Pushbullet
作者: DeligenceTechnologies
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def _encrypt_data(self, data):
assert self._encryption_key
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
iv = os.urandom(12)
encryptor = Cipher(
algorithms.AES(self._encryption_key),
modes.GCM(iv),
backend=default_backend()
).encryptor()
ciphertext = encryptor.update(json.dumps(data).encode("UTF-8")) + encryptor.finalize()
ciphertext = b"1" + encryptor.tag + iv + ciphertext
return standard_b64encode(ciphertext).decode("ASCII")
def update(self, data):
# OpenSSL 0.9.8e has an assertion in its EVP code that causes it
# to SIGABRT if you call update with an empty byte string. This can be
# removed when we drop support for 0.9.8e (CentOS/RHEL 5). This branch
# should be taken only when length is zero and mode is not GCM because
# AES GCM can return improper tag values if you don't call update
# with empty plaintext when authenticating AAD for ...reasons.
if len(data) == 0 and not isinstance(self._mode, modes.GCM):
return b""
buf = self._backend._ffi.new("unsigned char[]",
len(data) + self._block_size - 1)
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
self._backend.openssl_assert(res != 0)
return self._backend._ffi.buffer(buf)[:outlen[0]]
def encrypt(self, cleartext: bytes):
# No need for padding as we are using GCM
# Get a new iv for GCM
iv = urandom(int(AES.block_size // 8))
cipher = Cipher(self._hazmat_key, GCM(iv), backend=openssl)
encryptor = cipher.encryptor()
enc = encryptor.update(cleartext) + encryptor.finalize()
return iv + enc + encryptor.tag
def decrypt(self, ciphertext: bytes):
iv = ciphertext[:AES.block_size // 8]
tag = ciphertext[-16:]
cipher = Cipher(self._hazmat_key, GCM(iv, tag), backend=openssl).decryptor()
return cipher.update(ciphertext[16:-16]) + cipher.finalize()
def aes_decrypt(key, iv, payload):
""" Use AES128 GCM with the given key and iv to decrypt the payload. """
data = payload[:-16]
tag = payload[-16:]
backend = default_backend()
decryptor = Cipher(
algorithms.AES(key),
GCM(iv, tag=tag),
backend=backend).decryptor()
return decryptor.update(data) + decryptor.finalize()
def aes_encrypt(key, iv, plaintext):
""" Use AES128 GCM with the given key and iv to encrypt the plaintext. """
backend = default_backend()
encryptor = Cipher(
algorithms.AES(key),
GCM(iv),
backend=backend).encryptor()
return encryptor.update(plaintext) + encryptor.finalize() + encryptor.tag