encryption_util.py 文件源码

python
阅读 44 收藏 0 点赞 0 评论 0

项目:snowflake-connector-python 作者: snowflakedb 项目源码 文件源码
def encrypt_file(encryption_material, in_filename,
                     chunk_size=AES.block_size * 4 * 1024, tmp_dir=None):
        """
        Encrypts a file with a freshly generated random key (AES, CBC mode)
        and writes the ciphertext to a new temporary file.

        The random file key is itself encrypted (AES, ECB mode) with the
        base64-decoded ``query_stage_master_key`` and returned, base64
        encoded, in the metadata together with the IV and the material
        descriptor.

        PKCS5 padding is applied exactly once, at the end of the stream, so
        the result does not depend on ``chunk_size`` being a multiple of the
        AES block size. If reading or encrypting fails, the partially
        written temporary file is removed before the error is re-raised; on
        success the temporary file is left in place for the caller.

        :param encryption_material: encryption material; its
            ``query_stage_master_key``, ``smk_id`` and ``query_id``
            attributes are read
        :param in_filename: input file name
        :param chunk_size: read chunk size
        :param tmp_dir: temporary directory, optional
        :return: a tuple of (encryption metadata, encrypted temp file name)
        """
        logger = getLogger(__name__)
        decoded_key = base64.standard_b64decode(
            encryption_material.query_stage_master_key)
        key_size = len(decoded_key)
        logger.debug(u'key_size = %s', key_size)

        # Generate key for data encryption
        block_size = AES.block_size
        iv_data = SnowflakeEncryptionUtil.get_secure_random(block_size)
        file_key = SnowflakeEncryptionUtil.get_secure_random(key_size)
        data_cipher = AES.new(key=file_key, mode=AES.MODE_CBC, IV=iv_data)

        temp_output_fd, temp_output_file = tempfile.mkstemp(
            text=False, dir=tmp_dir,
            prefix=os.path.basename(in_filename) + "#")
        logger.debug(u'unencrypted file: %s, temp file: %s, tmp_dir: %s',
                     in_filename, temp_output_file, tmp_dir)
        try:
            # Wrap the descriptor first so that it is closed even when the
            # input file cannot be opened.
            with os.fdopen(temp_output_fd, u'wb') as outfile:
                with open(in_filename, u'rb') as infile:
                    # Bytes read but not yet encrypted; always shorter than
                    # one block after each iteration.
                    pending = b''
                    while True:
                        chunk = infile.read(chunk_size)
                        if not chunk:
                            break
                        pending += chunk
                        # Encrypt only whole blocks now; the CBC cipher
                        # object carries its chaining state across calls.
                        aligned = len(pending) - len(pending) % block_size
                        if aligned:
                            outfile.write(
                                data_cipher.encrypt(pending[:aligned]))
                            pending = pending[aligned:]
                    if pending:
                        last_block = PKCS5_PAD(pending, block_size)
                    else:
                        # Block-aligned (or empty) input: append one full
                        # block of padding.
                        last_block = (
                            block_size * chr(block_size).encode(UTF8))
                    outfile.write(data_cipher.encrypt(last_block))
        except Exception:
            # Do not leave a partial ciphertext behind on failure.
            try:
                os.remove(temp_output_file)
            except OSError:
                pass
            raise

        # encrypt key with QRMK
        key_cipher = AES.new(key=decoded_key, mode=AES.MODE_ECB)
        enc_kek = key_cipher.encrypt(PKCS5_PAD(file_key, AES.block_size))

        mat_desc = MaterialDescriptor(
            smk_id=encryption_material.smk_id,
            query_id=encryption_material.query_id,
            key_size=key_size * 8)
        metadata = EncryptionMetadata(
            key=base64.b64encode(enc_kek).decode('utf-8'),
            iv=base64.b64encode(iv_data).decode('utf-8'),
            matdesc=matdesc_to_unicode(mat_desc),
        )
        return (metadata, temp_output_file)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号