def getHMACFunc(key, hex=True):
"""Return a function that computes the HMAC of its input using the **key**.
:param bool hex: If True, the output of the function will be hex-encoded.
:rtype: callable
:returns: A function which can be uses to generate HMACs.
"""
h = hmac.new(key, digestmod=DIGESTMOD)
def hmac_fn(value):
h_tmp = h.copy()
h_tmp.update(value)
if hex:
return h_tmp.hexdigest()
else:
return h_tmp.digest()
return hmac_fn
python类generate()的实例源码
def generate(bits, progress_func=None):
"""
Generate a new private RSA key. This factory function can be used to
generate a new host key or authentication key.
@param bits: number of bits the generated key should be.
@type bits: int
@param progress_func: an optional function to call at key points in
key generation (used by C{pyCrypto.PublicKey}).
@type progress_func: function
@return: new private key
@rtype: L{RSAKey}
"""
rsa = RSA.generate(bits, rng.read, progress_func)
key = RSAKey(vals=(rsa.e, rsa.n))
key.d = rsa.d
key.p = rsa.p
key.q = rsa.q
return key
def lambda_handler(event,context):
try:
if event['RequestType'] == 'Create':
# Generate keys
new_key = RSA.generate(2048)
pub_key = new_key.publickey().exportKey(format='OpenSSH')
priv_key = new_key.exportKey()
# Encrypt private key
kms = boto3.client('kms',region_name=event["ResourceProperties"]["Region"])
enc_key = kms.encrypt(KeyId=event["ResourceProperties"]["KMSKey"],Plaintext=priv_key)['CiphertextBlob']
f = open('/tmp/enc_key','wb')
f.write(enc_key)
f.close()
# Upload priivate key to S3
s3 = boto3.client('s3')
s3.upload_file('/tmp/enc_key',event["ResourceProperties"]["KeyBucket"],'enc_key')
else:
pub_key = event['PhysicalResourceId']
cfnresponse.send(event, context, cfnresponse.SUCCESS, {}, pub_key)
except:
traceback.print_exc()
cfnresponse.send(event, context, cfnresponse.FAILED, {}, '')
def generate_key(self, key_size=2048):
"""
Parameters
----------
key_size : int
bits in RSA key
Returns
-------
"""
if key_size % 256 != 0:
raise ValueError('RSA key size must be divisible by 256')
self.key = RSA.generate(key_size) # RSA key
self.RSAcipher = PKCS1_OAEP.new(self.key) # salts/pads things to be encrypted
def generate(bits, progress_func=None):
"""
Generate a new private RSA key. This factory function can be used to
generate a new host key or authentication key.
:param int bits: number of bits the generated key should be.
:param function progress_func:
an optional function to call at key points in key generation (used
by ``pyCrypto.PublicKey``).
:return: new `.RSAKey` private key
"""
rsa = RSA.generate(bits, os.urandom, progress_func)
key = RSAKey(vals=(rsa.e, rsa.n))
key.d = rsa.d
key.p = rsa.p
key.q = rsa.q
return key
### internals...
def generate_key_pair():
# Random number generator function.
rng = Random.new().read
# Generate a RSA key object by providing PyCrypto with
# desired key size and a randfunc.
# See docs: https://www.dlitz.net/software/pycrypto
# /doc/#crypto-publickey-public-key-algorithms
key = RSA.generate(4096, rng)
# Create and save the private key in the project folder.
# In practice, the private key would be stored locally with
# the master bot and would not be accessible by anyone else.
# However, this is irrelevant for our botnet simulation as everything is local.
f = open('master_bot_private_key.pem', 'wb')
f.write(key.exportKey('PEM').decode('ascii'))
f.close()
# Create and save the public key in the project folder.
# In practice, the public key would be accessible from
# the public server (in this case, the hypothetical pastebot.net).
# However, this is irrelevant for our botnet simulation as everything is local.
f = open('master_bot_public_key.pem', 'wb')
f.write(key.publickey().exportKey('PEM').decode('ascii'))
f.close()
def encrypt_file(filename, pubkey):
# generate a key for symmetric encryption
secretkey = gen_key()
aes = AES.new(secretkey)
contents = open(filename).read()
# pad contents so that the length is a multiple of 16
padding_bytes = 16 - (len(contents) % 16)
contents += chr(padding_bytes) * padding_bytes
# encrypt the file
open(filename + '.enc', 'w').write(aes.encrypt(contents))
# encrypt the secret decryption key
rsa = PKCS1_OAEP.new(pubkey)
open(filename + '.key', 'w').write(rsa.encrypt(secretkey))
# delete the original file
# os.remove(filename)
def generate(bits, progress_func=None):
"""
Generate a new private RSA key. This factory function can be used to
generate a new host key or authentication key.
:param int bits: number of bits the generated key should be.
:param function progress_func:
an optional function to call at key points in key generation (used
by ``pyCrypto.PublicKey``).
:return: new `.RsaKey` private key
"""
rsa = RSA.generate(bits, os.urandom, progress_func)
key = RsaKey(vals=(rsa.e, rsa.n))
key.d = rsa.d
key.p = rsa.p
key.q = rsa.q
return key
### internals...
def testCoppersmithShortPadAttack():
from Crypto.PublicKey import RSA
import random
import math
import binascii
M = "flag{This_Msg_Is_2_1337}"
M = int(binascii.hexlify(M),_sage_const_16 )
e = _sage_const_3
nBitSize = _sage_const_4096
key = RSA.generate(_sage_const_4096 )
#Give a bit of room, otherwhise the epsilon has to be tiny, and small roots will take forever
m = int(math.floor(nBitSize/(e*e))) - _sage_const_220
assert (m > len(bin(M)[_sage_const_2 :]))
r1 = random.randint(_sage_const_1 ,pow(_sage_const_2 ,m))
r2 = random.randint(r1,pow(_sage_const_2 ,m))
M1 = pow(_sage_const_2 ,m)*M + r1
M2 = pow(_sage_const_2 ,m)*M + r2
C1 = Integer(pow(M1,e,key.n))
C2 = Integer(pow(M2,e,key.n))
CoppersmithShortPadAttack(e,key.n,C1,C2)
def generate(bits, progress_func=None):
"""
Generate a new private RSA key. This factory function can be used to
generate a new host key or authentication key.
:param int bits: number of bits the generated key should be.
:param function progress_func:
an optional function to call at key points in key generation (used
by ``pyCrypto.PublicKey``).
:return: new `.RSAKey` private key
"""
rsa = RSA.generate(bits, os.urandom, progress_func)
key = RSAKey(vals=(rsa.e, rsa.n))
key.d = rsa.d
key.p = rsa.p
key.q = rsa.q
return key
### internals...
def keypair_create(self):
while True:
name = self.faker.word()
if not self.keeper.get("nova", "keypairs", "name",
lambda x: x == name):
break
key = RSA.generate(2048).publickey().exportKey('OpenSSH')
try:
log.info("Creating keypair with name {}".format(name))
created = self.native.keypairs.create(name=name, public_key=key)
except Exception as exc:
log.critical("Exception: {}".format(exc))
traceback.print_exc()
return
return created
def generate(bits, progress_func=None):
"""
Generate a new private RSA key. This factory function can be used to
generate a new host key or authentication key.
:param int bits: number of bits the generated key should be.
:param function progress_func:
an optional function to call at key points in key generation (used
by ``pyCrypto.PublicKey``).
:return: new `.RSAKey` private key
"""
rsa = RSA.generate(bits, os.urandom, progress_func)
key = RSAKey(vals=(rsa.e, rsa.n))
key.d = rsa.d
key.p = rsa.p
key.q = rsa.q
return key
### internals...
def sign():
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
key = RSA.generate(2048)
public = key.publickey().exportKey('PEM').decode('ascii')
private = key.exportKey('PEM').decode('ascii')
text = 'abcdefgh'.encode('utf-8')
hash = SHA256.new(text).digest()
signature = key.sign(hash, '')
print('signature=', signature)
# Verify
# Knowing the public key, it is easy to verify a message.
# The plain text is sent to the user along with the signature.
# The receiving side calculates the hash value and then uses the public key verify() method to validate its origin.
def signature():
from son.access.access import AccessClient
from son.workspace.workspace import Workspace
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA256
ws = Workspace('~/workspace/ws1')
ac = AccessClient(ws)
key = RSA.generate(2048)
# package_path = 'son/access/samples/sonata-demo.son'
package_path = '../samples/sonata-demo.son'
with open(package_path, 'rb') as fhandle:
package_content = fhandle.read()
package_hash = SHA256.new(package_content).digest()
signature = ac.sign_package(package_path,
private_key=key.exportKey().decode('utf-8'))
public_key = key.publickey()
print(public_key.verify(package_hash, (int(signature),)))
def test_initialize_diff_users_same_info(self):
""" Check that two different users can recover the same key for the
same attribute, version, metadata, key length combination.
"""
RSA_key1 = RSA.generate(3072)
RSA_pk1 = RSA_key1.publickey()
RSA_key2 = RSA.generate(3072)
RSA_pk2 = RSA_key2.publickey()
keygen = KeyGen('Sixteen byte key')
keystore = DummyKeyStore()
keygen.initialize_users(
{'user1': (RSA_pk1, [('attr', 1, 'meta', 16)]),
'user2': (RSA_pk2, [('attr', 1, 'meta', 16)])}, keystore)
keywrap1 = keystore.retrieve('user1', 'attr', 1, 'meta')
keywrap2 = keystore.retrieve('user2', 'attr', 1, 'meta')
self.assertEqual(utils.unwrap_key(keywrap1, RSA_key1),
utils.unwrap_key(keywrap2, RSA_key2))
def test_initialize_empty(self):
""" Check that a key generated from empty string inputs, wrapped with a
user's public key, and stored in a keystore can be retrieved and
unwrapped with the user's secret key.
"""
RSA_key = RSA.generate(3072)
RSA_pk = RSA_key.publickey()
keygen = KeyGen('Sixteen byte key')
keystore = DummyKeyStore()
params = [('', 'attr', 1, 'meta'),
('userid', '', 1, 'meta'),
('userid', 'attr', 0, 'meta'),
('userid', 'attr', 1, ''),
('', '', 0, '')]
for userid, attr, vers, meta in params:
orig_key = keygen._generate_key(attr, vers, meta, 16)
keygen.initialize_users(
{userid: (RSA_pk, [(attr, vers, meta, 16)])}, keystore)
keywrap = keystore.retrieve(userid, attr, vers, meta)
self.assertEqual(utils.unwrap_key(keywrap, RSA_key), orig_key)
def test_invalid_enc_dec(self):
""" Check that we cannot unwrap a key using a private key that does
not correspond to the public key used to wrap the key.
"""
RSA_key = RSA.generate(3072)
RSA_pk = RSA_key.publickey()
for i in range(self.num_iters):
sk = format(random.getrandbits(128), 'b')
keywrap = wrap_key(sk, RSA_pk)
other_key = RSA.generate(3072)
try:
decrypted_key = unwrap_key(keywrap, other_key)
except ValueError:
self.assertTrue(True, 'error')
else:
self.assertNotEqual(decrypted_key, sk,
'Decryption succeeded with invalid key')
def get_rsa_keys():
if not (os.path.exists(MANHOLE_SERVER_RSA_PUBLIC) and \
os.path.exists(MANHOLE_SERVER_RSA_PRIVATE)):
# generate a RSA keypair
log.info('generate-rsa-keypair')
from Crypto.PublicKey import RSA
rsa_key = RSA.generate(1024)
public_key_str = rsa_key.publickey().exportKey(format='OpenSSH')
private_key_str = rsa_key.exportKey()
# save keys for next time
file(MANHOLE_SERVER_RSA_PUBLIC, 'w+b').write(public_key_str)
file(MANHOLE_SERVER_RSA_PRIVATE, 'w+b').write(private_key_str)
log.debug('saved-rsa-keypair', public=MANHOLE_SERVER_RSA_PUBLIC,
private=MANHOLE_SERVER_RSA_PRIVATE)
else:
public_key_str = file(MANHOLE_SERVER_RSA_PUBLIC).read()
private_key_str = file(MANHOLE_SERVER_RSA_PRIVATE).read()
return public_key_str, private_key_str
def encrypt_with_ecc(public_ecc_key, message, nonce=None):
"""Takes elliptic curve isntance (public_ecc_key) and a byte string
(message), and outputs a ciphertext
"""
assert isinstance(public_ecc_key, ECC.EccKey),\
"public_ecc_key should be ECC key. Got {}".format(type(public_ecc_key))
random_ecc_key = ECC.generate(curve=public_ecc_key.curve)
new_point = public_ecc_key.pointQ * random_ecc_key.d
h = SHA256.new(str(new_point.x))
h.update('XXX' + str(new_point.y)) # 'XXX' is a delimiter
key = h.digest()
if not nonce:
nonce = os.urandom(16)
aes_engine = AES.new(key=key, mode=AES.MODE_EAX, nonce=nonce)
ctx, tag = aes_engine.encrypt_and_digest(message)
# Return: <ephemeral_pub_key>, <nonce>, <ciphertext>, <tag>
return (random_ecc_key.public_key().export_key(format='OpenSSH'),
aes_engine.nonce, ctx, tag)
def test_03_key_sizes(self):
"""
Ensure that RSA keys which are too small or too large are rejected.
"""
rsa = RSA.generate(getattr(settings, 'SECRETS_MIN_PUBKEY_SIZE', 2048) - 256)
small_key = rsa.publickey().exportKey('PEM')
try:
UserKey(public_key=small_key).clean()
self.fail("UserKey.clean() did not fail with an undersized RSA key")
except ValidationError:
pass
rsa = RSA.generate(4096 + 256) # Max size is 4096 (enforced by master_key_cipher field size)
big_key = rsa.publickey().exportKey('PEM')
try:
UserKey(public_key=big_key).clean()
self.fail("UserKey.clean() did not fail with an oversized RSA key")
except ValidationError:
pass
def list(self, request):
# Determine what size key to generate
key_size = request.GET.get('key_size', 2048)
if key_size not in range(2048, 4097, 256):
key_size = 2048
# Export RSA private and public keys in PEM format
key = RSA.generate(key_size)
private_key = key.exportKey('PEM')
public_key = key.publickey().exportKey('PEM')
return Response({
'private_key': private_key,
'public_key': public_key,
})
def get_form(self, req: HttpRequest, obj: Domain=None, **kwargs: Any) -> type:
if req.GET.get("_prefill_key", "0") == "1":
def formfield_callback(field: _ModelField, request: HttpRequest=None, **kwargs: Any) -> Type[_FormField]:
f = self.formfield_for_dbfield(field, request=request, **kwargs) # type: _FormField
# f can be None if the dbfield does not get a FormField (like hidden fields
# or auto increment IDs). Only the dbfield has a .name attribute.
if f and field.name == "dkimkey":
if obj:
obj.dkimkey = RSA.generate(2048).exportKey("PEM").decode("utf-8")
else:
f.initial = RSA.generate(2048).exportKey("PEM").decode("utf-8")
return f
kwargs["formfield_callback"] = functools.partial(formfield_callback, request=req)
form_t = super().get_form(req, obj, **kwargs)
return form_t
def generate_fingerprint(public_key):
try:
pub_bytes = public_key.encode('utf-8')
# Test that the given public_key string is a proper ssh key. The
# returned object is unused since pyca/cryptography does not have a
# fingerprint method.
serialization.load_ssh_public_key(
pub_bytes, backends.default_backend())
pub_data = base64.b64decode(public_key.split(' ')[1])
digest = hashes.Hash(hashes.MD5(), backends.default_backend())
digest.update(pub_data)
md5hash = digest.finalize()
raw_fp = binascii.hexlify(md5hash)
if six.PY3:
raw_fp = raw_fp.decode('ascii')
return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2]))
except Exception:
raise exception.InvalidKeypair(
reason=_('failed to generate fingerprint'))
def generate_key(bits):
"""Generate a paramiko RSAKey"""
# NOTE(dims): pycryptodome has changed the signature of the RSA.generate
# call. specifically progress_func has been dropped. paramiko still uses
# pycrypto. However some projects like latest pysaml2 have switched from
# pycrypto to pycryptodome as pycrypto seems to have been abandoned.
# paramiko project has started transition to pycryptodome as well but
# there is no release yet with that support. So at the moment depending on
# which version of pysaml2 is installed, Nova is likely to break. So we
# call "RSA.generate(bits)" which works on both pycrypto and pycryptodome
# and then wrap it into a paramiko.RSAKey
rsa = RSA.generate(bits)
key = paramiko.RSAKey(vals=(rsa.e, rsa.n))
key.d = rsa.d
key.p = rsa.p
key.q = rsa.q
return key
def setUp(self):
self.set_env_var(CHUNK_SIZE_VAR, 2)
self.private_key = RSA.generate(1024)
private_key_path = self.create_file(self.private_key.exportKey())
self.set_env_var(PRIVATE_KEY_FN, private_key_path)
self.version = '2.0'
self.product = 'a' * 64
self.hardware = 'PowerX'
self.pkg_uid = 'pkg-uid'
content = b'spam'
self.obj_fn = self.create_file(content, name='object')
self.obj_sha256 = hashlib.sha256(content).hexdigest()
self.obj_options = {
'filename': self.obj_fn,
'mode': 'raw',
'target-type': 'device',
'target': '/dev/sda',
}
def test_can_sign_dict(self):
_, fn = tempfile.mkstemp()
self.addCleanup(os.remove, fn)
key = RSA.generate(1024)
with open(fn, 'wb') as fp:
fp.write(key.exportKey())
dict_ = {}
message = SHA256.new(json.dumps(dict_).encode())
result = utils.sign_dict(dict_, fn)
signature = base64.b64decode(result)
verifier = PKCS1_v1_5.new(key)
is_valid = verifier.verify(message, signature)
self.assertTrue(is_valid)
def generate_rsa_private_key():
"""Generate a new RSA private key."""
rand = Random.new().read
return RSA.generate(4096, rand)
def createKey():
random_generator = Random.new().read
return RSA.generate(1024, random_generator)
def generate_OpenSSH(ctx, bits=2048):
'''
Generate an RSA OpenSSH keypair with an exponent of 65537
param: bits The key length in bits
Return private key and public key
'''
private_key = RSA.generate(bits, e=65537)
public_key = private_key.publickey()
str_private_key = private_key.exportKey("PEM")
str_public_key = public_key.exportKey("OpenSSH")
return str_private_key, str_public_key
def setUp(self):
self.rng = Random.new().read
self.key1024 = RSA.generate(1024, self.rng)
# List of tuples with test data for PKCS#1 OAEP
# Each tuple is made up by:
# Item #0: dictionary with RSA key component
# Item #1: plaintext
# Item #2: ciphertext
# Item #3: random data (=seed)
# Item #4: hash object