def __init__(self, host, port, verify, cert_path, pkey_path, pkey_passphrase=''):
self.host = host
self.port = port
try:
self.pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, open(pkey_path, 'rb').read(), pkey_passphrase)
except IOError:
raise eStreamerKeyError("Unable to locate key file {}".format(pkey_path))
except crypto.Error:
raise eStreamerKeyError("Invalid key file or bad passphrase {}".format(pkey_path))
try:
self.cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_path, 'rb').read())
except IOError:
raise eStreamerCertError("Unable to locate cert file {}".format(cert_path))
except crypto.Error:
raise eStreamerCertError("Invalid certificate {}".format(cert_path))
self.verify = verify
self.ctx = None
self.sock = None
self._bytes = None
python类load_privatekey()的实例源码
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
is expected to be an RSA key in PEM format.
Returns:
Verifier instance.
Raises:
OpenSSL.crypto.Error: if the key_pem can't be parsed.
"""
key_pem = _to_bytes(key_pem)
if is_x509_cert:
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
else:
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
return OpenSSLVerifier(pubkey)
def from_string(key, password=b'notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PKCS12 or PEM format.
password: string, password for the private key file.
Returns:
Signer instance.
Raises:
OpenSSL.crypto.Error if the key can't be parsed.
"""
key = _to_bytes(key)
parsed_pem_key = _parse_pem_key(key)
if parsed_pem_key:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key)
else:
password = _to_bytes(password, encoding='utf-8')
pkey = crypto.load_pkcs12(key, password).get_privatekey()
return OpenSSLSigner(pkey)
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
expected to be an RSA key in PEM format.
Returns:
Verifier instance.
Raises:
OpenSSL.crypto.Error if the key_pem can't be parsed.
"""
if is_x509_cert:
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
else:
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
return OpenSSLVerifier(pubkey)
def from_string(key, password='notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PKCS12 or PEM format.
password: string, password for the private key file.
Returns:
Signer instance.
Raises:
OpenSSL.crypto.Error if the key can't be parsed.
"""
if key.startswith('-----BEGIN '):
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
else:
pkey = crypto.load_pkcs12(key, password).get_privatekey()
return OpenSSLSigner(pkey)
def test_key_only(self):
"""
A L{PKCS12} with only a private key can be exported using
L{PKCS12.export} and loaded again using L{load_pkcs12}.
"""
passwd = 'blah'
p12 = PKCS12()
pkey = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
p12.set_privatekey(pkey)
self.assertEqual(None, p12.get_certificate())
self.assertEqual(pkey, p12.get_privatekey())
try:
dumped_p12 = p12.export(passphrase=passwd, iter=2, maciter=3)
except Error:
# Some versions of OpenSSL will throw an exception
# for this nearly useless PKCS12 we tried to generate:
# [('PKCS12 routines', 'PKCS12_create', 'invalid null argument')]
return
p12 = load_pkcs12(dumped_p12, passwd)
self.assertEqual(None, p12.get_ca_certificates())
self.assertEqual(None, p12.get_certificate())
# OpenSSL fails to bring the key back to us. So sad. Perhaps in the
# future this will be improved.
self.assertTrue(isinstance(p12.get_privatekey(), (PKey, type(None))))
def gen_pkcs12(self, cert_pem=None, key_pem=None, ca_pem=None, friendly_name=None):
"""
Generate a PKCS12 object with components from PEM. Verify that the set
functions return None.
"""
p12 = PKCS12()
if cert_pem:
ret = p12.set_certificate(load_certificate(FILETYPE_PEM, cert_pem))
self.assertEqual(ret, None)
if key_pem:
ret = p12.set_privatekey(load_privatekey(FILETYPE_PEM, key_pem))
self.assertEqual(ret, None)
if ca_pem:
ret = p12.set_ca_certificates((load_certificate(FILETYPE_PEM, ca_pem),))
self.assertEqual(ret, None)
if friendly_name:
ret = p12.set_friendlyname(friendly_name)
self.assertEqual(ret, None)
return p12
def test_replace(self):
"""
L{PKCS12.set_certificate} replaces the certificate in a PKCS12 cluster.
L{PKCS12.set_privatekey} replaces the private key.
L{PKCS12.set_ca_certificates} replaces the CA certificates.
"""
p12 = self.gen_pkcs12(client_cert_pem, client_key_pem, root_cert_pem)
p12.set_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
p12.set_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
root_cert = load_certificate(FILETYPE_PEM, root_cert_pem)
client_cert = load_certificate(FILETYPE_PEM, client_cert_pem)
p12.set_ca_certificates([root_cert]) # not a tuple
self.assertEqual(1, len(p12.get_ca_certificates()))
self.assertEqual(root_cert, p12.get_ca_certificates()[0])
p12.set_ca_certificates([client_cert, root_cert])
self.assertEqual(2, len(p12.get_ca_certificates()))
self.assertEqual(client_cert, p12.get_ca_certificates()[0])
self.assertEqual(root_cert, p12.get_ca_certificates()[1])
def test_dump_privatekey_passphraseCallback(self):
"""
L{dump_privatekey} writes an encrypted PEM when given a callback which
returns the correct passphrase.
"""
passphrase = "foo"
called = []
def cb(writing):
called.append(writing)
return passphrase
key = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
pem = dump_privatekey(FILETYPE_PEM, key, "blowfish", cb)
self.assertTrue(isinstance(pem, str))
self.assertEqual(called, [True])
loadedKey = load_privatekey(FILETYPE_PEM, pem, passphrase)
self.assertTrue(isinstance(loadedKey, PKeyType))
self.assertEqual(loadedKey.type(), key.type())
self.assertEqual(loadedKey.bits(), key.bits())
def _server(self, sock):
"""
Create a new server-side SSL L{Connection} object wrapped around
C{sock}.
"""
# Create the server side Connection. This is mostly setup boilerplate
# - use TLSv1, use a particular certificate, etc.
server_ctx = Context(TLSv1_METHOD)
server_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE )
server_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT|VERIFY_CLIENT_ONCE, verify_cb)
server_store = server_ctx.get_cert_store()
server_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
server_ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
server_ctx.check_privatekey()
server_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
# Here the Connection is actually created. If None is passed as the 2nd
# parameter, it indicates a memory BIO should be created.
server_conn = Connection(server_ctx, sock)
server_conn.set_accept_state()
return server_conn
def _client(self, sock):
"""
Create a new client-side SSL L{Connection} object wrapped around
C{sock}.
"""
# Now create the client side Connection. Similar boilerplate to the
# above.
client_ctx = Context(TLSv1_METHOD)
client_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE )
client_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT|VERIFY_CLIENT_ONCE, verify_cb)
client_store = client_ctx.get_cert_store()
client_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, client_key_pem))
client_ctx.use_certificate(load_certificate(FILETYPE_PEM, client_cert_pem))
client_ctx.check_privatekey()
client_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
client_conn = Connection(client_ctx, sock)
client_conn.set_connect_state()
return client_conn
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
is expected to be an RSA key in PEM format.
Returns:
Verifier instance.
Raises:
OpenSSL.crypto.Error: if the key_pem can't be parsed.
"""
key_pem = _helpers._to_bytes(key_pem)
if is_x509_cert:
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
else:
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
return OpenSSLVerifier(pubkey)
def from_string(key, password=b'notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PKCS12 or PEM format.
password: string, password for the private key file.
Returns:
Signer instance.
Raises:
OpenSSL.crypto.Error if the key can't be parsed.
"""
key = _helpers._to_bytes(key)
parsed_pem_key = _helpers._parse_pem_key(key)
if parsed_pem_key:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key)
else:
password = _helpers._to_bytes(password, encoding='utf-8')
pkey = crypto.load_pkcs12(key, password).get_privatekey()
return OpenSSLSigner(pkey)
def read_client_identity():
'''Loads the private key and certificate objects as read
from the client identity PEM file. Returns a pair of objects
(key,cert) or None if something bad happened.'''
common.print_info("Loading identity file...")
# Check for missing client identity:
if not os.path.exists(config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH):
common.print_error("No client identity file found at %s." % config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH)
return None
# Read and load PKI material from the client identity:
file_object = open(config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH,'r')
file_contents = file_object.read()
file_object.close()
try:
cert = crypto.load_certificate(crypto.FILETYPE_PEM,file_contents)
except crypto.Error:
common.print_error("Could not read the certificate from %s." % config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH)
cert = None
try:
key = crypto.load_privatekey(crypto.FILETYPE_PEM,file_contents)
except crypto.Error:
common.print_error("Could not read the private key from %s." % config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH)
key = None
# Return PKI materials:
return key,cert
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
expected to be an RSA key in PEM format.
Returns:
Verifier instance.
Raises:
OpenSSL.crypto.Error if the key_pem can't be parsed.
"""
if is_x509_cert:
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
else:
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
return Verifier(pubkey)
def get_cert_and_key(self):
"""Load cert and key from storage"""
if self._cert and self._key:
return self._cert, self._key
cert_pem, key_pem = self._load_from_storage()
if cert_pem is None:
return None, None
try:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
except crypto.Error:
raise nsxlib_exceptions.CertificateError(
msg="Failed to load client certificate")
return cert, key
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
expected to be an RSA key in PEM format.
Returns:
Verifier instance.
Raises:
OpenSSL.crypto.Error if the key_pem can't be parsed.
"""
from OpenSSL import crypto
if is_x509_cert:
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
else:
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
return OpenSSLVerifier(pubkey)
def from_string(key, password=b'notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PKCS12 or PEM format.
password: string, password for the private key file.
Returns:
Signer instance.
Raises:
OpenSSL.crypto.Error if the key can't be parsed.
"""
from OpenSSL import crypto
parsed_pem_key = _parse_pem_key(key)
if parsed_pem_key:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key)
else:
if isinstance(password, six.text_type):
password = password.encode('utf-8')
pkey = crypto.load_pkcs12(key, password).get_privatekey()
return OpenSSLSigner(pkey)
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it is
expected to be an RSA key in PEM format.
Returns:
Verifier instance.
Raises:
OpenSSL.crypto.Error if the key_pem can't be parsed.
"""
from OpenSSL import crypto
if is_x509_cert:
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
else:
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
return OpenSSLVerifier(pubkey)
def from_string(key, password=b'notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PKCS12 or PEM format.
password: string, password for the private key file.
Returns:
Signer instance.
Raises:
OpenSSL.crypto.Error if the key can't be parsed.
"""
from OpenSSL import crypto
parsed_pem_key = _parse_pem_key(key)
if parsed_pem_key:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key)
else:
if isinstance(password, six.text_type):
password = password.encode('utf-8')
pkey = crypto.load_pkcs12(key, password).get_privatekey()
return OpenSSLSigner(pkey)
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
is expected to be an RSA key in PEM format.
Returns:
Verifier instance.
Raises:
OpenSSL.crypto.Error: if the key_pem can't be parsed.
"""
key_pem = _helpers._to_bytes(key_pem)
if is_x509_cert:
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
else:
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
return OpenSSLVerifier(pubkey)
def from_string(key, password=b'notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PKCS12 or PEM format.
password: string, password for the private key file.
Returns:
Signer instance.
Raises:
OpenSSL.crypto.Error if the key can't be parsed.
"""
key = _helpers._to_bytes(key)
parsed_pem_key = _helpers._parse_pem_key(key)
if parsed_pem_key:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key)
else:
password = _helpers._to_bytes(password, encoding='utf-8')
pkey = crypto.load_pkcs12(key, password).get_privatekey()
return OpenSSLSigner(pkey)
def test_key_only(self):
"""
A `PKCS12` with only a private key can be exported using
`PKCS12.export` and loaded again using `load_pkcs12`.
"""
passwd = b"blah"
p12 = PKCS12()
pkey = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
p12.set_privatekey(pkey)
assert None is p12.get_certificate()
assert pkey == p12.get_privatekey()
try:
dumped_p12 = p12.export(passphrase=passwd, iter=2, maciter=3)
except Error:
# Some versions of OpenSSL will throw an exception
# for this nearly useless PKCS12 we tried to generate:
# [('PKCS12 routines', 'PKCS12_create', 'invalid null argument')]
return
p12 = load_pkcs12(dumped_p12, passwd)
assert None is p12.get_ca_certificates()
assert None is p12.get_certificate()
# OpenSSL fails to bring the key back to us. So sad. Perhaps in the
# future this will be improved.
assert isinstance(p12.get_privatekey(), (PKey, type(None)))
def gen_pkcs12(self, cert_pem=None, key_pem=None, ca_pem=None,
friendly_name=None):
"""
Generate a PKCS12 object with components from PEM. Verify that the set
functions return None.
"""
p12 = PKCS12()
if cert_pem:
ret = p12.set_certificate(load_certificate(FILETYPE_PEM, cert_pem))
assert ret is None
if key_pem:
ret = p12.set_privatekey(load_privatekey(FILETYPE_PEM, key_pem))
assert ret is None
if ca_pem:
ret = p12.set_ca_certificates(
(load_certificate(FILETYPE_PEM, ca_pem),)
)
assert ret is None
if friendly_name:
ret = p12.set_friendlyname(friendly_name)
assert ret is None
return p12
def test_replace(self):
"""
`PKCS12.set_certificate` replaces the certificate in a PKCS12
cluster. `PKCS12.set_privatekey` replaces the private key.
`PKCS12.set_ca_certificates` replaces the CA certificates.
"""
p12 = self.gen_pkcs12(client_cert_pem, client_key_pem, root_cert_pem)
p12.set_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
p12.set_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
root_cert = load_certificate(FILETYPE_PEM, root_cert_pem)
client_cert = load_certificate(FILETYPE_PEM, client_cert_pem)
p12.set_ca_certificates([root_cert]) # not a tuple
assert 1 == len(p12.get_ca_certificates())
assert root_cert == p12.get_ca_certificates()[0]
p12.set_ca_certificates([client_cert, root_cert])
assert 2 == len(p12.get_ca_certificates())
assert client_cert == p12.get_ca_certificates()[0]
assert root_cert == p12.get_ca_certificates()[1]
def test_dump_privatekey_passphrase_callback(self):
"""
`dump_privatekey` writes an encrypted PEM when given a callback
which returns the correct passphrase.
"""
passphrase = b"foo"
called = []
def cb(writing):
called.append(writing)
return passphrase
key = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
pem = dump_privatekey(FILETYPE_PEM, key, GOOD_CIPHER, cb)
assert isinstance(pem, binary_type)
assert called == [True]
loadedKey = load_privatekey(FILETYPE_PEM, pem, passphrase)
assert isinstance(loadedKey, PKeyType)
assert loadedKey.type() == key.type()
assert loadedKey.bits() == key.bits()
def test_sign_verify_ecdsa(self):
"""
`sign` generates a cryptographic signature which `verify` can check.
ECDSA Signatures in the X9.62 format may have variable length,
different from the length of the private key.
"""
content = (
b"It was a bright cold day in April, and the clocks were striking "
b"thirteen. Winston Smith, his chin nuzzled into his breast in an "
b"effort to escape the vile wind, slipped quickly through the "
b"glass doors of Victory Mansions, though not quickly enough to "
b"prevent a swirl of gritty dust from entering along with him."
).decode("ascii")
priv_key = load_privatekey(FILETYPE_PEM, ec_root_key_pem)
cert = load_certificate(FILETYPE_PEM, ec_root_cert_pem)
sig = sign(priv_key, content, "sha1")
verify(cert, sig, content, "sha1")
def test_set_verify_callback_exception(self):
"""
If the verify callback passed to `Context.set_verify` raises an
exception, verification fails and the exception is propagated to the
caller of `Connection.do_handshake`.
"""
serverContext = Context(TLSv1_METHOD)
serverContext.use_privatekey(
load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))
serverContext.use_certificate(
load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
clientContext = Context(TLSv1_METHOD)
def verify_callback(*args):
raise Exception("silly verify failure")
clientContext.set_verify(VERIFY_PEER, verify_callback)
with pytest.raises(Exception) as exc:
self._handshake_test(serverContext, clientContext)
assert "silly verify failure" == str(exc.value)
def test_accept(self):
"""
`Connection.accept` accepts a pending connection attempt and returns a
tuple of a new `Connection` (the accepted client) and the address the
connection originated from.
"""
ctx = Context(TLSv1_METHOD)
ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
port = socket()
portSSL = Connection(ctx, port)
portSSL.bind(('', 0))
portSSL.listen(3)
clientSSL = Connection(Context(TLSv1_METHOD), socket())
# Calling portSSL.getsockname() here to get the server IP address
# sounds great, but frequently fails on Windows.
clientSSL.connect(('127.0.0.1', portSSL.getsockname()[1]))
serverSSL, address = portSSL.accept()
assert isinstance(serverSSL, Connection)
assert serverSSL.get_context() is ctx
assert address == clientSSL.getsockname()
def test_shutdown_truncated(self):
"""
If the underlying connection is truncated, `Connection.shutdown`
raises an `Error`.
"""
server_ctx = Context(TLSv1_METHOD)
client_ctx = Context(TLSv1_METHOD)
server_ctx.use_privatekey(
load_privatekey(FILETYPE_PEM, server_key_pem))
server_ctx.use_certificate(
load_certificate(FILETYPE_PEM, server_cert_pem))
server = Connection(server_ctx, None)
client = Connection(client_ctx, None)
handshake_in_memory(client, server)
assert not server.shutdown()
with pytest.raises(WantReadError):
server.shutdown()
server.bio_shutdown()
with pytest.raises(Error):
server.shutdown()