def _generate_n_random_shares(n, l):
'''
Arguments:
n - the number of random shares to generate
l - the upper length of the shares to generate (in bytes)
Returns: A list of n random numbers of length l,
they are generated using a cryptographic PRNG.
'''
return [Random.get_random_bytes(l) for x in xrange(0, n)]
python类get_random_bytes()的实例源码
def test_optimal_path(self):
'''
Test the optimal path functionality
'''
expressions = [('a&b',['a','b'],'a&b'),
('a&b&c', ['a','b'],''),
('a|b', ['a'], 'a'),
('a|b|c',['a','b'], 'a'),
('(a&b)|c',['a','b'], '(a&b)'),
('(a&b)|c',['c'], 'c'),
('(a|b)&c',['a','c'], '(a)&c'),
('(a|b)&c',['a'], ''),
('(a&b)|(b&c)',['b','c'],'(b&c)'),
('(a|b)&(c|d)',['a','d'],'(a)&(d)'),
('((a&b)|c)&(d|e)',['c','e'],'(c)&(e)')]
parser = VisParser()
share_parser = SecretVisParser()
for (e, t, g) in expressions:
vis_tree = parser.parse(e)
share_tree = SecretVisTree(vis_tree.root,
vis_tree.expression,
secret=Random.get_random_bytes(16))
share_tree.compute_shares()
share_tree.set_attributes(vis_tree)
(match, opt_tree, keys) = share_tree.optimal_decryption_tree(Keytor('VIS_AES_CBC',DummyKeys(terms=t),16),
encrypted=False)
self.assertEqual(g, opt_tree.__str__(),
"Optimal tree for %s: %s should be %s" %
(e,opt_tree.__str__(),g))
def test_compute_shares(self):
'''
Tests computing the shares
'''
expressions = ['a&b',
'a&b&c',
'a|b',
'a|b|c',
'(a&b)|c',
'(a|b)&c',
'a|(b&c)',
'a&(b|c)',
'(a&b)|(b&c)',
'(a|b)&(c|d)',
'"test&|"&b',
'((a&b)|c)&(d|e)']
parser = VisParser()
for e in expressions:
tree = parser.parse(e)
secret = Random.get_random_bytes(16)
secret_tree = SecretVisTree(tree.root, e, secret=secret)
secret_tree.compute_shares()
#Test for correct share
self.assertTrue(secret_tree.verify_shares(),
"Shares %s did not verify for expression %s." %
(secret_tree.print_shares(), e))
#Test for incorrect share
secret_tree.root.share = 0
self.assertFalse(secret_tree.verify_shares(),
"Shares %s did incorrectly verify for expression %s." %
(secret_tree.print_shares(), e))
def test_encrypt_decrypt(self):
'''
Tests encrypting and decrypting the shares
'''
DummyPKI = DummyEncryptionPKI()
expressions = ['a&b',
'a&b&c',
'a|b',
'a|b|c',
'(a&b)|c',
'(a|b)&c',
'a|(b&c)',
'a&(b|c)',
'(a&b)|(b&c)',
'(a|b)&(c|d)']
for e in expressions:
secret = Random.get_random_bytes(16)
encrypted_shares = SecretVisTreeEncryptor.encrypt_secret_shares(e,
secret,
Keytor('VIS_AES_CBC',DummyPKI,16),
Pycrypto_AES_CBC)
share = SecretVisTreeEncryptor.decrypt_secret_shares(e,
encrypted_shares,
Keytor('VIS_AES_CBC',DummyPKI,16),
Pycrypto_AES_CBC)
self.assertEqual(share, secret)
def encrypt(plaintext, key=config.SECRET, key_salt='', no_iv=False):
"""Encrypt shit the right way"""
# sanitize inputs
key = SHA256.new(key + key_salt).digest()
if len(key) not in AES.key_size:
raise Exception()
if isinstance(plaintext, unicode):
plaintext = plaintext.encode('utf-8')
# pad plaintext using PKCS7 padding scheme
padlen = AES.block_size - len(plaintext) % AES.block_size
plaintext += chr(padlen) * padlen
# generate random initialization vector using CSPRNG
iv = '\0' * AES.block_size if no_iv else get_random_bytes(AES.block_size)
# encrypt using AES in CFB mode
ciphertext = AES.new(key, AES.MODE_CFB, iv).encrypt(plaintext)
# prepend iv to ciphertext
if not no_iv:
ciphertext = iv + ciphertext
# return ciphertext in hex encoding
return ciphertext.encode('hex')
def __init__(self, module, params):
from Crypto import Random
unittest.TestCase.__init__(self)
self.module = module
self.iv = Random.get_random_bytes(module.block_size)
self.key = b(params['key'])
self.plaintext = 100 * b(params['plaintext'])
self.module_name = params.get('module_name', None)
def generate_iv(self):
"""
Generate a random initialization vector. If pycrypto is not available,
return a buffer of the correct length filled with only '\x00'.
"""
if Random:
return Random.get_random_bytes(self.iv_size)
else:
return chr(0) * self.iv_size
def __init__(self, key_aes):
if key_aes==None: # if key_aes is empty generate random key 16 byte / 128 bit in length for the AES cipher key
self.key_aes = Random.get_random_bytes(16)
self.dump_aes_key(self.key_aes)
if key_aes != None:
self.key_aes =self.hash_sha256(key_aes)
if len(self.hash_sha256(key_aes)) != self.block_size:
print "Aes key size missmatch !"
return None
# the function generates encrypted files with the extension
# *.enc encrypted files are stored in the specified format
# below {[file size in bytes][IV][Cipher blocks]}.enc
def _encrypt(self, dec, password=None):
"""
Internal encryption function
Uses either the password argument for the encryption,
or, if not supplied, the password field of the object
:param dec: a byte string representing the to be encrypted data
:rtype: bytes
"""
if AES is None:
raise ImportError("PyCrypto required")
if password is None:
password = self.password
if password is None:
raise ValueError(
"Password need to be provided to create encrypted archives")
# generate the different encryption parts (non-secure!)
master_key = Random.get_random_bytes(32)
master_salt = Random.get_random_bytes(64)
user_salt = Random.get_random_bytes(64)
master_iv = Random.get_random_bytes(16)
user_iv = Random.get_random_bytes(16)
rounds = 10000
# create the PKCS#7 padding
l = len(dec)
pad = 16 - (l % 16)
dec += bytes([pad] * pad)
# encrypt the data
cipher = AES.new(master_key, IV=master_iv, mode=AES.MODE_CBC)
enc = cipher.encrypt(dec)
# generate the master key checksum
master_ck = PBKDF2(self.encode_utf8(master_key),
master_salt, dkLen=256//8, count=rounds)
# generate the user key from the given password
user_key = PBKDF2(password,
user_salt, dkLen=256//8, count=rounds)
# encrypt the master key and iv
master_dec = b"\x10" + master_iv + b"\x20" + master_key + b"\x20" + master_ck
l = len(master_dec)
pad = 16 - (l % 16)
master_dec += bytes([pad] * pad)
cipher = AES.new(user_key, IV=user_iv, mode=AES.MODE_CBC)
master_enc = cipher.encrypt(master_dec)
# put everything together
enc = binascii.b2a_hex(user_salt).upper() + b"\n" + \
binascii.b2a_hex(master_salt).upper() + b"\n" + \
str(rounds).encode() + b"\n" + \
binascii.b2a_hex(user_iv).upper() + b"\n" + \
binascii.b2a_hex(master_enc).upper() + b"\n" + enc
return enc