def __init__(self, filename, session=None,
readers_public_key=None,
writers_private_key=None):
self.fd = standard.WritableAddressSpace(
filename=filename, session=session, mode="w+b")
self.session = session
self.profile = AgentProfile(session=session)
self.cipher = CipherProperties(session=session).generate_keys()
self.readers_public_key = readers_public_key
self.writers_private_key = writers_private_key
# Cipher is encrypted with the reader's public key - only the reader can
# read it. It is also signed with the sender's private key.
signature = Signature(session=session)
cipher_plain_text = self.cipher.to_json()
signature.encrypted_cipher = readers_public_key.encrypt(
cipher_plain_text)
signature.signature = writers_private_key.sign(cipher_plain_text)
serialized_signature = signature.to_json()
self.write_part(serialized_signature, "EncryptedCipher")
self.hmac = hmac.HMAC(self.cipher.hmac_key.RawBytes(),
hashes.SHA256(),
backend=openssl.backend)
self.hmac.update(serialized_signature)
python类Cipher()的实例源码
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _wrap_core(wrapping_key, a, r, backend):
# RFC 3394 Key Wrap - 2.2.1 (index method)
encryptor = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
n = len(r)
for j in range(6):
for i in range(n):
# every encryption operation is a discrete 16 byte chunk (because
# AES has a 128-bit block size) and since we're using ECB it is
# safe to reuse the encryptor for the entire operation
b = encryptor.update(a + r[i])
# pack/unpack are safe as these are always 64-bit chunks
a = struct.pack(
">Q", struct.unpack(">Q", b[:8])[0] ^ ((n * j) + i + 1)
)
r[i] = b[-8:]
assert encryptor.finalize() == b""
return a + b"".join(r)
def _unwrap_core(wrapping_key, a, r, backend):
# Implement RFC 3394 Key Unwrap - 2.2.2 (index method)
decryptor = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
n = len(r)
for j in reversed(range(6)):
for i in reversed(range(n)):
# pack/unpack are safe as these are always 64-bit chunks
atr = struct.pack(
">Q", struct.unpack(">Q", a)[0] ^ ((n * j) + i + 1)
) + r[i]
# every decryption operation is a discrete 16 byte chunk so
# it is safe to reuse the decryptor for the entire operation
b = decryptor.update(atr)
a = b[:8]
r[i] = b[-8:]
assert decryptor.finalize() == b""
return a, r
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
pushbullet.py 文件源码
项目:Intruder-detector-with-Raspberry-Pi-and-Pushbullet
作者: DeligenceTechnologies
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def _encrypt_data(self, data):
assert self._encryption_key
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
iv = os.urandom(12)
encryptor = Cipher(
algorithms.AES(self._encryption_key),
modes.GCM(iv),
backend=default_backend()
).encryptor()
ciphertext = encryptor.update(json.dumps(data).encode("UTF-8")) + encryptor.finalize()
ciphertext = b"1" + encryptor.tag + iv + ciphertext
return standard_b64encode(ciphertext).decode("ASCII")
pushbullet.py 文件源码
项目:Intruder-detector-with-Raspberry-Pi-and-Pushbullet
作者: DeligenceTechnologies
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def _encrypt_data(self, data):
assert self._encryption_key
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
iv = os.urandom(12)
encryptor = Cipher(
algorithms.AES(self._encryption_key),
modes.GCM(iv),
backend=default_backend()
).encryptor()
ciphertext = encryptor.update(json.dumps(data).encode("UTF-8")) + encryptor.finalize()
ciphertext = b"1" + encryptor.tag + iv + ciphertext
return standard_b64encode(ciphertext).decode("ASCII")
def _encrypt_from_parts(self, data, current_time, iv):
if not isinstance(data, bytes):
raise TypeError("data must be bytes.")
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def __init__(self, algorithm, key, associated_data, iv):
"""Prepares initial values."""
self.source_key = key
# Construct an encryptor object with the given key and a provided IV.
# This is intentionally generic to leave an option for non-Cipher encryptor types in the future.
self.iv = iv
self._encryptor = Cipher(
algorithm.encryption_algorithm(key),
algorithm.encryption_mode(self.iv),
backend=default_backend()
).encryptor()
# associated_data will be authenticated but not encrypted,
# it must also be passed in on decryption.
self._encryptor.authenticate_additional_data(associated_data)
def __init__(self, broker="http://127.0.0.1:19820/api", encoding="utf-8", enc_key=None, enc_iv=None):
super().__init__()
self._endpoint = broker
self._encoding = "utf-8"
if enc_key == None or enc_iv == None:
self._transport_enc = False
self._transport_enc_key = None
self._transport_enc_iv = None
self._cipher = None
else:
self._transport_enc = True
self._transport_enc_key = enc_key
self._transport_enc_iv = enc_iv
backend = default_backend()
self._cipher = Cipher(algorithms.AES(
enc_key), modes.CBC(enc_iv), backend=backend)
self._session = requests.Session()
self._event_dict = {'logon': self.on_login, 'logoff': self.on_logout, 'ping': self.on_ping,
'query_data': self.on_query_data, 'send_order': self.on_insert_order,
'cancel_order': self.on_cancel_order_event, 'get_quote': self.on_get_quote}
self.client_id = ''
self.account_id = ''
def aes_key_wrap(wrapping_key, key_to_wrap, backend):
if len(wrapping_key) not in [16, 24, 32]:
raise ValueError("The wrapping key must be a valid AES key length")
if len(key_to_wrap) < 16:
raise ValueError("The key to wrap must be at least 16 bytes")
if len(key_to_wrap) % 8 != 0:
raise ValueError("The key to wrap must be a multiple of 8 bytes")
# RFC 3394 Key Wrap - 2.2.1 (index method)
encryptor = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
a = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
r = [key_to_wrap[i:i + 8] for i in range(0, len(key_to_wrap), 8)]
n = len(r)
for j in range(6):
for i in range(n):
# every encryption operation is a discrete 16 byte chunk (because
# AES has a 128-bit block size) and since we're using ECB it is
# safe to reuse the encryptor for the entire operation
b = encryptor.update(a + r[i])
# pack/unpack are safe as these are always 64-bit chunks
a = struct.pack(
">Q", struct.unpack(">Q", b[:8])[0] ^ ((n * j) + i + 1)
)
r[i] = b[-8:]
assert encryptor.finalize() == b""
return a + b"".join(r)
def aes_key_unwrap(wrapping_key, wrapped_key, backend):
if len(wrapped_key) < 24:
raise ValueError("Must be at least 24 bytes")
if len(wrapped_key) % 8 != 0:
raise ValueError("The wrapped key must be a multiple of 8 bytes")
if len(wrapping_key) not in [16, 24, 32]:
raise ValueError("The wrapping key must be a valid AES key length")
# Implement RFC 3394 Key Unwrap - 2.2.2 (index method)
decryptor = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
aiv = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
r = [wrapped_key[i:i + 8] for i in range(0, len(wrapped_key), 8)]
a = r.pop(0)
n = len(r)
for j in reversed(range(6)):
for i in reversed(range(n)):
# pack/unpack are safe as these are always 64-bit chunks
atr = struct.pack(
">Q", struct.unpack(">Q", a)[0] ^ ((n * j) + i + 1)
) + r[i]
# every decryption operation is a discrete 16 byte chunk so
# it is safe to reuse the decryptor for the entire operation
b = decryptor.update(atr)
a = b[:8]
r[i] = b[-8:]
assert decryptor.finalize() == b""
if not bytes_eq(a, aiv):
raise InvalidUnwrap()
return b"".join(r)
def aes_key_wrap(wrapping_key, key_to_wrap, backend):
if len(wrapping_key) not in [16, 24, 32]:
raise ValueError("The wrapping key must be a valid AES key length")
if len(key_to_wrap) < 16:
raise ValueError("The key to wrap must be at least 16 bytes")
if len(key_to_wrap) % 8 != 0:
raise ValueError("The key to wrap must be a multiple of 8 bytes")
# RFC 3394 Key Wrap - 2.2.1 (index method)
encryptor = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
a = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
r = [key_to_wrap[i:i + 8] for i in range(0, len(key_to_wrap), 8)]
n = len(r)
for j in range(6):
for i in range(n):
# every encryption operation is a discrete 16 byte chunk (because
# AES has a 128-bit block size) and since we're using ECB it is
# safe to reuse the encryptor for the entire operation
b = encryptor.update(a + r[i])
# pack/unpack are safe as these are always 64-bit chunks
a = struct.pack(
">Q", struct.unpack(">Q", b[:8])[0] ^ ((n * j) + i + 1)
)
r[i] = b[-8:]
assert encryptor.finalize() == b""
return a + b"".join(r)
def aes_key_unwrap(wrapping_key, wrapped_key, backend):
if len(wrapped_key) < 24:
raise ValueError("Must be at least 24 bytes")
if len(wrapped_key) % 8 != 0:
raise ValueError("The wrapped key must be a multiple of 8 bytes")
if len(wrapping_key) not in [16, 24, 32]:
raise ValueError("The wrapping key must be a valid AES key length")
# Implement RFC 3394 Key Unwrap - 2.2.2 (index method)
decryptor = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
aiv = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
r = [wrapped_key[i:i + 8] for i in range(0, len(wrapped_key), 8)]
a = r.pop(0)
n = len(r)
for j in reversed(range(6)):
for i in reversed(range(n)):
# pack/unpack are safe as these are always 64-bit chunks
atr = struct.pack(
">Q", struct.unpack(">Q", a)[0] ^ ((n * j) + i + 1)
) + r[i]
# every decryption operation is a discrete 16 byte chunk so
# it is safe to reuse the decryptor for the entire operation
b = decryptor.update(atr)
a = b[:8]
r[i] = b[-8:]
assert decryptor.finalize() == b""
if not bytes_eq(a, aiv):
raise InvalidUnwrap()
return b"".join(r)
def _cipher_aes_key(value, secret, salt, cost, decrypt=False):
"""
Internal helper for :meth:`encrypt_key` --
handles lowlevel encryption/decryption.
Algorithm details:
This function uses PBKDF2-HMAC-SHA256 to generate a 32-byte AES key
and a 16-byte IV from the application secret & random salt.
It then uses AES-256-CTR to encrypt/decrypt the TOTP key.
CTR mode was chosen over CBC because the main attack scenario here
is that the attacker has stolen the database, and is trying to decrypt a TOTP key
(the plaintext value here). To make it hard for them, we want every password
to decrypt to a potentially valid key -- thus need to avoid any authentication
or padding oracle attacks. While some random padding construction could be devised
to make this work for CBC mode, a stream cipher mode is just plain simpler.
OFB/CFB modes would also work here, but seeing as they have malleability
and cyclic issues (though remote and barely relevant here),
CTR was picked as the best overall choice.
"""
# make sure backend AES support is available
if _cg_ciphers is None:
raise RuntimeError("TOTP encryption requires 'cryptography' package "
"(https://cryptography.io)")
# use pbkdf2 to derive both key (32 bytes) & iv (16 bytes)
# NOTE: this requires 2 sha256 blocks to be calculated.
keyiv = pbkdf2_hmac("sha256", secret, salt=salt, rounds=(1 << cost), keylen=48)
# use AES-256-CTR to encrypt/decrypt input value
cipher = _cg_ciphers.Cipher(_cg_ciphers.algorithms.AES(keyiv[:32]),
_cg_ciphers.modes.CTR(keyiv[32:]),
_cg_default_backend())
ctx = cipher.decryptor() if decrypt else cipher.encryptor()
return ctx.update(value) + ctx.finalize()
def decrypt_ztoken(ztoken):
"""Decrypt the given ztoken, used by apple."""
if len(ztoken) <= 32:
return ztoken
keystring = '00000000000000000000000000000000'
key = bytes.fromhex(keystring)
cipher = Cipher(algorithms.AES(key), modes.ECB(),
backend=default_backend())
decryptor = cipher.decryptor()
token = decryptor.update(bytes.fromhex(ztoken[:64])) \
+ decryptor.finalize()
return token.decode()
def decrypt(self, ctext):
iv = ctext[:AES_BLOCK_SIZE]
ctext = ctext[AES_BLOCK_SIZE:]
cipher = Cipher(algorithms.AES(self.cipher_key), modes.CBC(iv), backend=default_backend())
unpadder = padding.PKCS7(AES_BLOCK_SIZE * 8).unpadder()
decryptor = cipher.decryptor()
data = decryptor.update(ctext) + decryptor.finalize()
return unpadder.update(data) + unpadder.finalize()
def encrypt(self, clear_text):
iv = os.urandom(AES_BLOCK_SIZE)
cipher = Cipher(algorithms.AES(self.cipher_key), modes.CBC(iv), backend=default_backend())
padder = padding.PKCS7(AES_BLOCK_SIZE * 8).padder()
encryptor = cipher.encryptor()
padded_data = padder.update(clear_text) + padder.finalize()
return iv + encryptor.update(padded_data) + encryptor.finalize()
def aes_key_wrap(wrapping_key, key_to_wrap, backend):
if len(wrapping_key) not in [16, 24, 32]:
raise ValueError("The wrapping key must be a valid AES key length")
if len(key_to_wrap) < 16:
raise ValueError("The key to wrap must be at least 16 bytes")
if len(key_to_wrap) % 8 != 0:
raise ValueError("The key to wrap must be a multiple of 8 bytes")
# RFC 3394 Key Wrap - 2.2.1 (index method)
encryptor = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
a = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
r = [key_to_wrap[i:i + 8] for i in range(0, len(key_to_wrap), 8)]
n = len(r)
for j in range(6):
for i in range(n):
# every encryption operation is a discrete 16 byte chunk (because
# AES has a 128-bit block size) and since we're using ECB it is
# safe to reuse the encryptor for the entire operation
b = encryptor.update(a + r[i])
# pack/unpack are safe as these are always 64-bit chunks
a = struct.pack(
">Q", struct.unpack(">Q", b[:8])[0] ^ ((n * j) + i + 1)
)
r[i] = b[-8:]
assert encryptor.finalize() == b""
return a + b"".join(r)