def encrypt(message, receiver_public_key):
    """Encrypt ``message`` for the owner of ``receiver_public_key``.

    A new SECP256K1 key pair is generated on every call. Its private half
    is combined with ``receiver_public_key`` via ECDH, the shared secret is
    run through X9.63 KDF (SHA-256, 32 bytes) and the result is used as an
    AES-GCM key.

    :param message: plaintext bytes to encrypt.
    :param receiver_public_key: EC public key of the recipient.
    :return: encoded sender public point + GCM tag + ciphertext.
    """
    sender_private_key = ec.generate_private_key(ec.SECP256K1(), backend)
    shared_key = sender_private_key.exchange(ec.ECDH(), receiver_public_key)
    sender_public_key = sender_private_key.public_key()
    point = sender_public_key.public_numbers().encode_point()
    # Bytes literals are required here: GCM and X963KDF reject text strings
    # on Python 3 (on Python 2 ``b'...'`` is the same as ``'...'``).
    iv = b'000000000000'
    xkdf = x963kdf.X963KDF(
        algorithm=hashes.SHA256(),
        length=32,
        sharedinfo=b'',
        backend=backend
    )
    key = xkdf.derive(shared_key)
    encryptor = Cipher(
        algorithms.AES(key),
        modes.GCM(iv),
        backend=backend
    ).encryptor()
    ciphertext = encryptor.update(message) + encryptor.finalize()
    # The tag is only available after finalize().
    return point + encryptor.tag + ciphertext
# Example source code for the Python class Cipher()
def decrypt(message, receiver_private_key):
    """Decrypt a blob laid out as sender point + GCM tag + ciphertext.

    The first 65 bytes are read as the sender's encoded SECP256K1 public
    point, the next 16 bytes as the GCM tag and the remainder as the
    ciphertext. The AES key is derived from the ECDH shared secret with
    X9.63 KDF (SHA-256, 32 bytes).

    :param message: bytes in the layout described above.
    :param receiver_private_key: EC private key of the recipient.
    :return: the decrypted plaintext bytes.
    """
    point = message[0:65]
    tag = message[65:81]
    ciphertext = message[81:]
    sender_public_numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(
        ec.SECP256K1(), point
    )
    sender_public_key = sender_public_numbers.public_key(backend)
    shared_key = receiver_private_key.exchange(ec.ECDH(), sender_public_key)
    # Bytes literals are required here: GCM and X963KDF reject text strings
    # on Python 3 (on Python 2 ``b'...'`` is the same as ``'...'``).
    iv = b'000000000000'
    xkdf = x963kdf.X963KDF(
        algorithm=hashes.SHA256(),
        length=32,
        sharedinfo=b'',
        backend=backend
    )
    key = xkdf.derive(shared_key)
    decryptor = Cipher(
        algorithms.AES(key),
        modes.GCM(iv, tag),
        backend=backend
    ).decryptor()
    # finalize() performs the tag check.
    message = decryptor.update(ciphertext) + decryptor.finalize()
    return message
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def encrypt(plaintext: bytes, token: bytes) -> bytes:
    """Encrypt plaintext with a given token.

    The key and IV both come from ``Utils.key_iv(token)``; the plaintext is
    PKCS7-padded to 128 bits and encrypted with AES-CBC.

    :param bytes plaintext: Plaintext (json) to encrypt
    :param bytes token: Token to use
    :return: Encrypted bytes
    :raises TypeError: if ``plaintext`` is not bytes
    """
    if not isinstance(plaintext, bytes):
        raise TypeError("plaintext requires bytes")

    Utils.verify_token(token)
    key, iv = Utils.key_iv(token)

    pkcs7 = padding.PKCS7(128).padder()
    padded = pkcs7.update(plaintext) + pkcs7.finalize()

    aes_cbc = Cipher(
        algorithms.AES(key), modes.CBC(iv), backend=default_backend()
    ).encryptor()
    return aes_cbc.update(padded) + aes_cbc.finalize()
def decrypt(ciphertext: bytes, token: bytes) -> bytes:
    """Decrypt ciphertext with a given token.

    The key and IV both come from ``Utils.key_iv(token)``; the data is
    decrypted with AES-CBC and the 128-bit PKCS7 padding is removed.

    :param bytes ciphertext: Ciphertext to decrypt
    :param bytes token: Token to use
    :return: Decrypted bytes object
    :raises TypeError: if ``ciphertext`` is not bytes
    """
    if not isinstance(ciphertext, bytes):
        raise TypeError("ciphertext requires bytes")

    Utils.verify_token(token)
    key, iv = Utils.key_iv(token)

    aes_cbc = Cipher(
        algorithms.AES(key), modes.CBC(iv), backend=default_backend()
    ).decryptor()
    padded = aes_cbc.update(ciphertext) + aes_cbc.finalize()

    pkcs7 = padding.PKCS7(128).unpadder()
    return pkcs7.update(padded) + pkcs7.finalize()
def crypt_create(key_bytes,
                 is_encrypt_flag=None,
                 iv_bytes=None,
                 algorithm=ciphers_algorithms.AES,
                 mode=ciphers_modes.CTR):
    '''
    Create and return a crypto context for symmetric key encryption using the
    key key_bytes.
    Uses algorithm in mode with an IV of iv_bytes.
    If iv_bytes is None, an all-zero IV is used.
    AES CTR mode uses the same operations for encryption and decryption.
    Returns an encryptor when is_encrypt_flag is truthy, otherwise (including
    the default of None) a decryptor.
    '''
    algo = algorithm(bytes(key_bytes))
    if iv_bytes is None:
        # The block_size is in bits. Floor division keeps the byte count an
        # int on Python 3, where "/" would yield a float.
        iv_bytes = get_zero_pad(algo.block_size // BITS_IN_BYTE)
    cipher = ciphers.Cipher(algo, mode(bytes(iv_bytes)),
                            backend=backends.default_backend())
    if is_encrypt_flag:
        return cipher.encryptor()
    return cipher.decryptor()
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def __init__(self, endpoint="http://127.0.0.1:10092/api", encoding="utf-8", enc_key=None, enc_iv=None):
    """Store the endpoint and encoding, set up the optional transport cipher and open an HTTP session.

    :param endpoint: URL stored as the API endpoint.
    :param encoding: text encoding stored on the instance.
    :param enc_key: AES key for transport encryption.
    :param enc_iv: CBC initialization vector used with ``enc_key``.

    Transport encryption is enabled only when both ``enc_key`` and
    ``enc_iv`` are given; otherwise the cipher attributes are set to None.
    """
    self._endpoint = endpoint
    # Honour the caller's argument; this was previously hard-coded to
    # "utf-8", silently ignoring the parameter.
    self._encoding = encoding
    if enc_key is None or enc_iv is None:
        self._transport_enc = False
        self._transport_enc_key = None
        self._transport_enc_iv = None
        self._cipher = None
    else:
        self._transport_enc = True
        self._transport_enc_key = enc_key
        self._transport_enc_iv = enc_iv
        backend = default_backend()
        self._cipher = Cipher(algorithms.AES(enc_key), modes.CBC(enc_iv), backend=backend)
    self._session = requests.Session()
def decryptStorage(path, key):
    """Decrypt the AES-GCM encrypted JSON file at ``path``.

    File layout as read here: 12-byte IV, 16-byte GCM tag, then ciphertext.

    :param path: path of the encrypted file.
    :param key: hex-encoded AES key.
    :return: the object parsed from the decrypted JSON text.
    """
    cipherkey = unhexlify(key)
    with open(path, 'rb') as f:
        iv = f.read(12)
        tag = f.read(16)
        cipher = Cipher(algorithms.AES(cipherkey), modes.GCM(iv, tag), backend=default_backend())
        decryptor = cipher.decryptor()
        # Collect raw bytes and decode once at the end: decoding each
        # 16-byte chunk on its own fails when a multi-byte UTF-8 character
        # straddles a chunk boundary.
        chunks = []
        while True:
            block = f.read(16)
            if not block:
                break
            # data are not authenticated yet
            chunks.append(decryptor.update(block))
        # throws exception when the tag is wrong
        chunks.append(decryptor.finalize())
    return json.loads(b''.join(chunks).decode('utf-8'))
def decryptEntryValue(nonce, val):
    """Decrypt an AES-GCM encrypted JSON value.

    ``val`` is sliced as: 12-byte IV, 16-byte GCM tag, then ciphertext.

    :param nonce: hex-encoded AES key.
    :param val: encrypted bytes in the layout described above.
    :return: the object parsed from the decrypted JSON text.
    """
    cipherkey = unhexlify(nonce)
    iv = val[:12]
    tag = val[12:28]
    cipher = Cipher(algorithms.AES(cipherkey), modes.GCM(iv, tag), backend=default_backend())
    decryptor = cipher.decryptor()
    # Decrypt in one pass and decode once: decoding each 16-byte chunk on
    # its own fails when a multi-byte UTF-8 character straddles a chunk
    # boundary. finalize() throws an exception when the tag is wrong.
    plaintext = decryptor.update(val[28:]) + decryptor.finalize()
    return json.loads(plaintext.decode('utf-8'))
# Decrypt the given entry nonce
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def encode_block(self, block_id, block_data):
    """Encrypt ``block_data`` with AES-GCM and serialize the result.

    The output is written through an ``Encoder`` in this order: magic, IV,
    GCM tag, ciphertext. ``block_id`` is passed to GCM as additional
    authenticated data and is not itself written to the output. The IV is
    ``self.iv_len`` bytes from ``os.urandom``, drawn on every call.

    Returns the encoder's ``value``.
    """
    assert (isinstance(block_id, bytes)
            and len(block_id) == self.block_id_len)

    out = Encoder()
    out.encode_bytes(self.magic)

    nonce = os.urandom(self.iv_len)
    out.encode_bytes(nonce)

    encryptor = Cipher(
        algorithms.AES(self.key), modes.GCM(nonce), backend=self.backend
    ).encryptor()
    encryptor.authenticate_additional_data(block_id)
    sealed = encryptor.update(block_data) + encryptor.finalize()

    # The tag only exists after finalize(); it must match the fixed length
    # checked here.
    assert len(encryptor.tag) == self.tag_len
    out.encode_bytes(encryptor.tag)
    out.encode_bytes(sealed)
    return out.value
def decode_block(self, block_id, block_data):
    """Parse and decrypt a block laid out as magic, IV, GCM tag, ciphertext.

    ``block_id`` is passed to GCM as additional authenticated data. The
    input checks (types, minimum length, magic value) are plain ``assert``
    statements.

    Returns the decrypted bytes.
    """
    assert (isinstance(block_id, bytes)
            and len(block_id) == self.block_id_len)
    assert isinstance(block_data, bytes)
    assert len(block_data) > (len(self.magic) + self.iv_len + self.tag_len)

    reader = Decoder(block_data)
    # Fields are consumed in order: magic, IV, tag, then the rest.
    assert reader.decode_bytes(len(self.magic)) == self.magic
    nonce = reader.decode_bytes(self.iv_len)
    auth_tag = reader.decode_bytes(self.tag_len)

    decryptor = Cipher(
        algorithms.AES(self.key), modes.GCM(nonce, auth_tag),
        backend=self.backend
    ).decryptor()
    decryptor.authenticate_additional_data(block_id)
    return decryptor.update(reader.decode_bytes_rest()) + decryptor.finalize()
def execute(self, item):
    """Download the resource described by ``item`` into ``self.location``.

    ``item[1]`` (optional prefix) and ``item[2]`` form the URL; ``item[2]``
    is then overwritten with the basename of the URL path and used as the
    output file name. When ``item[3]`` is truthy, the AES key is fetched
    from ``item[3].uri`` and each downloaded chunk is passed through an
    AES-CBC decryptor (IV taken from the hex string ``item[3].iv`` with its
    first two characters dropped) before being written.

    NOTE(review): the decryptor is never finalized and no padding is
    stripped from the output - confirm this is intended.
    """
    prefix = item[1]
    url = prefix + "/" + item[2] if prefix else item[2]
    item[2] = os.path.basename(urllib.parse.urlparse(url).path)

    if item[3]:
        backend = default_backend()
        key = requests.get(item[3].uri).content
        decryptor = Cipher(
            algorithms.AES(key),
            modes.CBC(bytes.fromhex(item[3].iv[2:])),
            backend=backend,
        ).decryptor()

    response = requests.get(url, stream=True)
    with open(os.path.join(self.location, item[2]), 'wb') as out_file:
        for chunk in response.iter_content(chunk_size=1024):
            if not chunk:
                continue
            if item[3]:
                out_file.write(decryptor.update(chunk))
            else:
                out_file.write(chunk)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def encrypt(data, secret_key, initialisation_vector):
    """
    Encrypts data with AES-GCM using the given secret key and
    initialisation vector.

    :param bytes data: the plaintext bytes to be encrypted
    :param bytes secret_key: the key to be used for encryption
    :param bytes initialisation_vector: the initialisation vector
    :return: the cipher text and GCM authentication tag tuple
    :rtype: (bytes, bytes)
    """
    aes = algorithms.AES(secret_key)
    gcm = modes.GCM(initialization_vector=initialisation_vector,
                    min_tag_length=16)
    gcm_encryptor = Cipher(algorithm=aes, mode=gcm,
                           backend=default_backend()).encryptor()
    encrypted = gcm_encryptor.update(data) + gcm_encryptor.finalize()
    # The tag is read after finalize().
    return encrypted, gcm_encryptor.tag
def decrypt(ciphertext, tag, secret_key, initialisation_vector):
    """
    Decrypts an AES-GCM cipher text using the given authentication tag,
    secret key and initialisation vector.

    :param bytes ciphertext: the cipher text to be decrypted
    :param bytes tag: the GCM authentication tag
    :param bytes secret_key: the key to be used for encryption
    :param bytes initialisation_vector: the initialisation vector
    :return: the decrypted plaintext
    :rtype: bytes
    """
    aes = algorithms.AES(secret_key)
    gcm = modes.GCM(initialization_vector=initialisation_vector, tag=tag)
    gcm_decryptor = Cipher(algorithm=aes, mode=gcm,
                           backend=default_backend()).decryptor()
    plaintext = gcm_decryptor.update(ciphertext)
    return plaintext + gcm_decryptor.finalize()
def _getCipher(self, cip, iv, key):
    """
    Build a cipher object for the named cipher.

    The key is truncated to the key size listed in cipherMap and the IV to
    the algorithm's block size in bytes. An entry whose algorithm class is
    None yields a L{_DummyCipher}.

    @param cip: the name of the cipher, maps into cipherMap
    @param iv: the initialization vector
    @param key: the encryption key

    @return: the cipher object.
    """
    algorithmClass, keySize, modeClass = self.cipherMap[cip]
    if algorithmClass is None:
        return _DummyCipher()

    algorithm = algorithmClass(key[:keySize])
    # block_size is in bits; the IV length is expressed in bytes.
    mode = modeClass(iv[:algorithmClass.block_size // 8])
    return Cipher(algorithm, mode, backend=default_backend())
def _getSupportedCiphers():
    """
    Probe the backend in use and list the ciphers it can instantiate.

    Each candidate is tried with a dummy key and IV made of spaces; those
    that raise L{UnsupportedAlgorithm} are left out.

    @return: a list of supported ciphers.
    @rtype: L{list} of L{str}
    """
    candidates = [
        b'aes256-ctr', b'aes256-cbc', b'aes192-ctr', b'aes192-cbc',
        b'aes128-ctr', b'aes128-cbc', b'cast128-ctr', b'cast128-cbc',
        b'blowfish-ctr', b'blowfish-cbc', b'3des-ctr', b'3des-cbc',
    ]
    supported = []
    for name in candidates:
        algorithmClass, keySize, modeClass = SSHCiphers.cipherMap[name]
        try:
            Cipher(
                algorithmClass(b' ' * keySize),
                modeClass(b' ' * (algorithmClass.block_size // 8)),
                backend=default_backend(),
            ).encryptor()
        except UnsupportedAlgorithm:
            continue
        supported.append(name)
    return supported
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)