def __load_private(self, pri_file, cert_pass):
"""??? ??
:param pri_file: ??? ??
:param cert_pass: ??? ????
:return:
"""
# ???/??? ??
with open(pri_file, "rb") as der:
private_bytes = der.read()
try:
self.__peer_pri = serialization.load_der_private_key(private_bytes, cert_pass, default_backend())
except ValueError as e:
logging.exception(f"error {e}")
util.exit_and_msg("Invalid Password")
# ? ? ??
sign = self.sign_data(b'TEST')
if self.verify_data(b'TEST', sign) is False:
util.exit_and_msg("Invalid Signature(Peer Certificate load test)")
python类load_der_private_key()的实例源码
def _load_cryptography_key(cls, data, password=None, backend=None):
backend = default_backend() if backend is None else backend
exceptions = {}
# private key?
for loader in (serialization.load_pem_private_key,
serialization.load_der_private_key):
try:
return loader(data, password, backend)
except (ValueError, TypeError,
cryptography.exceptions.UnsupportedAlgorithm) as error:
exceptions[loader] = error
# public key?
for loader in (serialization.load_pem_public_key,
serialization.load_der_public_key):
try:
return loader(data, backend)
except (ValueError,
cryptography.exceptions.UnsupportedAlgorithm) as error:
exceptions[loader] = error
# no luck
raise errors.Error('Unable to deserialize key: {0}'.format(exceptions))
def __call__(self, parser, namespace, values, option_string):
if namespace.key_type == 'raw':
setattr(namespace, self.dest, raw_loader(values))
elif namespace.key_type == 'pem':
setattr(namespace,
self.dest,
serialization.load_pem_private_key(raw_loader(values),
None,
backend))
elif namespace.key_type == 'der':
setattr(namespace,
self.dest,
serialization.load_der_private_key(raw_loader(values),
None,
backend))
def decrypt(self, ctext):
private_key = serialization.load_der_private_key(
self.key_pair.private_key, password=None, backend=default_backend())
return private_key.decrypt(
ctext,
self._padding
)
def load_rsa_private_key(*names):
"""Load RSA private key."""
loader = _guess_loader(names[-1], serialization.load_pem_private_key,
serialization.load_der_private_key)
return jose.ComparableRSAKey(loader(
load_vector(*names), password=None, backend=default_backend()))
def load_rsa_private_key(*names):
"""Load RSA private key."""
loader = _guess_loader(names[-1], serialization.load_pem_private_key,
serialization.load_der_private_key)
return jose.ComparableRSAKey(loader(
load_vector(*names), password=None, backend=default_backend()))
def _load_cryptography_key(cls, data, password=None, backend=None):
backend = default_backend() if backend is None else backend
exceptions = {}
# private key?
for loader in (serialization.load_pem_private_key,
serialization.load_der_private_key):
try:
return loader(data, password, backend)
except (ValueError, TypeError,
cryptography.exceptions.UnsupportedAlgorithm) as error:
exceptions[loader] = error
# public key?
for loader in (serialization.load_pem_public_key,
serialization.load_der_public_key):
try:
return loader(data, backend)
except (ValueError,
cryptography.exceptions.UnsupportedAlgorithm) as error:
exceptions[loader] = error
# no luck
raise errors.Error('Unable to deserialize key: {0}'.format(exceptions))
def main(argv):
if len(argv) > 0:
password = argv[0].encode()
else:
password = b'test'
private_key = ec.generate_private_key(ec.SECP256K1(), default_backend())
serialized_private = private_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(password)
)
serialized_public = private_key.public_key().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
private_key = serialization.load_der_private_key(serialized_private, password, default_backend())
with open('resources/default_pki/private.der', 'wb') as private_file:
private_file.write(serialized_private)
with open('resources/default_pki/public.der', 'wb') as public_file:
public_file.write(serialized_public)
print('save pki in : resources/default_pki/')
def convert_privatekey(self, pri_bytes, password):
"""???? ???? ????? private_key ? ??
??? ??? CA ???/??? None?? ??
:param pri_bytes: ??? bytes
:param password: ??? ??? ????
:return: private_key
"""
try:
return serialization.load_der_private_key(pri_bytes, password, default_backend())
except ValueError:
logging.debug("Invalid Password")
self.__ca_cert = None
self.__ca_pri = None
return None
def load_rsa_private_key(*names):
"""Load RSA private key."""
loader = _guess_loader(names[-1], serialization.load_pem_private_key,
serialization.load_der_private_key)
return jose.ComparableRSAKey(loader(
load_vector(*names), password=None, backend=default_backend()))
def load_rsa_private_key(*names):
"""Load RSA private key."""
loader = _guess_loader(names[-1], serialization.load_pem_private_key,
serialization.load_der_private_key)
return jose.ComparableRSAKey(loader(
load_vector(*names), password=None, backend=default_backend()))
def _decode_key(self, data):
try:
key = serialization.load_der_private_key(
data, password=None, backend=default_backend()
)
except (ValueError, AssertionError) as e:
raise SSHException(str(e))
self.signing_key = key
self.verifying_key = key.public_key()
curve_class = key.curve.__class__
self.ecdsa_curve = self._ECDSA_CURVES.get_by_curve_class(curve_class)
def _decode_key(self, data):
try:
key = serialization.load_der_private_key(
data, password=None, backend=default_backend()
)
except ValueError as e:
raise SSHException(str(e))
assert isinstance(key, rsa.RSAPrivateKey)
self.key = key
def _decode_key(self, data):
try:
key = serialization.load_der_private_key(
data, password=None, backend=default_backend()
)
except (ValueError, AssertionError) as e:
raise SSHException(str(e))
self.signing_key = key
self.verifying_key = key.public_key()
curve_class = key.curve.__class__
self.ecdsa_curve = self._ECDSA_CURVES.get_by_curve_class(curve_class)
def _decode_key(self, data):
try:
key = serialization.load_der_private_key(
data, password=None, backend=default_backend()
)
except ValueError as e:
raise SSHException(str(e))
assert isinstance(key, rsa.RSAPrivateKey)
self.key = key
def _decode_key(self, data):
try:
key = serialization.load_der_private_key(
data, password=None, backend=default_backend()
)
except ValueError as e:
raise SSHException(str(e))
self.signing_key = key
self.verifying_key = key.public_key()
curve_class = key.curve.__class__
self.ecdsa_curve = self._ECDSA_CURVES.get_by_curve_class(curve_class)
def _decode_key(self, data):
try:
key = serialization.load_der_private_key(
data, password=None, backend=default_backend()
)
except ValueError as e:
raise SSHException(str(e))
assert isinstance(key, rsa.RSAPrivateKey)
self.key = key
def from_key_bytes(cls, algorithm, key_bytes):
"""Builds a `Signer` from an algorithm suite and a raw signing key.
:param algorithm: Algorithm on which to base signer
:type algorithm: aws_encryption_sdk.identifiers.Algorithm
:param bytes key_bytes: Raw signing key
:rtype: aws_encryption_sdk.internal.crypto.Signer
"""
key = serialization.load_der_private_key(
data=key_bytes,
password=None,
backend=default_backend()
)
return cls(algorithm, key)
def _check_or_add_account(self):
if 'account' in self._config:
account = self._config['account']
with open(account['key'], 'rb') as f:
data = f.read()
if account['key_type'] == 'raw':
key = data
elif account['key_type'] == 'pem':
key = serialization.load_pem_private_key(data, None, backend)
elif account['key_type'] == 'der':
key = serialization.load_der_private_key(data, None, backend)
else:
raise RuntimeError('Unknown key type: ' + account['key_type'])
registration = account['registration']
else:
registration, key = self._add_account()
links, account = self._client.registration(key, registration)
agreement = links.get('terms-of-service', {}).get('url')
if account.get('agreement') != agreement:
self._log('account: accepting {} in 5 seconds...',
agreement,
level=logging.WARN)
time.sleep(5)
self._client.registration(key, registration, agreement)
self._log('account: done')
return key
def _check_or_add_domain_key(self, name, domain):
if 'key' in domain:
with open(domain['key'], 'rb') as f:
data = f.read()
if domain['key_type'] == 'raw':
return data
elif domain['key_type'] == 'pem':
return serialization.load_pem_private_key(data, None, backend)
elif domain['key_type'] == 'der':
return serialization.load_der_private_key(data, None, backend)
else:
raise RuntimeError('Unknown key type: ' + domain['key_type'])
else:
return self._add_domain_key(name, domain, name + '_key')
def _decode_key(self, data):
try:
key = serialization.load_der_private_key(
data, password=None, backend=default_backend()
)
except ValueError as e:
raise SSHException(str(e))
self.signing_key = key
self.verifying_key = key.public_key()
curve_class = key.curve.__class__
self.ecdsa_curve = self._ECDSA_CURVES.get_by_curve_class(curve_class)
def _decode_key(self, data):
try:
key = serialization.load_der_private_key(
data, password=None, backend=default_backend()
)
except ValueError as e:
raise SSHException(str(e))
assert isinstance(key, rsa.RSAPrivateKey)
self.key = key