python类Cipher()的实例源码

crypto.py 文件源码 项目:rvmi-rekall 作者: fireeye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, filename, session=None,
                 readers_public_key=None,
                 writers_private_key=None):
        """Open the output file and write the encrypted, signed cipher header.

        A fresh set of cipher keys is generated, serialized to JSON,
        encrypted with ``readers_public_key`` and signed with
        ``writers_private_key``. The resulting signature object is written
        as the "EncryptedCipher" part, and a SHA256 HMAC keyed with the
        cipher's ``hmac_key`` is started over that serialized signature.

        Both keys default to None but are used unconditionally below.
        """
        self.fd = standard.WritableAddressSpace(
            filename=filename, session=session, mode="w+b")
        self.session = session
        self.profile = AgentProfile(session=session)
        self.cipher = CipherProperties(session=session).generate_keys()
        self.readers_public_key = readers_public_key
        self.writers_private_key = writers_private_key

        # Only the reader can recover the cipher (it is encrypted to the
        # reader's public key); the writer's signature lets the reader
        # verify who produced it.
        plain_cipher = self.cipher.to_json()
        envelope = Signature(session=session)
        envelope.encrypted_cipher = readers_public_key.encrypt(plain_cipher)
        envelope.signature = writers_private_key.sign(plain_cipher)

        envelope_json = envelope.to_json()
        self.write_part(envelope_json, "EncryptedCipher")

        # The running HMAC starts with the header that was just written.
        self.hmac = hmac.HMAC(
            self.cipher.hmac_key.RawBytes(),
            hashes.SHA256(),
            backend=openssl.backend,
        )
        self.hmac.update(envelope_json)
fernet.py 文件源码 项目:quickstart-git2s3 作者: aws-quickstart 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _encrypt_from_parts(self, data, current_time, iv):
        """Encrypt and sign ``data`` into a urlsafe-base64 token.

        The token body is the byte 0x80, ``current_time`` packed as an
        8-byte big-endian unsigned integer, ``iv``, and the AES-CBC
        ciphertext of the PKCS7-padded data. An HMAC-SHA256 over that body
        is appended before base64 encoding.

        Raises:
            TypeError: if ``data`` is not ``bytes``.
        """
        if not isinstance(data, bytes):
            raise TypeError("data must be bytes.")

        # Pad to a whole number of AES blocks before CBC encryption.
        pkcs7 = padding.PKCS7(algorithms.AES.block_size).padder()
        plaintext = pkcs7.update(data) + pkcs7.finalize()

        aes_cbc = Cipher(
            algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
        ).encryptor()
        ciphertext = aes_cbc.update(plaintext) + aes_cbc.finalize()

        header = b"\x80" + struct.pack(">Q", current_time)
        basic_parts = header + iv + ciphertext

        # The signature covers everything that precedes it in the token.
        signer = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
        signer.update(basic_parts)
        return base64.urlsafe_b64encode(basic_parts + signer.finalize())
keywrap.py 文件源码 项目:quickstart-git2s3 作者: aws-quickstart 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _wrap_core(wrapping_key, a, r, backend):
    """Run the RFC 3394 key wrap rounds (section 2.2.1, index method).

    ``a`` is the 8-byte integrity register and ``r`` the list of 8-byte
    blocks to wrap; ``r`` is modified in place. Returns the final register
    followed by the joined blocks.
    """
    ecb = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    block_count = len(r)
    for round_no in range(6):
        for idx in range(block_count):
            # Each update is one discrete 16-byte AES block, and ECB keeps
            # no state between blocks, so one encryptor serves every step.
            out = ecb.update(a + r[idx])
            step = (block_count * round_no) + idx + 1
            # The upper half is always a 64-bit chunk, so ">Q" is safe.
            (upper,) = struct.unpack(">Q", out[:8])
            a = struct.pack(">Q", upper ^ step)
            r[idx] = out[-8:]

    assert ecb.finalize() == b""

    return a + b"".join(r)
keywrap.py 文件源码 项目:quickstart-git2s3 作者: aws-quickstart 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _unwrap_core(wrapping_key, a, r, backend):
    """Run the RFC 3394 key unwrap rounds (section 2.2.2, index method).

    ``a`` is the 8-byte integrity register and ``r`` the list of 8-byte
    wrapped blocks; ``r`` is modified in place. Returns the tuple
    ``(a, r)`` after all rounds; checking ``a`` is left to the caller.
    """
    ecb = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
    block_count = len(r)
    for round_no in reversed(range(6)):
        for idx in reversed(range(block_count)):
            step = (block_count * round_no) + idx + 1
            # The register is always a 64-bit chunk, so ">Q" is safe.
            (register,) = struct.unpack(">Q", a)
            # Each update is one discrete 16-byte block, so the same
            # decryptor can be reused for the whole operation.
            out = ecb.update(struct.pack(">Q", register ^ step) + r[idx])
            a = out[:8]
            r[idx] = out[-8:]

    assert ecb.finalize() == b""
    return a, r
fernet.py 文件源码 项目:Docker-XX-Net 作者: kuanghy 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def _encrypt_from_parts(self, data, current_time, iv):
        """Encrypt and sign ``data`` into a urlsafe-base64 token.

        The token body is the byte 0x80, ``current_time`` packed as an
        8-byte big-endian unsigned integer, ``iv``, and the AES-CBC
        ciphertext of the PKCS7-padded data. An HMAC-SHA256 over that body
        is appended before base64 encoding.

        Raises:
            TypeError: if ``data`` is not ``bytes``.
        """
        if not isinstance(data, bytes):
            raise TypeError("data must be bytes.")

        # Pad to a whole number of AES blocks before CBC encryption.
        pkcs7 = padding.PKCS7(algorithms.AES.block_size).padder()
        plaintext = pkcs7.update(data) + pkcs7.finalize()

        aes_cbc = Cipher(
            algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
        ).encryptor()
        ciphertext = aes_cbc.update(plaintext) + aes_cbc.finalize()

        header = b"\x80" + struct.pack(">Q", current_time)
        basic_parts = header + iv + ciphertext

        # The signature covers everything that precedes it in the token.
        signer = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
        signer.update(basic_parts)
        return base64.urlsafe_b64encode(basic_parts + signer.finalize())
fernet.py 文件源码 项目:Docker-XX-Net 作者: kuanghy 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _encrypt_from_parts(self, data, current_time, iv):
        """Encrypt and sign ``data`` into a urlsafe-base64 token.

        The token body is the byte 0x80, ``current_time`` packed as an
        8-byte big-endian unsigned integer, ``iv``, and the AES-CBC
        ciphertext of the PKCS7-padded data. An HMAC-SHA256 over that body
        is appended before base64 encoding.

        Raises:
            TypeError: if ``data`` is not ``bytes``.
        """
        if not isinstance(data, bytes):
            raise TypeError("data must be bytes.")

        # Pad to a whole number of AES blocks before CBC encryption.
        pkcs7 = padding.PKCS7(algorithms.AES.block_size).padder()
        plaintext = pkcs7.update(data) + pkcs7.finalize()

        aes_cbc = Cipher(
            algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
        ).encryptor()
        ciphertext = aes_cbc.update(plaintext) + aes_cbc.finalize()

        header = b"\x80" + struct.pack(">Q", current_time)
        basic_parts = header + iv + ciphertext

        # The signature covers everything that precedes it in the token.
        signer = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
        signer.update(basic_parts)
        return base64.urlsafe_b64encode(basic_parts + signer.finalize())
pushbullet.py 文件源码 项目:Intruder-detector-with-Raspberry-Pi-and-Pushbullet 作者: DeligenceTechnologies 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _encrypt_data(self, data):
        """Encrypt ``data`` as JSON with AES-GCM and return base64 text.

        A fresh 12-byte random IV is drawn for every call. The encoded
        payload is the byte ``b"1"``, the GCM tag, the IV and the
        ciphertext, in that order, encoded with standard base64 and
        returned as an ASCII string.
        """
        assert self._encryption_key

        from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
        from cryptography.hazmat.backends import default_backend

        nonce = os.urandom(12)
        gcm = Cipher(
            algorithms.AES(self._encryption_key),
            modes.GCM(nonce),
            backend=default_backend()
        ).encryptor()

        plaintext = json.dumps(data).encode("UTF-8")
        body = gcm.update(plaintext) + gcm.finalize()
        # The tag is read only after finalize() has been called.
        envelope = b"".join((b"1", gcm.tag, nonce, body))
        return standard_b64encode(envelope).decode("ASCII")
pushbullet.py 文件源码 项目:Intruder-detector-with-Raspberry-Pi-and-Pushbullet 作者: DeligenceTechnologies 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _encrypt_data(self, data):
        """Encrypt ``data`` as JSON with AES-GCM and return base64 text.

        A fresh 12-byte random IV is drawn for every call. The encoded
        payload is the byte ``b"1"``, the GCM tag, the IV and the
        ciphertext, in that order, encoded with standard base64 and
        returned as an ASCII string.
        """
        assert self._encryption_key

        from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
        from cryptography.hazmat.backends import default_backend

        nonce = os.urandom(12)
        gcm = Cipher(
            algorithms.AES(self._encryption_key),
            modes.GCM(nonce),
            backend=default_backend()
        ).encryptor()

        plaintext = json.dumps(data).encode("UTF-8")
        body = gcm.update(plaintext) + gcm.finalize()
        # The tag is read only after finalize() has been called.
        envelope = b"".join((b"1", gcm.tag, nonce, body))
        return standard_b64encode(envelope).decode("ASCII")
fernet.py 文件源码 项目:PyQYT 作者: collinsctk 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _encrypt_from_parts(self, data, current_time, iv):
        """Encrypt and sign ``data`` into a urlsafe-base64 token.

        The token body is the byte 0x80, ``current_time`` packed as an
        8-byte big-endian unsigned integer, ``iv``, and the AES-CBC
        ciphertext of the PKCS7-padded data. An HMAC-SHA256 over that body
        is appended before base64 encoding.

        Raises:
            TypeError: if ``data`` is not ``bytes``.
        """
        if not isinstance(data, bytes):
            raise TypeError("data must be bytes.")

        # Pad to a whole number of AES blocks before CBC encryption.
        pkcs7 = padding.PKCS7(algorithms.AES.block_size).padder()
        plaintext = pkcs7.update(data) + pkcs7.finalize()

        aes_cbc = Cipher(
            algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
        ).encryptor()
        ciphertext = aes_cbc.update(plaintext) + aes_cbc.finalize()

        header = b"\x80" + struct.pack(">Q", current_time)
        basic_parts = header + iv + ciphertext

        # The signature covers everything that precedes it in the token.
        signer = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
        signer.update(basic_parts)
        return base64.urlsafe_b64encode(basic_parts + signer.finalize())
encryption.py 文件源码 项目:aws-encryption-sdk-python 作者: awslabs 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def __init__(self, algorithm, key, associated_data, iv):
        """Store the key and IV and start an encryptor for them.

        The cipher algorithm and mode are obtained from ``algorithm`` via
        ``encryption_algorithm(key)`` and ``encryption_mode(iv)``.
        ``associated_data`` is registered with the encryptor as additional
        authenticated data.
        """
        self.source_key = key
        self.iv = iv

        # Kept generic on purpose so that non-Cipher encryptor types remain
        # an option in the future.
        cipher_algorithm = algorithm.encryption_algorithm(key)
        cipher_mode = algorithm.encryption_mode(self.iv)
        self._encryptor = Cipher(
            cipher_algorithm,
            cipher_mode,
            backend=default_backend()
        ).encryptor()

        # Authenticated but not encrypted; the same associated data must be
        # supplied again on decryption.
        self._encryptor.authenticate_additional_data(associated_data)
__init__.py 文件源码 项目:QUANTAXIS 作者: yutiansut 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, broker="http://127.0.0.1:19820/api", encoding="utf-8", enc_key=None, enc_iv=None):
        """Configure the client endpoint, optional AES-CBC cipher and handlers.

        :param broker: URL stored as ``self._endpoint``.
        :param encoding: text encoding stored as ``self._encoding``.
        :param enc_key: AES key for transport encryption.
        :param enc_iv: CBC initialization vector for transport encryption.

        Transport encryption is enabled only when both ``enc_key`` and
        ``enc_iv`` are supplied; otherwise the key, IV and cipher attributes
        are all set to None.
        """
        super().__init__()

        self._endpoint = broker
        # Honour the caller's encoding; it used to be hard-coded to "utf-8",
        # which silently ignored the parameter.
        self._encoding = encoding
        # Identity test rather than ``== None`` so that objects with a custom
        # ``__eq__`` cannot influence the check.
        if enc_key is None or enc_iv is None:
            self._transport_enc = False
            self._transport_enc_key = None
            self._transport_enc_iv = None
            self._cipher = None
        else:
            self._transport_enc = True
            self._transport_enc_key = enc_key
            self._transport_enc_iv = enc_iv
            backend = default_backend()
            self._cipher = Cipher(algorithms.AES(
                enc_key), modes.CBC(enc_iv), backend=backend)

        self._session = requests.Session()
        # Maps event names to the handler methods that process them.
        self._event_dict = {'logon': self.on_login, 'logoff': self.on_logout, 'ping': self.on_ping,
                            'query_data': self.on_query_data, 'send_order': self.on_insert_order,
                            'cancel_order': self.on_cancel_order_event, 'get_quote': self.on_get_quote}
        self.client_id = ''
        self.account_id = ''
keywrap.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def aes_key_wrap(wrapping_key, key_to_wrap, backend):
    """Wrap ``key_to_wrap`` under ``wrapping_key`` (RFC 3394, section 2.2.1).

    Returns the 8-byte integrity register followed by the wrapped 8-byte
    blocks.

    Raises:
        ValueError: if ``wrapping_key`` is not 16, 24 or 32 bytes long, or
            if ``key_to_wrap`` is shorter than 16 bytes or not a multiple
            of 8 bytes.
    """
    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    if len(key_to_wrap) < 16:
        raise ValueError("The key to wrap must be at least 16 bytes")

    if len(key_to_wrap) % 8 != 0:
        raise ValueError("The key to wrap must be a multiple of 8 bytes")

    ecb = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    # Initial value of the integrity register: eight 0xA6 bytes.
    a = b"\xa6" * 8
    r = [key_to_wrap[off:off + 8] for off in range(0, len(key_to_wrap), 8)]
    block_count = len(r)
    for round_no in range(6):
        for idx in range(block_count):
            # Each update is one discrete 16-byte AES block, and ECB keeps
            # no state between blocks, so one encryptor serves every step.
            out = ecb.update(a + r[idx])
            step = (block_count * round_no) + idx + 1
            # The upper half is always a 64-bit chunk, so ">Q" is safe.
            (upper,) = struct.unpack(">Q", out[:8])
            a = struct.pack(">Q", upper ^ step)
            r[idx] = out[-8:]

    assert ecb.finalize() == b""

    return a + b"".join(r)
keywrap.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def aes_key_unwrap(wrapping_key, wrapped_key, backend):
    """Unwrap ``wrapped_key`` with ``wrapping_key`` (RFC 3394, section 2.2.2).

    Returns the joined unwrapped 8-byte blocks.

    Raises:
        ValueError: if ``wrapped_key`` is shorter than 24 bytes or not a
            multiple of 8 bytes, or if ``wrapping_key`` is not 16, 24 or 32
            bytes long.
        InvalidUnwrap: if the recovered integrity register does not equal
            eight 0xA6 bytes.
    """
    if len(wrapped_key) < 24:
        raise ValueError("Must be at least 24 bytes")

    if len(wrapped_key) % 8 != 0:
        raise ValueError("The wrapped key must be a multiple of 8 bytes")

    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    ecb = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
    expected_iv = b"\xa6" * 8

    chunks = [wrapped_key[off:off + 8] for off in range(0, len(wrapped_key), 8)]
    # The first 8-byte chunk is the integrity register; the rest are data.
    a, r = chunks[0], chunks[1:]
    block_count = len(r)
    for round_no in reversed(range(6)):
        for idx in reversed(range(block_count)):
            step = (block_count * round_no) + idx + 1
            # The register is always a 64-bit chunk, so ">Q" is safe.
            (register,) = struct.unpack(">Q", a)
            # Each update is one discrete 16-byte block, so the same
            # decryptor can be reused for the whole operation.
            out = ecb.update(struct.pack(">Q", register ^ step) + r[idx])
            a = out[:8]
            r[idx] = out[-8:]

    assert ecb.finalize() == b""

    if not bytes_eq(a, expected_iv):
        raise InvalidUnwrap()

    return b"".join(r)
keywrap.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def aes_key_wrap(wrapping_key, key_to_wrap, backend):
    """Wrap ``key_to_wrap`` under ``wrapping_key`` (RFC 3394, section 2.2.1).

    Returns the 8-byte integrity register followed by the wrapped 8-byte
    blocks.

    Raises:
        ValueError: if ``wrapping_key`` is not 16, 24 or 32 bytes long, or
            if ``key_to_wrap`` is shorter than 16 bytes or not a multiple
            of 8 bytes.
    """
    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    if len(key_to_wrap) < 16:
        raise ValueError("The key to wrap must be at least 16 bytes")

    if len(key_to_wrap) % 8 != 0:
        raise ValueError("The key to wrap must be a multiple of 8 bytes")

    ecb = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    # Initial value of the integrity register: eight 0xA6 bytes.
    a = b"\xa6" * 8
    r = [key_to_wrap[off:off + 8] for off in range(0, len(key_to_wrap), 8)]
    block_count = len(r)
    for round_no in range(6):
        for idx in range(block_count):
            # Each update is one discrete 16-byte AES block, and ECB keeps
            # no state between blocks, so one encryptor serves every step.
            out = ecb.update(a + r[idx])
            step = (block_count * round_no) + idx + 1
            # The upper half is always a 64-bit chunk, so ">Q" is safe.
            (upper,) = struct.unpack(">Q", out[:8])
            a = struct.pack(">Q", upper ^ step)
            r[idx] = out[-8:]

    assert ecb.finalize() == b""

    return a + b"".join(r)
keywrap.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def aes_key_unwrap(wrapping_key, wrapped_key, backend):
    """Unwrap ``wrapped_key`` with ``wrapping_key`` (RFC 3394, section 2.2.2).

    Returns the joined unwrapped 8-byte blocks.

    Raises:
        ValueError: if ``wrapped_key`` is shorter than 24 bytes or not a
            multiple of 8 bytes, or if ``wrapping_key`` is not 16, 24 or 32
            bytes long.
        InvalidUnwrap: if the recovered integrity register does not equal
            eight 0xA6 bytes.
    """
    if len(wrapped_key) < 24:
        raise ValueError("Must be at least 24 bytes")

    if len(wrapped_key) % 8 != 0:
        raise ValueError("The wrapped key must be a multiple of 8 bytes")

    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    ecb = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
    expected_iv = b"\xa6" * 8

    chunks = [wrapped_key[off:off + 8] for off in range(0, len(wrapped_key), 8)]
    # The first 8-byte chunk is the integrity register; the rest are data.
    a, r = chunks[0], chunks[1:]
    block_count = len(r)
    for round_no in reversed(range(6)):
        for idx in reversed(range(block_count)):
            step = (block_count * round_no) + idx + 1
            # The register is always a 64-bit chunk, so ">Q" is safe.
            (register,) = struct.unpack(">Q", a)
            # Each update is one discrete 16-byte block, so the same
            # decryptor can be reused for the whole operation.
            out = ecb.update(struct.pack(">Q", register ^ step) + r[idx])
            a = out[:8]
            r[idx] = out[-8:]

    assert ecb.finalize() == b""

    if not bytes_eq(a, expected_iv):
        raise InvalidUnwrap()

    return b"".join(r)
totp.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _cipher_aes_key(value, secret, salt, cost, decrypt=False):
        """
        Low-level encrypt/decrypt helper backing :meth:`encrypt_key`.

        How it works:

        PBKDF2-HMAC-SHA256 stretches the application secret and the random
        salt into 48 bytes, using ``1 << cost`` rounds. The first 32 bytes
        become the AES key and the remaining 16 bytes the IV. The TOTP key
        is then encrypted or decrypted with AES-256-CTR.

        Why CTR rather than CBC: the main attack scenario is an attacker who
        has stolen the database and is trying to decrypt a TOTP key (the
        plaintext here). To make that hard, every password should decrypt to
        a potentially valid key, so authentication and padding oracles must
        be avoided. A random padding scheme could make CBC work, but a
        stream cipher mode is simpler. OFB/CFB would also work, but they
        have malleability and cyclic issues (remote and barely relevant
        here), so CTR was picked as the best overall choice.

        :raises RuntimeError: if the 'cryptography' package is unavailable.
        """
        # Backend AES support is required.
        if _cg_ciphers is None:
            raise RuntimeError("TOTP encryption requires 'cryptography' package "
                               "(https://cryptography.io)")

        # One PBKDF2 call yields both key and IV.
        # NOTE: 48 bytes of output needs two sha256 blocks to be calculated.
        derived = pbkdf2_hmac("sha256", secret, salt=salt, rounds=(1 << cost), keylen=48)
        aes_key, ctr_iv = derived[:32], derived[32:]

        aes_ctr = _cg_ciphers.Cipher(
            _cg_ciphers.algorithms.AES(aes_key),
            _cg_ciphers.modes.CTR(ctr_iv),
            _cg_default_backend(),
        )
        if decrypt:
            ctx = aes_ctr.decryptor()
        else:
            ctx = aes_ctr.encryptor()
        return ctx.update(value) + ctx.finalize()
extract_tokens.py 文件源码 项目:python-miio 作者: rytilahti 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def decrypt_ztoken(ztoken):
        """Decrypt the given ztoken, used by apple.

        A value of 32 characters or fewer is returned unchanged. Otherwise
        the first 64 hex characters are AES-ECB decrypted with an all-zero
        16-byte key and the result is decoded to text.
        """
        if len(ztoken) <= 32:
            return ztoken

        zero_key = bytes.fromhex('00000000000000000000000000000000')
        ecb = Cipher(
            algorithms.AES(zero_key),
            modes.ECB(),
            backend=default_backend(),
        ).decryptor()
        encrypted = bytes.fromhex(ztoken[:64])
        plain = ecb.update(encrypted) + ecb.finalize()

        return plain.decode()
crypto.py 文件源码 项目:pyetesync 作者: etesync 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def decrypt(self, ctext):
        """Decrypt ``ctext`` with AES-CBC and strip its PKCS7 padding.

        The first ``AES_BLOCK_SIZE`` items of ``ctext`` are taken as the IV
        and the remainder as the ciphertext.
        """
        iv, body = ctext[:AES_BLOCK_SIZE], ctext[AES_BLOCK_SIZE:]
        aes_cbc = Cipher(algorithms.AES(self.cipher_key), modes.CBC(iv), backend=default_backend())
        # NOTE(review): the "* 8" suggests AES_BLOCK_SIZE is in bytes and
        # PKCS7 expects bits -- confirm against the constant's definition.
        pkcs7 = padding.PKCS7(AES_BLOCK_SIZE * 8).unpadder()
        ctx = aes_cbc.decryptor()

        padded = ctx.update(body) + ctx.finalize()
        return pkcs7.update(padded) + pkcs7.finalize()
crypto.py 文件源码 项目:pyetesync 作者: etesync 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def encrypt(self, clear_text):
        """Encrypt ``clear_text`` with AES-CBC under a fresh random IV.

        The input is PKCS7-padded first. The return value is the IV
        followed by the ciphertext.
        """
        iv = os.urandom(AES_BLOCK_SIZE)
        aes_cbc = Cipher(algorithms.AES(self.cipher_key), modes.CBC(iv), backend=default_backend())
        # NOTE(review): the "* 8" suggests AES_BLOCK_SIZE is in bytes and
        # PKCS7 expects bits -- confirm against the constant's definition.
        pkcs7 = padding.PKCS7(AES_BLOCK_SIZE * 8).padder()
        ctx = aes_cbc.encryptor()
        padded = pkcs7.update(clear_text) + pkcs7.finalize()

        # The IV is prepended so that it travels with the ciphertext.
        return iv + ctx.update(padded) + ctx.finalize()
keywrap.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def aes_key_wrap(wrapping_key, key_to_wrap, backend):
    """Wrap ``key_to_wrap`` under ``wrapping_key`` (RFC 3394, section 2.2.1).

    Returns the 8-byte integrity register followed by the wrapped 8-byte
    blocks.

    Raises:
        ValueError: if ``wrapping_key`` is not 16, 24 or 32 bytes long, or
            if ``key_to_wrap`` is shorter than 16 bytes or not a multiple
            of 8 bytes.
    """
    if len(wrapping_key) not in [16, 24, 32]:
        raise ValueError("The wrapping key must be a valid AES key length")

    if len(key_to_wrap) < 16:
        raise ValueError("The key to wrap must be at least 16 bytes")

    if len(key_to_wrap) % 8 != 0:
        raise ValueError("The key to wrap must be a multiple of 8 bytes")

    ecb = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
    # Initial value of the integrity register: eight 0xA6 bytes.
    a = b"\xa6" * 8
    r = [key_to_wrap[off:off + 8] for off in range(0, len(key_to_wrap), 8)]
    block_count = len(r)
    for round_no in range(6):
        for idx in range(block_count):
            # Each update is one discrete 16-byte AES block, and ECB keeps
            # no state between blocks, so one encryptor serves every step.
            out = ecb.update(a + r[idx])
            step = (block_count * round_no) + idx + 1
            # The upper half is always a 64-bit chunk, so ">Q" is safe.
            (upper,) = struct.unpack(">Q", out[:8])
            a = struct.pack(">Q", upper ^ step)
            r[idx] = out[-8:]

    assert ecb.finalize() == b""

    return a + b"".join(r)


问题


面经


文章

微信
公众号

扫码关注公众号