def test_key_not_exists(self, pem_path):
    """
    Check that ``maybe_key`` persists a key when no key file exists yet.

    After the call, ``client.key`` must exist under ``pem_path``, and the
    PEM key stored there (wrapped in ``JWKRSA``) must equal the key that
    ``maybe_key`` returned.
    """
    returned_key = maybe_key(pem_path)

    key_file = pem_path.child(u'client.key')
    assert_that(key_file.exists(), Equals(True))

    # Re-read the key from disk and wrap it the same way for comparison.
    stored_key = JWKRSA(
        key=serialization.load_pem_private_key(
            key_file.getContent(),
            password=None,
            backend=default_backend()
        )
    )
    assert_that(returned_key, Equals(stored_key))
python类load_pem_private_key()的实例源码
def _restore_state(self):
    """Restore user state from the state store.

    Reads the value stored under ``self._state_store_key``, hex-decodes
    and unpickles it, and copies its fields onto this object. When the
    stored state carries an enrollment, its PEM private key is loaded and
    a new ``Enrollment`` is attached as ``self.enrollment``.

    Raises:
        IOError: if any step fails. The original exception is passed as
            the second argument and is also chained as ``__cause__``.
    """
    try:
        state = self._state_store.get_value(self._state_store_key)
        # SECURITY: pickle.loads can execute arbitrary code while
        # decoding. This is only safe when the state store contents are
        # fully trusted.
        state_dict = pickle.loads(
            binascii.unhexlify(state.encode("utf-8")))

        self._name = state_dict['name']
        self.enrollment_secret = state_dict['enrollment_secret']

        enrollment = state_dict['enrollment']
        if enrollment:
            private_key = serialization.load_pem_private_key(
                enrollment['private_key'],
                password=None,
                backend=default_backend()
            )
            cert = enrollment['cert']
            self.enrollment = Enrollment(private_key, cert)

        self.affiliation = state_dict['affiliation']
        self.account = state_dict['account']
        self.roles = state_dict['roles']
        self._org = state_dict['org']
        self.msp_id = state_dict['msp_id']
    except Exception as e:
        # Chain explicitly so the traceback shows the real root cause.
        raise IOError("Cannot deserialize the user", e) from e
def read_file(self, subject_name, pw=None):
    """
    Load the PEM certificate and private key stored for a subject.

    Both files are read from ``../../resources/unittest/<subject_name>/``
    relative to the directory of this module.

    :param subject_name: name of the directory holding ``cert.pem`` and
        ``key.pem``
    :param pw: password handed to ``load_pem_private_key``; defaults to
        ``None``
    :return: dict with the parsed certificate under ``'cert'`` and the
        loaded key under ``'private_key'``
    """
    base_dir = os.path.dirname(__file__)

    pem_path = os.path.join(base_dir,
                            "../../resources/unittest/" + subject_name + "/cert.pem")
    # Context managers close the handles even if read() raises.
    with open(pem_path, "rb") as cert_file:
        cert_pem = cert_file.read()
    cert = x509.load_pem_x509_certificate(cert_pem, default_backend())

    key_path = os.path.join(base_dir,
                            "../../resources/unittest/" + subject_name + "/key.pem")
    with open(key_path, "rb") as key_file:
        cert_key = key_file.read()
    private_key = serialization.load_pem_private_key(cert_key, pw, default_backend())

    return {'cert': cert, 'private_key': private_key}
def load_pki(self, cert_path: str, cert_pass=None):
    """
    Load the CA certificate and private key, then self-test the pair.

    A ``ValueError`` raised while loading the private key is logged at
    debug level as "Invalid Password" and otherwise ignored. A failed
    sign/verify round trip is likewise only logged.

    :param cert_path: directory containing the ``CERT_NAME`` and
        ``PRI_NAME`` files
    :param cert_pass: password for the private key
    """
    certificate_file = join(cert_path, self.CERT_NAME)
    private_key_file = join(cert_path, self.PRI_NAME)

    # Read the certificate and the private key.
    with open(certificate_file, "rb") as stream:
        certificate_pem = stream.read()
    self.__ca_cert = x509.load_pem_x509_certificate(certificate_pem, default_backend())

    with open(private_key_file, "rb") as stream:
        key_pem = stream.read()
    try:
        self.__ca_pri = serialization.load_pem_private_key(key_pem, cert_pass, default_backend())
    except ValueError:
        logging.debug("Invalid Password")

    # Sign and verify a fixed payload to check the loaded key pair.
    signature = self.sign_data(b'TEST')
    verified = self.verify_data(b'TEST', signature)
    if verified is False:
        logging.debug("Invalid Signature(Root Certificate load test)")
def create_decryptor(private_location, public_location):
    """Return a function that decrypts RSA-OAEP (SHA-1) ciphertext.

    The private key is loaded from ``private_location``. If that file does
    not exist, a new 2048-bit RSA key is generated and written there as
    unencrypted PEM, and its public key is written to ``public_location``.

    Args:
        private_location: path of the PEM private key file.
        public_location: path where the PEM public key is written when a
            new key pair is created.

    Returns:
        A callable ``decrypt(ciphertext)`` returning the decrypted bytes.
    """
    try:
        with open(private_location, "rb") as key_file:
            private_key = serialization.load_pem_private_key(
                key_file.read(),
                password=None,
                backend=default_backend()
            )
    except FileNotFoundError:
        # Generate and serialize BEFORE opening the output files. If key
        # generation fails, no empty key file is left behind to make
        # every later load fail with a parse error.
        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend()
        )
        private_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        )
        public_pem = private_key.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        with open(private_location, "wb") as key_file:
            key_file.write(private_pem)
        with open(public_location, "wb") as public_file:
            public_file.write(public_pem)

    def decrypt(ciphertext):
        """Decrypt ``ciphertext`` with the captured private key."""
        return private_key.decrypt(
            ciphertext,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA1()),
                algorithm=hashes.SHA1(),
                label=None
            )
        )

    return decrypt
def _load_cryptography_key(cls, data, password=None, backend=None):
    """Deserialize ``data`` as a key, trying each supported format in turn.

    Attempts, in order: PEM private key, DER private key, PEM public key,
    DER public key. The first loader that succeeds determines the result.

    :param data: serialized key material
    :param password: password passed to the private-key loaders
    :param backend: cryptography backend; ``default_backend()`` when
        ``None``
    :raises errors.Error: if no loader succeeds; the message includes the
        error collected from each loader
    """
    if backend is None:
        backend = default_backend()

    unsupported = cryptography.exceptions.UnsupportedAlgorithm
    private_errors = (ValueError, TypeError, unsupported)
    public_errors = (ValueError, unsupported)

    # (loader, positional arguments, exceptions meaning "try the next one")
    attempts = (
        (serialization.load_pem_private_key, (data, password, backend), private_errors),
        (serialization.load_der_private_key, (data, password, backend), private_errors),
        (serialization.load_pem_public_key, (data, backend), public_errors),
        (serialization.load_der_public_key, (data, backend), public_errors),
    )

    failures = {}
    for loader, arguments, expected in attempts:
        try:
            return loader(*arguments)
        except expected as error:
            failures[loader] = error

    # no luck
    raise errors.Error('Unable to deserialize key: {0}'.format(failures))
def get_channel(self):
    """Fetch the channel assigned to this device's UUID.

    Requests a token from ``/get_device_token``, signs it with the
    configured private key (PKCS#1 v1.5, SHA-256) and sends the
    base64-encoded signature to ``/get_device_group``. On success the
    returned channel is stored in ``self.channel_uri``.

    Raises:
        Exception: if the server answers 400 (UUID or token not
            registered) or 403 (signature rejected).
    """
    base_uri = str(self.base_link_uri)

    # First get a token we need to sign in order to prove we are who we
    # say we are.
    r = requests.get(base_uri + "/get_device_token", params={"UUID": self.uuid, })
    token = r.json()["token"]
    # The signing API needs bytes. bytes(token) raises on a Python 3 str,
    # so encode explicitly when the token is not already bytes.
    if not isinstance(token, bytes):
        token = str(token).encode("utf-8")

    # Get the private key. PEM loading needs bytes, hence binary mode.
    with open(CloudLinkSettings.PRIVATE_KEY_LOCATION, 'rb') as key_file:
        private_key = serialization.load_pem_private_key(
            key_file.read(),
            password=CloudLinkSettings.PRIVATE_KEY_PASSPHRASE,
            backend=default_backend())

    # Sign the token with our private key. sign() is the one-shot
    # replacement for the removed signer()/update()/finalize() API.
    signature = private_key.sign(token, padding.PKCS1v15(), hashes.SHA256())

    # Get the randomly assigned channel for my UUID.
    r = requests.get(base_uri + "/get_device_group",
                     params={"UUID": self.uuid, "signature": b64encode(signature), "format": "PKCS1_v1_5"})
    if r.ok:
        self.channel_uri = r.json()["channel"]
    elif r.status_code == 400:
        raise Exception("UUID or Token not registered with Cloud.")
    elif r.status_code == 403:
        raise Exception("Signature didn't verify correctly. Bad private key or signature.")
def decode_pem_key(key_pem):
    """Convert plaintext PEM key into the format usable for JWT generation

    Args:
        key_pem (str): key data in PEM format, presented as plain string

    Returns:
        Parsed PEM data (an RSA private key object)

    Raises:
        AssertionError: if the loaded key is not an RSA private key or is
            smaller than 2048 bits.
    """
    private_key = serialization.load_pem_private_key(
        data=key_pem.encode('ascii'),
        password=None,
        backend=default_backend())

    # Raise explicitly rather than with `assert` statements: asserts are
    # stripped under `python -O`, which would silently disable these
    # checks. AssertionError is kept so existing callers see the same
    # exception type.
    if not isinstance(private_key, rsa.RSAPrivateKey):
        raise AssertionError('Unexpected private key type')
    if private_key.key_size < 2048:
        raise AssertionError('RSA key size too small')

    return private_key
def load_private_key(privkey_file):
    '''
    Read an unencrypted PEM private key from disk and return it.

    The `privkey_file` parameter is the path of the file storing the
    private key. The file is opened in binary mode and parsed with no
    password.

    Returns the key object produced by `load_pem_private_key`.
    '''
    with open(privkey_file, "rb") as stream:
        pem_data = stream.read()
    return serialization.load_pem_private_key(
        pem_data,
        password=None,
        backend=default_backend()
    )
def rsa_sign(private_key, message):
    """Sign a message with an RSA private key using PSS padding.

    Args:
        private_key (str): Rsa private key with BEGIN and END sections.
        message (str): Message to be hashed and signed.

    Returns:
        The signature, hex-encoded with ``binascii.hexlify``.
    """
    signing_key = load_pem_private_key(
        bytes(private_key), password=None, backend=default_backend())

    # NOTE(review): `sha256` is a helper defined outside this block; its
    # result is what gets passed to sign() below.
    digest = sha256(message)

    pss_padding = padding.PSS(
        mgf=padding.MGF1(hashes.SHA256()),
        salt_length=padding.PSS.MAX_LENGTH
    )
    raw_signature = signing_key.sign(digest, pss_padding, hashes.SHA256())
    return binascii.hexlify(bytearray(raw_signature))
def load_or_create_client_key(pem_path):
    """
    Return the client key stored under a directory, creating it if absent.

    The key lives in ``client.key`` inside ``pem_path``. When the file is
    missing, a key is obtained from ``generate_private_key(u'rsa')`` and
    saved there as unencrypted PEM (TraditionalOpenSSL format).

    .. note:: The original documentation states the created key is a
       2048-bit RSA key; the size is decided by ``generate_private_key``.

    :type pem_path: ``twisted.python.filepath.FilePath``
    :param pem_path: The certificate directory
        to use, as with the endpoint.

    :return: the key wrapped in ``JWKRSA``.
    """
    key_file = pem_path.asTextMode().child(u'client.key')

    if not key_file.exists():
        key = generate_private_key(u'rsa')
        pem_bytes = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption())
        key_file.setContent(pem_bytes)
    else:
        key = serialization.load_pem_private_key(
            key_file.getContent(),
            password=None,
            backend=default_backend())

    return JWKRSA(key=key)
validator_reg_message_factory.py 文件源码
项目:sawtooth-core
作者: hyperledger
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def __init__(self, signer):
    """Set up the message factory and the keys used by this helper.

    :param signer: signer handed to ``MessageFactory``; its public key
        (as hex) is hashed with SHA-256 into ``self.public_key_hash``.
    """
    self._factory = MessageFactory(
        family_name="sawtooth_validator_registry",
        family_version="1.0",
        namespace="6a4372",
        signer=signer
    )

    public_key_hex = signer.get_public_key().as_hex()
    self.public_key_hash = hashlib.sha256(public_key_hex.encode()).hexdigest()

    report_key_pem = self.__REPORT_PRIVATE_KEY_PEM__.encode()
    self._report_private_key = serialization.load_pem_private_key(
        report_key_pem,
        password=None,
        backend=backends.default_backend())

    # First we need to create a public/private key pair for the PoET
    # enclave to use.
    context = create_context('secp256k1')
    poet_private_key = Secp256k1PrivateKey.from_hex(
        "1f70fa2518077ad18483f48e77882d11983b537fa5f7cf158684d2c670fe4f1f")
    self._poet_private_key = poet_private_key
    self.poet_public_key = context.get_public_key(self._poet_private_key)
test_f_aws_encryption_sdk_client.py 文件源码
项目:aws-encryption-sdk-python
作者: awslabs
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def _mgf1_sha256_supported():
    """Report whether OAEP with MGF1/SHA-256 encryption is available.

    Performs a trial encryption with the public half of the ``asym1`` test
    key. Returns ``False`` if that raises ``UnsupportedAlgorithm``, and
    ``True`` otherwise.
    """
    wrapping_key = serialization.load_pem_private_key(
        data=VALUES['raw'][b'asym1'][EncryptionKeyType.PRIVATE],
        password=None,
        backend=default_backend()
    )
    try:
        wrapping_key.public_key().encrypt(
            plaintext=b'aosdjfoiajfoiaj;foijae;rogijaerg',
            padding=padding.OAEP(
                mgf=padding.MGF1(hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
    except cryptography.exceptions.UnsupportedAlgorithm:
        return False
    else:
        return True
def __init__(self, wrapping_algorithm, wrapping_key, wrapping_key_type, password=None):
    """Prepares initial values.

    PRIVATE and PUBLIC keys are parsed from PEM. A SYMMETRIC key is stored
    as given and a derived wrapping key is computed from it.

    :param wrapping_algorithm: stored as ``self.wrapping_algorithm``
    :param wrapping_key: PEM data, or the raw key for SYMMETRIC
    :param wrapping_key_type: an ``EncryptionKeyType`` member
    :param password: password used when loading a PRIVATE key
    :raises InvalidDataKeyError: for any other ``wrapping_key_type``
    """
    self.wrapping_algorithm = wrapping_algorithm
    self.wrapping_key_type = wrapping_key_type

    if wrapping_key_type is EncryptionKeyType.PRIVATE:
        self._wrapping_key = serialization.load_pem_private_key(
            data=wrapping_key,
            password=password,
            backend=default_backend()
        )
        return

    if wrapping_key_type is EncryptionKeyType.PUBLIC:
        self._wrapping_key = serialization.load_pem_public_key(
            data=wrapping_key,
            backend=default_backend()
        )
        return

    if wrapping_key_type is EncryptionKeyType.SYMMETRIC:
        self._wrapping_key = wrapping_key
        self._derived_wrapping_key = derive_data_encryption_key(
            source_key=self._wrapping_key,
            algorithm=self.wrapping_algorithm.algorithm,
            message_id=None
        )
        return

    raise InvalidDataKeyError('Invalid wrapping_key_type: {}'.format(wrapping_key_type))
def __call__(self, parser, namespace, values, option_string):
    """Load the key named by ``values`` and store it on ``namespace``.

    The loader depends on ``namespace.key_type``: ``'raw'`` stores the
    output of ``raw_loader`` unchanged, while ``'pem'`` and ``'der'``
    parse that output as an unencrypted private key. Any other key type
    leaves ``namespace`` untouched.
    """
    key_type = namespace.key_type

    if key_type == 'raw':
        loaded = raw_loader(values)
    elif key_type == 'pem':
        loaded = serialization.load_pem_private_key(raw_loader(values),
                                                    None,
                                                    backend)
    elif key_type == 'der':
        loaded = serialization.load_der_private_key(raw_loader(values),
                                                    None,
                                                    backend)
    else:
        return

    setattr(namespace, self.dest, loaded)
def __init__(self, pem_private_key, private_key_password=None):
    """
    RSA Certificate Authority used to sign certificates.

    Loads the private key, prepares a PKCS#1 v1.5 / SHA-1 signer from it,
    and records the public exponent and modulus as ``self.e`` and
    ``self.n``.

    :param pem_private_key: PEM formatted RSA Private Key. It should be
        encrypted with a password, but that is not required.
    :param private_key_password: Password to decrypt the PEM RSA Private
        Key, if it is encrypted. Which it should be.
    """
    super(SSHCertificateAuthority, self).__init__()
    self.public_key_type = SSHPublicKeyType.RSA

    self.private_key = load_pem_private_key(
        pem_private_key, private_key_password, default_backend())
    self.signer = self.private_key.signer(padding.PKCS1v15(), hashes.SHA1())

    public_numbers = self.private_key.public_key().public_numbers()
    self.e, self.n = public_numbers.e, public_numbers.n
def maybe_key(pem_path):
    """
    Return the client key, creating and saving one if none exists yet.

    The key is kept in ``client.key`` under ``pem_path``. A missing file
    is filled with a key from ``generate_private_key(u'rsa')``, serialized
    as unencrypted PEM.

    https://gist.github.com/glyph/27867a478bb71d8b6046fbfb176e1a33#file-local-certs-py-L32-L50

    :type pem_path: twisted.python.filepath.FilePath
    :param pem_path:
        The path to the certificate directory to use.

    :return: the key wrapped in ``jose.JWKRSA``.
    """
    key_file = pem_path.child(u'client.key')

    if not key_file.exists():
        key = generate_private_key(u'rsa')
        serialized = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        )
        key_file.setContent(serialized)
    else:
        key = serialization.load_pem_private_key(
            key_file.getContent(),
            password=None,
            backend=default_backend()
        )

    return jose.JWKRSA(key=key)
def get_peer_org_user(client, peer_org, user='Admin'):
    """Build a ``User`` for a peer org from the e2e test fixtures.

    The private key and certificate file names come from ``E2E_CONFIG``.
    Both files are read from the user's ``msp`` directory under the
    current working directory. The returned user has its ``enrollment``
    and ``msp_id`` set.
    """
    msp_dir = os.path.join(
        os.getcwd(),
        'test/fixtures/e2e_cli/crypto-config/peerOrganizations/{0}'
        '/users/{1}@{0}/msp/'.format(peer_org, user)
    )

    key_name = E2E_CONFIG['test-network'][peer_org]['users'][user]['private_key']
    key_path = os.path.join(msp_dir, 'keystore/', key_name)

    cert_name = E2E_CONFIG['test-network'][peer_org]['users'][user]['cert']
    cert_path = os.path.join(msp_dir, 'signcerts/', cert_name)

    with open(key_path, 'rb') as key_stream:
        key_pem = key_stream.read()

    with open(cert_path, 'rb') as cert_stream:
        cert_pem = cert_stream.read()

    org_user = User('peer' + peer_org + user, peer_org, client.state_store)

    # wrap the key in a 'cryptography' private key object
    # so that all the methods can be used
    private_key = load_pem_private_key(key_pem, None, default_backend())
    org_user.enrollment = Enrollment(private_key, cert_pem)

    org_user.msp_id = E2E_CONFIG['test-network'][peer_org]['mspid']

    return org_user
def append_signature(target_file, priv_key, priv_key_password=None):
    """Append signature to the end of file.

    The whole content of ``target_file`` is signed with the PEM private
    key stored at ``priv_key`` (PKCS#1 v1.5 padding, SHA-512). The raw
    signature bytes are then appended to ``target_file``.
    """
    with open(target_file, "rb") as source:
        payload = source.read()

    with open(priv_key, "rb") as key_stream:
        key_pem = key_stream.read()
    signing_key = serialization.load_pem_private_key(
        key_pem, password=priv_key_password, backend=default_backend())

    signature = signing_key.sign(payload, padding.PKCS1v15(), hashes.SHA512())

    with open(target_file, "ab") as sink:
        sink.write(signature)
def load_private_key_string(key_string):
    """Parse an unencrypted PEM private key from ``key_string``."""
    private_key = serialization.load_pem_private_key(
        key_string,
        password=None,
        backend=default_backend(),
    )
    return private_key
def get_private_key_and_key_id(issuer, key_id=None):
    """Return ``(private_key, key_id)`` for an issuer.

    The key file is picked with ``get_key_file_name`` from
    ``api_settings.PRIVATE_KEYS`` and parsed as an unencrypted PEM private
    key. The returned id is derived from that file name with
    ``get_key_id``.
    """
    key_file_name = get_key_file_name(
        keys=api_settings.PRIVATE_KEYS, issuer=issuer, key_id=key_id)
    pem_data = read_key_file(file_name=key_file_name)
    private_key = load_pem_private_key(
        pem_data, password=None, backend=default_backend())
    resolved_key_id = get_key_id(file_name=key_file_name)
    return private_key, resolved_key_id
def load_key(filename):
    """Read ``filename`` in binary mode and parse it as an unencrypted PEM private key."""
    with open(filename, 'rb') as stream:
        pem_data = stream.read()
    return load_pem_private_key(pem_data, None, default_backend())
def convert_private_key_to_pem(private_key):
    """Load an unencrypted PEM private key given as ``bytes`` or text.

    A value that is not ``bytes`` is first converted with ``.encode()``.
    Returns the key object produced by ``load_pem_private_key``.
    """
    if isinstance(private_key, bytes):
        pem_data = private_key
    else:
        pem_data = private_key.encode()

    return serialization.load_pem_private_key(
        pem_data, password=None, backend=default_backend())
def convert_privkey_to_pem(key):
    """Load an unencrypted PEM private key from ``key``.

    ``key`` is first passed through ``ApiClient.convert_to_bytes``.
    """
    pem_data = ApiClient.convert_to_bytes(key)
    private_key = serialization.load_pem_private_key(
        pem_data, password=None, backend=default_backend())
    return private_key
def load_rsa_private_key(*names):
    """Load RSA private key.

    The PEM or DER loader is picked by ``_guess_loader`` from the last
    name. The key data comes from ``load_vector(*names)`` and the result
    is wrapped in ``jose.ComparableRSAKey``.
    """
    deserialize = _guess_loader(
        names[-1],
        serialization.load_pem_private_key,
        serialization.load_der_private_key)
    raw_key = deserialize(
        load_vector(*names), password=None, backend=default_backend())
    return jose.ComparableRSAKey(raw_key)
def load_rsa_private_key(*names):
    """Load RSA private key.

    ``_guess_loader`` chooses between the PEM and DER private-key loaders
    based on ``names[-1]``. The chosen loader parses the output of
    ``load_vector(*names)`` with no password.
    """
    candidates = (serialization.load_pem_private_key,
                  serialization.load_der_private_key)
    loader = _guess_loader(names[-1], *candidates)
    key_data = load_vector(*names)
    private_key = loader(key_data, password=None, backend=default_backend())
    return jose.ComparableRSAKey(private_key)
def _load_cryptography_key(cls, data, password=None, backend=None):
    """Try every known serialization format until one parses ``data``.

    Private-key loaders (PEM, then DER) are tried first with ``password``,
    followed by the public-key loaders (PEM, then DER). Each failure is
    remembered per loader.

    :raises errors.Error: when all four loaders fail; the collected
        failures are included in the message.
    """
    if backend is None:
        backend = default_backend()
    failed = {}

    # private key?
    private_loaders = (serialization.load_pem_private_key,
                       serialization.load_der_private_key)
    for load_private in private_loaders:
        try:
            return load_private(data, password, backend)
        except (ValueError, TypeError,
                cryptography.exceptions.UnsupportedAlgorithm) as exc:
            failed[load_private] = exc

    # public key?
    public_loaders = (serialization.load_pem_public_key,
                      serialization.load_der_public_key)
    for load_public in public_loaders:
        try:
            return load_public(data, backend)
        except (ValueError,
                cryptography.exceptions.UnsupportedAlgorithm) as exc:
            failed[load_public] = exc

    # no luck
    message = 'Unable to deserialize key: {0}'.format(failed)
    raise errors.Error(message)
def from_pem(cls, key_data, passphrase=None):
    """Build an instance of ``cls`` from a PEM-encoded private key.

    :param key_data: PEM key material
    :param passphrase: passphrase passed to the PEM loader (default
        ``None``)
    """
    return cls(
        serialization.load_pem_private_key(key_data, passphrase, default_backend())
    )
def from_pem(cls, key_data, passphrase=None):
    """Alternate constructor: parse ``key_data`` as a PEM private key.

    ``passphrase`` is forwarded to ``load_pem_private_key``. The loaded
    key is passed to ``cls``.
    """
    backend = default_backend()
    loaded_key = serialization.load_pem_private_key(key_data, passphrase, backend)
    return cls(loaded_key)
def get_key(self):
    """Return the private key stored in ``self.key``.

    If ``self.key`` is empty, ``set_key()`` and ``save()`` are called
    first. The PEM text is decrypted with
    ``settings.ACCOUNT_KEY_PASSWORD``.
    """
    passphrase = settings.ACCOUNT_KEY_PASSWORD.encode()

    if not self.key:
        self.set_key()
        self.save()

    pem_data = self.key.encode()
    return serialization.load_pem_private_key(
        pem_data, password=passphrase, backend=default_backend())