def test_populated(self):
"""
Test when there should be some entries populated in the sqllite DB
"""
# Create a pem encoded public key
private_key = generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
public_key = private_key.public_key()
public_pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
self.keycache.addkeyinfo("https://doesnotexists.com/", "blahstuff", public_key, cache_timer=60)
# Now extract the just inserted key
pubkey = self.keycache.getkeyinfo("https://doesnotexists.com/", "blahstuff")
public_pem2 = pubkey.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
self.assertEqual(public_pem, public_pem2)
# Make sure it errors with urlerror when it should not exist
with self.assertRaises(URLError):
self.keycache.getkeyinfo("https://doesnotexists.com/", "asdf")
python类generate_private_key()的实例源码
def test_cache_update_time(self):
"""
Test if the cache next_update works
"""
# Create a pem encoded public key
private_key = generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
public_key = private_key.public_key()
public_pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
self.keycache.addkeyinfo("https://doesnotexists.com/", "blahstuff", public_key, cache_timer=60, next_update=-1)
# Even though the cache is still valid, the next update is triggered
# We should still get the key, even though the next update fails
# (invalid url)
pubkey = self.keycache.getkeyinfo("https://doesnotexists.com/", "blahstuff")
public_pem2 = pubkey.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
self.assertEqual(public_pem, public_pem2)
def setUp(self):
self._private_key = generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
self._public_key = self._private_key.public_key()
self._public_pem = self._public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
keycache = scitokens.utils.keycache.KeyCache.getinstance()
keycache.addkeyinfo("local", "sample_key", self._private_key.public_key())
self._token = scitokens.SciToken(key = self._private_key, key_id="sample_key")
def test_public_key(self):
"""
Test when the public key is provided to deserialize
"""
token = scitokens.SciToken(key = self._private_key)
serialized_token = token.serialize(issuer = "local")
new_token = scitokens.SciToken.deserialize(serialized_token, public_key = self._public_pem, insecure = True)
self.assertIsInstance(new_token, scitokens.SciToken)
# With invalid key
with self.assertRaises(ValueError):
scitokens.SciToken.deserialize(serialized_token, insecure=True, public_key = "asdf".encode())
# With a proper key, but not the right one
private_key = generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
public_key = private_key.public_key()
pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
with self.assertRaises(DecodeError):
scitokens.SciToken.deserialize(serialized_token, insecure=True, public_key = pem)
def generate_rsa_keypair(key_size=2048):
key = rsa.generate_private_key(public_exponent=65537, key_size=key_size, backend=default_backend())
public_key = key.public_key()
public = public_key.public_bytes(encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
private = key.private_bytes(encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
return public, private
def create_ca_certificate(cn, key_size=4096, certify_days=365):
key = rsa.generate_private_key(public_exponent=65537, key_size=key_size, backend=default_backend())
key_id = x509.SubjectKeyIdentifier.from_public_key(key.public_key())
subject = issuer = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)])
now = datetime.datetime.utcnow()
serial = x509.random_serial_number()
cert = x509.CertificateBuilder() \
.subject_name(subject) \
.issuer_name(issuer) \
.public_key(key.public_key()) \
.serial_number(serial) \
.not_valid_before(now) \
.not_valid_after(now + datetime.timedelta(days=certify_days)) \
.add_extension(key_id, critical=False) \
.add_extension(x509.AuthorityKeyIdentifier(key_id.digest,
[x509.DirectoryName(issuer)],
serial),
critical=False) \
.add_extension(x509.BasicConstraints(ca=True, path_length=0), critical=True) \
.add_extension(x509.KeyUsage(digital_signature=True,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=True,
crl_sign=True,
encipher_only=False,
decipher_only=False),
critical=True) \
.sign(key, hashes.SHA256(), default_backend())
cert = cert.public_bytes(serialization.Encoding.PEM)
key = key.private_bytes(encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
return cert, key
def create_new_key_pair(self, save_to_config=True):
"""Creates a new public/private key pair and saves them to the config file
:return: Prints out a success message
"""
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
private_key_decoded = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode()
public_key_decoded = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode()
print('New key pair was created')
if save_to_config:
self.config.set('key_private', private_key_decoded)
self.config.set('key_public', public_key_decoded)
else:
print('\tNew Private Key: %s' % private_key_decoded)
print('\tNew Public Key: %s' % public_key_decoded)
return private_key_decoded, public_key_decoded
def generate_private_key(key_size=4096):
"""Generate a private key.
"""
if key_size < 1024:
raise ValueError("Key is too small!")
return crypto_util.make_key(key_size)
def generate_client_key(key_size=4096, public_exponent=65537):
"""Generate a client key
"""
return acme.jose.JWKRSA(key=rsa.generate_private_key(
public_exponent=public_exponent,
key_size=key_size,
backend=default_backend(),
))
def make_keypair():
return rsa.generate_private_key(
public_exponent=65537,
key_size=1024,
backend=default_backend())
def create_privatekey(pkey_file):
""" Create a private key """
pkey = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
with open(pkey_file, 'wb') as f:
f.write(pkey.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
))
return pkey
def private_key():
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
return rsa.generate_private_key(backend=default_backend(), public_exponent=65537, key_size=2048)
def get_or_gen_key(ctx, account_key_path, new_account_key_size):
account_key_path = os.path.expanduser(account_key_path)
if os.path.exists(account_key_path):
logger.debug('opening existing account key %s', account_key_path)
with open(account_key_path, 'rb') as key_file:
key_contents = key_file.read()
try:
try:
account_key = jose.JWKRSA(key=serialization.load_pem_private_key(key_contents, None,
default_backend()))
except TypeError: # password required
password = click.prompt('Password for %s' % account_key_path, hide_input=True, default=None)
key = serialization.load_pem_private_key(key_contents, password.encode('utf-8'), default_backend())
account_key = jose.JWKRSA(key=key)
except ValueError as e:
logger.error('could not open key %s: %s', account_key_path, e)
ctx.exit(1)
else:
logger.warn('no account key found; creating a new %d bit key in %s', new_account_key_size, account_key_path)
account_key = jose.JWKRSA(key=rsa.generate_private_key(
public_exponent=65537,
key_size=new_account_key_size,
backend=default_backend()))
try:
os.makedirs(os.path.dirname(account_key_path), 0o750)
except os.error:
pass # dir already exists
encryption_algorithm = ask_for_password_or_no_crypto(account_key_path)
with open(account_key_path, 'wb') as key_file:
key_file.write(account_key.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=encryption_algorithm
))
return account_key
def generate_primes(key_size=512):
"""
Generate some primes. Key size in bits (minimum 512).
"""
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
private_key = rsa.generate_private_key(public_exponent=65537,key_size=key_size,backend=default_backend())
private_numbers = private_key.private_numbers()
return min(private_numbers.p, private_numbers.q), max(private_numbers.p, private_numbers.q)
def rsa_key():
key = rsa.generate_private_key(
backend=crypto_default_backend(),
public_exponent=65537,
key_size=2048
)
return key.public_key().public_bytes(
crypto_serialization.Encoding.OpenSSH,
crypto_serialization.PublicFormat.OpenSSH
).decode('utf-8')
def _create_key(common_name, **kwargs):
app.logger.info('called create_key:\n{0}'.format(pformat(locals())))
key = rsa.generate_private_key(
public_exponent=kwargs.get('public_exponent', CFG.key.public_exponent),
key_size=kwargs.get('key_size', CFG.key.key_size),
backend=default_backend())
return key
def _create_key(common_name, **kwargs):
app.logger.info('called create_key:\n{0}'.format(pformat(locals())))
key = rsa.generate_private_key(
public_exponent=kwargs.get('public_exponent', CFG.key.public_exponent),
key_size=kwargs.get('key_size', CFG.key.key_size),
backend=default_backend())
return key
def _create_key(common_name, **kwargs):
app.logger.info('called create_key:\n{0}'.format(pformat(locals())))
key = rsa.generate_private_key(
public_exponent=kwargs.get('public_exponent', CFG.key.public_exponent),
key_size=kwargs.get('key_size', CFG.key.key_size),
backend=default_backend())
return key
def _create_key(common_name, **kwargs):
app.logger.info('called create_key:\n{0}'.format(pformat(locals())))
key = rsa.generate_private_key(
public_exponent=kwargs.get('public_exponent', CFG.key.public_exponent),
key_size=kwargs.get('key_size', CFG.key.key_size),
backend=default_backend())
return key
def generate_rsa_key(key_size=4096):
privkey = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
backend=default_backend()
)
return privkey