def test_key_not_exists(self, pem_path):
    """
    With no key file present, asking for the client key should create a
    new key and write it to ``client.key`` under ``pem_path``; the key read
    back from that file must equal the key that was returned.
    """
    returned_key = maybe_key(pem_path)

    key_file = pem_path.child(u'client.key')
    assert_that(key_file.exists(), Equals(True))

    # Read the persisted PEM back and wrap it in a JWKRSA so that it can be
    # compared directly with the returned key.
    stored_private_key = serialization.load_pem_private_key(
        key_file.getContent(), password=None, backend=default_backend())
    stored_key = JWKRSA(key=stored_private_key)
    assert_that(returned_key, Equals(stored_key))
# Example source code using Python's default_backend()
def _restore_state(self):
    """Reload this user's attributes from the state store.

    The stored value is fetched under ``self._state_store_key``, encoded to
    UTF-8, un-hexlified and unpickled into a dict whose entries are copied
    onto ``self``. When the dict carries a truthy ``'enrollment'`` entry its
    PEM private key is loaded and wrapped in an ``Enrollment``.

    :raises IOError: if any step of reading or decoding fails; the
        underlying exception is passed as the second argument.
    """
    try:
        hex_blob = self._state_store.get_value(self._state_store_key)
        # NOTE(review): pickle.loads can execute arbitrary code -- the
        # state store contents must be trusted.
        saved = pickle.loads(binascii.unhexlify(hex_blob.encode("utf-8")))

        self._name = saved['name']
        self.enrollment_secret = saved['enrollment_secret']

        saved_enrollment = saved['enrollment']
        if saved_enrollment:
            key = serialization.load_pem_private_key(
                saved_enrollment['private_key'],
                password=None,
                backend=default_backend())
            certificate = saved_enrollment['cert']
            self.enrollment = Enrollment(key, certificate)

        self.affiliation = saved['affiliation']
        self.account = saved['account']
        self.roles = saved['roles']
        self._org = saved['org']
        self.msp_id = saved['msp_id']
    except Exception as err:
        raise IOError("Cannot deserialize the user", err)
def verify_hmac(expected_result, secret_key, unique_prefix, data):
    '''
    Check expected_result against
        HMAC(secret_key, unique_prefix | data)
    computed with the CryptoHash algorithm.

    secret_key must be non-empty and at least CryptoHash.digest_size long
    (enforced with assertions). unique_prefix is fed into the MAC before
    data.

    Returns True when the MAC matches expected_result, False when the
    verification raises InvalidSignature.
    '''
    # A key shorter than the digest size reduces security
    assert secret_key
    assert len(secret_key) >= CryptoHash.digest_size

    mac = hmac.HMAC(bytes(secret_key), CryptoHash(), backend=default_backend())
    for chunk in (unique_prefix, data):
        mac.update(bytes(chunk))

    try:
        mac.verify(bytes(expected_result))
    except InvalidSignature:
        return False
    return True
def encrypt(plaintext: bytes, token: bytes) -> bytes:
    """Encrypt plaintext using a key and IV derived from the token.

    The plaintext is PKCS7-padded to a 128-bit block and encrypted with
    AES in CBC mode.

    :param bytes plaintext: Plaintext (json) to encrypt
    :param bytes token: Token to use
    :raises TypeError: if plaintext is not a bytes object
    :return: Encrypted bytes"""
    if not isinstance(plaintext, bytes):
        raise TypeError("plaintext requires bytes")

    Utils.verify_token(token)
    key, iv = Utils.key_iv(token)

    pkcs7 = padding.PKCS7(128).padder()
    padded = pkcs7.update(plaintext) + pkcs7.finalize()

    aes_cbc = Cipher(
        algorithms.AES(key), modes.CBC(iv), backend=default_backend()
    ).encryptor()
    return aes_cbc.update(padded) + aes_cbc.finalize()
def decrypt(ciphertext: bytes, token: bytes) -> bytes:
    """Decrypt ciphertext using a key and IV derived from the token.

    The ciphertext is decrypted with AES in CBC mode and the 128-bit
    PKCS7 padding is then removed.

    :param bytes ciphertext: Ciphertext to decrypt
    :param bytes token: Token to use
    :raises TypeError: if ciphertext is not a bytes object
    :return: Decrypted bytes object"""
    if not isinstance(ciphertext, bytes):
        raise TypeError("ciphertext requires bytes")

    Utils.verify_token(token)
    key, iv = Utils.key_iv(token)

    aes_cbc = Cipher(
        algorithms.AES(key), modes.CBC(iv), backend=default_backend()
    ).decryptor()
    padded = aes_cbc.update(ciphertext) + aes_cbc.finalize()

    pkcs7 = padding.PKCS7(128).unpadder()
    return pkcs7.update(padded) + pkcs7.finalize()
def kdf_rfc5869_derive(secret_input_bytes, output_len, m_expand=M_EXPAND_NTOR,
                       t_key=T_KEY_NTOR):
    '''
    Derive output_len bytes from secret_input_bytes with HKDF-SHA256
    (RFC5869), using m_expand as the info value and t_key as the salt.

    The result is returned as a bytearray.

    There is no equivalent verification function, as only the nonce part of
    the KDF result is verified directly.
    See https://gitweb.torproject.org/torspec.git/tree/tor-spec.txt#n1026
    '''
    kdf = hkdf.HKDF(
        algorithm=hashes.SHA256(),
        length=output_len,
        info=bytes(m_expand),
        salt=bytes(t_key),
        backend=backends.default_backend()
    )
    derived = kdf.derive(bytes(secret_input_bytes))
    # Sanity check: the KDF must hand back exactly the requested length
    assert len(derived) == output_len
    return bytearray(derived)
def crypt_create(key_bytes,
                 is_encrypt_flag=None,
                 iv_bytes=None,
                 algorithm=ciphers_algorithms.AES,
                 mode=ciphers_modes.CTR):
    '''
    Create and return a crypto context for symmetric key encryption using the
    key key_bytes.

    Uses algorithm in mode with an IV of iv_bytes.
    If iv_bytes is None, an all-zero IV one cipher block long is used.

    If is_encrypt_flag is truthy an encryptor context is returned, otherwise
    (including the default of None) a decryptor context is returned.
    AES CTR mode uses the same operations for encryption and decryption.
    '''
    algo = algorithm(bytes(key_bytes))
    if iv_bytes is None:
        # block_size is in bits. Floor division keeps the byte count an int
        # on Python 3 as well, where "/" would produce a float.
        iv_bytes = get_zero_pad(algo.block_size // BITS_IN_BYTE)
    cipher = ciphers.Cipher(algo, mode(bytes(iv_bytes)),
                            backend=backends.default_backend())
    if is_encrypt_flag:
        return cipher.encryptor()
    return cipher.decryptor()
def test_rsa(self):
    """Issue a GET signed with a freshly generated RSA key.

    A 2048-bit key is generated; its private half is serialized as
    passphrase-encrypted PKCS8 PEM and handed to ``HTTPSignatureAuth``,
    while the public half is sent base64-encoded in a ``pubkey`` header.
    """
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.asymmetric import rsa
    from cryptography.hazmat.primitives import serialization

    rsa_key = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=default_backend())

    encrypted_private_pem = rsa_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.BestAvailableEncryption(passphrase))
    public_pem = rsa_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo)

    target_url = 'http://example.com/path?query#fragment'
    signer = HTTPSignatureAuth(
        algorithm="rsa-sha256",
        key=encrypted_private_pem,
        key_id="sekret",
        passphrase=passphrase)
    request_headers = dict(pubkey=base64.b64encode(public_pem))
    self.session.get(target_url, auth=signer, headers=request_headers)
def verify_renewable_cert_sig(renewable_cert):
    """ Verifies the signature of a `.storage.RenewableCert` object.

    The certificate read from ``renewable_cert.cert`` is checked against the
    certificate loaded from ``renewable_cert.chain``, using the hash
    algorithm named in the certificate itself.

    :param `.storage.RenewableCert` renewable_cert: cert to verify

    :raises errors.Error: If signature verification fails.
    """
    try:
        with open(renewable_cert.chain, 'rb') as chain_file:
            issuer, _ = pyopenssl_load_certificate(chain_file.read())
        with open(renewable_cert.cert, 'rb') as cert_file:
            leaf = x509.load_pem_x509_certificate(
                cert_file.read(), default_backend())
        digest_name = leaf.signature_hash_algorithm.name
        OpenSSL.crypto.verify(
            issuer, leaf.signature, leaf.tbs_certificate_bytes, digest_name)
    except (IOError, ValueError, OpenSSL.crypto.Error) as e:
        message_template = (
            "verifying the signature of the cert located at {0} has failed. "
            "Details: {1}"
        )
        error_str = message_template.format(renewable_cert.cert, e)
        logger.exception(error_str)
        raise errors.Error(error_str)
def load_certificate(self, cert_name):
    """Load the certificate file ``cert_name`` found under ``self.cert_path``.

    The file contents are parsed as PEM first; if that raises, they are
    parsed as DER instead.

    :param cert_name: file name of the certificate inside ``self.cert_path``
    :returns: the certificate object produced by the x509 loader
    :raises exception.SignatureVerificationError: if neither encoding
        can be parsed
    """
    cert_file_path = os.path.join(self.cert_path, cert_name)
    with open(cert_file_path, 'rb') as handle:
        raw_bytes = handle.read()

    try:
        return x509.load_pem_x509_certificate(raw_bytes, default_backend())
    except Exception:
        # Not PEM -- fall back to DER before giving up.
        try:
            return x509.load_der_x509_certificate(raw_bytes, default_backend())
        except Exception:
            raise exception.SignatureVerificationError(
                "Failed to load certificate: %s" % cert_file_path
            )
def generate_RSA(bits=4096):
    """Create a new RSA key pair with public exponent 65537.

    :param bits: key size in bits
    :returns: tuple ``(private_key, public_key)`` where the private key is
        serialized as unencrypted PKCS8 PEM and the public key is
        serialized in OpenSSH format
    """
    keypair = rsa.generate_private_key(
        public_exponent=65537, key_size=bits, backend=default_backend())

    pem_private = keypair.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption())
    ssh_public = keypair.public_key().public_bytes(
        encoding=serialization.Encoding.OpenSSH,
        format=serialization.PublicFormat.OpenSSH)

    return pem_private, ssh_public
def generate_RSA(bits=4096):
    """Create a new RSA key pair with public exponent 65537.

    :param bits: key size in bits
    :returns: tuple ``(private_key, public_key)`` where the private key is
        serialized as unencrypted PKCS8 PEM and the public key is
        serialized in OpenSSH format
    """
    keypair = rsa.generate_private_key(
        public_exponent=65537, key_size=bits, backend=default_backend())

    pem_private = keypair.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption())
    ssh_public = keypair.public_key().public_bytes(
        encoding=serialization.Encoding.OpenSSH,
        format=serialization.PublicFormat.OpenSSH)

    return pem_private, ssh_public
def validate_ca_cert(self, ignored):
    """Check the stored CA certificate against its expected fingerprint.

    The expected value has the form ``<algo>:<hex digest>``; ``<algo>`` is
    looked up by name on the ``hashes`` module and spaces in the digest are
    ignored. If the fingerprint of the certificate on disk differs, the
    certificate file is deleted, the mismatch is logged and an error is
    raised.

    :param ignored: unused by this method
    :raises NetworkError: if the fingerprints do not match
    """
    expected = self._get_expected_ca_cert_fingerprint()
    algo, expectedfp = expected.split(':')
    expectedfp = expectedfp.replace(' ', '')

    backend = default_backend()

    # Read as bytes: the PEM loader expects bytes, and text mode would hand
    # it a str on Python 3.
    with open(self._get_ca_cert_path(), 'rb') as f:
        cert_pem = f.read()

    cert = load_pem_x509_certificate(cert_pem, backend)
    hasher = getattr(hashes, algo)()
    fpbytes = cert.fingerprint(hasher)
    # hexlify returns bytes; decode so the comparison with the textual
    # expected fingerprint is meaningful on Python 3 too.
    fp = binascii.hexlify(fpbytes).decode('ascii')

    if fp != expectedfp:
        os.unlink(self._get_ca_cert_path())
        self.log.error("Fingerprint of CA cert doesn't match: %s <-> %s"
                       % (fp, expectedfp))
        raise NetworkError("The provider's CA fingerprint doesn't match")
def _set_encryptor(self, public_key):
    '''Create ``self.encryptor`` from a PEM public key, once.

    Does nothing if an encryptor is already set. When ``public_key`` is
    falsy the key is read from ``self.public_key_file`` instead.

    Raises errors.InputError if the key file cannot be read, and
    errors.EncryptorError if the key cannot be loaded or the Encryptor
    cannot be built from it.
    '''
    if self.encryptor is not None:
        return

    if not public_key:
        try:
            with open(self.public_key_file, 'rb') as key_file:
                public_key = key_file.read()
        except IOError:
            raise errors.InputError(
                "public key '%s' does not exist." % self.public_key_file)

    try:
        loaded_key = serialization.load_pem_public_key(
            public_key, backend=default_backend())
        self.encryptor = Encryptor(loaded_key)
    except Exception as err:
        raise errors.EncryptorError(
            "public key is malformed. (Reason: %s)" % (str(err)))
def create_csr(key, domains, must_staple=False):
    """
    Build and sign a certificate signing request for the given domains.

    The first domain becomes the subject common name and every domain is
    listed as a DNS subject alternative name. When must_staple is true a
    TLS feature extension requesting status_request is added. The request
    is signed with key using SHA-256 and returned through
    export_csr_for_acme (presumably DER-encoded output -- confirm against
    that helper).
    """
    assert domains

    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, domains[0]),
    ])
    alt_names = x509.SubjectAlternativeName(
        [x509.DNSName(entry) for entry in domains])

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    if must_staple:
        staple_feature = x509.TLSFeature(
            features=[x509.TLSFeatureType.status_request])
        builder = builder.add_extension(staple_feature, critical=False)

    signed_request = builder.sign(key, hashes.SHA256(), default_backend())
    return export_csr_for_acme(signed_request)
def _gen_key_initctr(cls, b_password, b_salt):
    """Derive two 32-byte keys and a 16-byte IV from a password and salt.

    Uses PBKDF2-HMAC-SHA256 with 10000 iterations when HAS_PBKDF2HMAC is
    set, otherwise delegates to ``cls._create_key``.

    :returns: ``(b_key1, b_key2, hexlified_iv)``
    """
    # 16 for AES 128, 32 for AES256
    key_len = 32
    # match the size used for counter.new to avoid extra work
    iv_len = 16

    if HAS_PBKDF2HMAC:
        pbkdf2 = PBKDF2HMAC(
            algorithm=c_SHA256(),
            length=2 * key_len + iv_len,
            salt=b_salt,
            iterations=10000,
            backend=default_backend())
        b_material = pbkdf2.derive(b_password)
    else:
        b_material = cls._create_key(b_password, b_salt, key_len, iv_len)

    # Layout of the derived material: key1 | key2 | iv
    iv_start = key_len * 2
    b_key1 = b_material[:key_len]
    b_key2 = b_material[key_len:iv_start]
    b_iv = b_material[iv_start:iv_start + iv_len]
    return b_key1, b_key2, hexlify(b_iv)
def load_key(pubkey):
    """Deserialize a PEM-encoded public key given as text.

    If the first load attempt raises ValueError, the ``RSA`` marker is
    stripped from the ``BEGIN RSA`` / ``END RSA`` header and footer and the
    load is retried once.

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    try:
        loaded = load_pem_public_key(pubkey.encode(), default_backend())
    except ValueError:
        # workaround for https://github.com/travis-ci/travis-api/issues/196
        fixed_pem = pubkey
        for marker, plain in (('BEGIN RSA', 'BEGIN'), ('END RSA', 'END')):
            fixed_pem = fixed_pem.replace(marker, plain)
        loaded = load_pem_public_key(fixed_pem.encode(), default_backend())
    return loaded
def decode_public_key(self, encoded):
    """
    Based on spotnab, this is the gzipped version of the key
    with base64 applied to it. We decode it and load it.

    On success the loaded key is stored in ``self.public_key``.
    ``self.public_key`` is reset to None once the base64 layer has been
    decoded; if base64 decoding itself fails it is left untouched.

    :param encoded: base64 text wrapping a gzip-compressed PEM public key
    :returns: True if a key was loaded, False if the input could not be
        base64-decoded, decompressed, or parsed as a PEM public key
    """
    # Local import: a bytes buffer that behaves the same on Python 2 and 3.
    from io import BytesIO

    try:
        compressed = BytesIO(b64decode(encoded))
    except (TypeError, ValueError):
        # Python 2 raises TypeError on bad base64, Python 3 raises
        # binascii.Error (a ValueError subclass).
        return False

    self.public_key = None
    try:
        with GzipFile(fileobj=compressed, mode="rb") as f:
            self.public_key = serialization.load_pem_public_key(
                f.read(),
                backend=default_backend()
            )
    except (ValueError, IOError, EOFError):
        # Payload was not gzip data, was truncated, or did not contain a
        # loadable PEM public key.
        return False

    if not self.public_key:
        return False

    return True
def genkeys(self, key_size=KeySize.NORMAL, password=None):
    """
    Create a new RSA key pair, store both halves and the password on
    this object, and return the pair as a tuple (private, public).
    """
    # 65537 is the public exponent to use when in doubt (it is one of the
    # small Fermat primes 3, 5, 17, 257, 65537). See
    # http://www.daemonology.net/blog/2009-06-11-\
    # cryptographic-right-answers.html
    generated = rsa.generate_private_key(
        public_exponent=65537,
        key_size=key_size,
        backend=default_backend())
    self.private_key = generated

    # The public half is derived from the private key
    self.public_key = self.private_key.public_key()

    # Keep the password around; it is used later when the key content is
    # saved in its serialized form
    self.password = password

    # (RSAPrivateKey, RSAPublicKey)
    return (self.private_key, self.public_key)
def load_key(pubkey):
    """Deserialize a PEM-encoded public key given as text.

    If the first load attempt raises ValueError, the ``RSA`` marker is
    stripped from the ``BEGIN RSA`` / ``END RSA`` header and footer and the
    load is retried once.

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    try:
        loaded = load_pem_public_key(pubkey.encode(), default_backend())
    except ValueError:
        # workaround for https://github.com/travis-ci/travis-api/issues/196
        fixed_pem = pubkey
        for marker, plain in (('BEGIN RSA', 'BEGIN'), ('END RSA', 'END')):
            fixed_pem = fixed_pem.replace(marker, plain)
        loaded = load_pem_public_key(fixed_pem.encode(), default_backend())
    return loaded
def load_key(pubkey):
    """Deserialize a PEM-encoded public key given as text.

    If the first load attempt raises ValueError, the ``RSA`` marker is
    stripped from the ``BEGIN RSA`` / ``END RSA`` header and footer and the
    load is retried once.

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    try:
        loaded = load_pem_public_key(pubkey.encode(), default_backend())
    except ValueError:
        # workaround for https://github.com/travis-ci/travis-api/issues/196
        fixed_pem = pubkey
        for marker, plain in (('BEGIN RSA', 'BEGIN'), ('END RSA', 'END')):
            fixed_pem = fixed_pem.replace(marker, plain)
        loaded = load_pem_public_key(fixed_pem.encode(), default_backend())
    return loaded
def load_key(pubkey):
    """Deserialize a PEM-encoded public key given as text.

    If the first load attempt raises ValueError, the ``RSA`` marker is
    stripped from the ``BEGIN RSA`` / ``END RSA`` header and footer and the
    load is retried once.

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    try:
        loaded = load_pem_public_key(pubkey.encode(), default_backend())
    except ValueError:
        # workaround for https://github.com/travis-ci/travis-api/issues/196
        fixed_pem = pubkey
        for marker, plain in (('BEGIN RSA', 'BEGIN'), ('END RSA', 'END')):
            fixed_pem = fixed_pem.replace(marker, plain)
        loaded = load_pem_public_key(fixed_pem.encode(), default_backend())
    return loaded
def encrypt_key(key, password):
    """Encrypt the password with the public key and return an ASCII representation.

    The PEM public key is loaded using Cryptography's default backend, the
    password is encrypted with the key's encrypt() method, and the
    resulting bytes are base64-encoded and decoded as ASCII so that a
    string is returned.

    Parameters
    ----------
    key: str
        PEM-encoded public RSA key that requires deserialization
    password: str or bytes
        the password to be encrypted; text is encoded as UTF-8 first
        because the encryption primitive only accepts bytes

    Returns
    -------
    encrypted_password: str
        the base64 encoded encrypted password decoded as ASCII

    Notes
    -----
    Travis CI uses the PKCS1v15 padding scheme, so that is what is used
    here. PKCS1v15 is outdated; OAEP is the recommended replacement, e.g.
        OAEP(mgf=MGF1(algorithm=SHA256()), algorithm=SHA256(), label=None)
    """
    public_key = load_pem_public_key(key.encode(), default_backend())
    if not isinstance(password, bytes):
        # The documented input type is text, but encrypt() needs bytes.
        password = password.encode('utf-8')
    encrypted_password = public_key.encrypt(password, PKCS1v15())
    return base64.b64encode(encrypted_password).decode('ascii')
def load_key(pubkey):
    """Deserialize a PEM-encoded public key given as text.

    If the first load attempt raises ValueError, the ``RSA`` marker is
    stripped from the ``BEGIN RSA`` / ``END RSA`` header and footer and the
    load is retried once.

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    try:
        loaded = load_pem_public_key(pubkey.encode(), default_backend())
    except ValueError:
        # workaround for https://github.com/travis-ci/travis-api/issues/196
        fixed_pem = pubkey
        for marker, plain in (('BEGIN RSA', 'BEGIN'), ('END RSA', 'END')):
            fixed_pem = fixed_pem.replace(marker, plain)
        loaded = load_pem_public_key(fixed_pem.encode(), default_backend())
    return loaded
def load_key(pubkey):
    """Deserialize a PEM-encoded public key given as text.

    If the first load attempt raises ValueError, the ``RSA`` marker is
    stripped from the ``BEGIN RSA`` / ``END RSA`` header and footer and the
    load is retried once.

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    try:
        loaded = load_pem_public_key(pubkey.encode(), default_backend())
    except ValueError:
        # workaround for https://github.com/travis-ci/travis-api/issues/196
        fixed_pem = pubkey
        for marker, plain in (('BEGIN RSA', 'BEGIN'), ('END RSA', 'END')):
            fixed_pem = fixed_pem.replace(marker, plain)
        loaded = load_pem_public_key(fixed_pem.encode(), default_backend())
    return loaded
def maybe_key(pem_path):
    """
    Return the client key stored as ``client.key`` under ``pem_path``,
    creating and saving a new RSA key first if that file does not exist.
    The key is returned wrapped in a ``jose.JWKRSA``.

    https://gist.github.com/glyph/27867a478bb71d8b6046fbfb176e1a33#file-local-certs-py-L32-L50

    :type pem_path: twisted.python.filepath.FilePath
    :param pem_path:
        The path to the certificate directory to use.
    """
    key_file = pem_path.child(u'client.key')

    if not key_file.exists():
        # No key yet: generate one and persist it as unencrypted PEM.
        private_key = generate_private_key(u'rsa')
        pem_bytes = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption())
        key_file.setContent(pem_bytes)
    else:
        private_key = serialization.load_pem_private_key(
            key_file.getContent(), password=None, backend=default_backend())

    return jose.JWKRSA(key=private_key)
def generate_wildcard_pem_bytes():
    """
    Build a self-signed certificate whose subject and issuer common name
    is '*', valid from one day ago until 3650 days from now, together with
    a newly generated RSA key.

    https://cryptography.io/en/latest/x509/tutorial/#creating-a-self-signed-certificate

    :return: The PEM private key immediately followed by the PEM
        certificate, as one bytes object
    """
    private_key = generate_private_key(u'rsa')
    wildcard_name = x509.Name(
        [x509.NameAttribute(NameOID.COMMON_NAME, u'*')])

    builder = x509.CertificateBuilder()
    builder = builder.issuer_name(wildcard_name)
    builder = builder.subject_name(wildcard_name)
    builder = builder.not_valid_before(datetime.today() - timedelta(days=1))
    builder = builder.not_valid_after(datetime.now() + timedelta(days=3650))
    builder = builder.serial_number(int(uuid.uuid4()))
    builder = builder.public_key(private_key.public_key())
    certificate = builder.sign(
        private_key=private_key,
        algorithm=hashes.SHA256(),
        backend=default_backend())

    key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption())
    cert_pem = certificate.public_bytes(serialization.Encoding.PEM)
    return b''.join((key_pem, cert_pem))
def get_peer_org_user(client, peer_org, user='Admin'):
    """Build a user object for ``user`` in the peer organization ``peer_org``.

    The user's private key and certificate are read from the e2e_cli
    fixture tree under the current working directory, using the file names
    listed in ``E2E_CONFIG``. The returned user carries an enrollment made
    from them and the organization's MSP id.
    """
    msp_dir = os.path.join(
        os.getcwd(),
        'test/fixtures/e2e_cli/crypto-config/peerOrganizations/{0}'
        '/users/{1}@{0}/msp/'.format(peer_org, user)
    )

    key_file_name = \
        E2E_CONFIG['test-network'][peer_org]['users'][user]['private_key']
    key_path = os.path.join(msp_dir, 'keystore/', key_file_name)

    cert_file_name = \
        E2E_CONFIG['test-network'][peer_org]['users'][user]['cert']
    cert_path = os.path.join(msp_dir, 'signcerts/', cert_file_name)

    with open(key_path, 'rb') as key_file:
        key_pem = key_file.read()

    with open(cert_path, 'rb') as cert_file:
        cert_pem = cert_file.read()

    org_user = User('peer' + peer_org + user, peer_org, client.state_store)

    # Turn the PEM bytes into a 'cryptography' private key object so that
    # all of its methods are available to the enrollment
    loaded_key = load_pem_private_key(key_pem, None, default_backend())
    org_user.enrollment = Enrollment(loaded_key, cert_pem)

    org_user.msp_id = E2E_CONFIG['test-network'][peer_org]['mspid']

    return org_user
def append_signature(target_file, priv_key, priv_key_password=None):
    """Sign the contents of a file and append the signature to that file.

    The whole of target_file is read and signed with the PEM private key
    stored in the file priv_key (decrypted with priv_key_password when
    given), using PKCS1v15 padding and SHA512. The raw signature bytes are
    then written to the end of target_file.
    """
    with open(target_file, "rb") as source:
        payload = source.read()

    with open(priv_key, "rb") as key_file:
        signing_key = serialization.load_pem_private_key(
            key_file.read(),
            password=priv_key_password,
            backend=default_backend())

    signature_bytes = signing_key.sign(
        payload, padding.PKCS1v15(), hashes.SHA512())

    with open(target_file, "ab") as sink:
        sink.write(signature_bytes)
def load_key(pubkey):
    """Deserialize a PEM-encoded public key given as text.

    If the first load attempt raises ValueError, the ``RSA`` marker is
    stripped from the ``BEGIN RSA`` / ``END RSA`` header and footer and the
    load is retried once.

    Read more about RSA encryption with cryptography:
    https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
    """
    try:
        loaded = load_pem_public_key(pubkey.encode(), default_backend())
    except ValueError:
        # workaround for https://github.com/travis-ci/travis-api/issues/196
        fixed_pem = pubkey
        for marker, plain in (('BEGIN RSA', 'BEGIN'), ('END RSA', 'END')):
            fixed_pem = fixed_pem.replace(marker, plain)
        loaded = load_pem_public_key(fixed_pem.encode(), default_backend())
    return loaded