def __new__(typ, value):
    """Load an X509 certificate from the PEM file named by ``value``.

    ``value`` is resolved to a path with ``process.config_file``. Every
    failure to resolve, open or parse the file is logged as a warning and
    reported to the caller as ``None`` rather than as an exception.

    Args:
        typ: the class this ``__new__`` is invoked on (not used).
        value: name of the certificate file; must be a string.

    Returns:
        The object returned by ``crypto.load_certificate``, or ``None``
        when the file could not be located, opened or parsed.

    Raises:
        TypeError: if ``value`` is not a string.
    """
    if not isinstance(value, basestring):
        raise TypeError('value should be a string')

    path = process.config_file(value)
    if path is None:
        log.warn("Certificate file '%s' is not readable" % value)
        return None

    try:
        f = open(path, 'rt')
    except Exception:
        # Stay best-effort for any ordinary failure, but unlike a bare
        # ``except:`` let KeyboardInterrupt / SystemExit propagate.
        log.warn("Certificate file '%s' could not be open" % value)
        return None

    try:
        return crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
    except crypto.Error as e:
        log.warn("Certificate file '%s' could not be loaded: %s" % (value, str(e)))
        return None
    finally:
        # Runs on success, on parse failure and on read errors alike.
        f.close()
python类load_certificate()的实例源码
def __init__(self, host, port, verify, cert_path, pkey_path, pkey_passphrase=''):
    """Store the connection settings and load the PEM key pair from disk.

    Args:
        host: stored unchanged as ``self.host``.
        port: stored unchanged as ``self.port``.
        verify: stored unchanged as ``self.verify``.
        cert_path: path of the PEM-encoded certificate file.
        pkey_path: path of the PEM-encoded private key file.
        pkey_passphrase: passphrase passed to ``crypto.load_privatekey``.

    Raises:
        eStreamerKeyError: the key file raised ``IOError`` while being
            read, or ``crypto.Error`` while being parsed.
        eStreamerCertError: same conditions, for the certificate file.
    """
    self.host = host
    self.port = port

    try:
        # ``with`` guarantees the handle is closed even if read() fails;
        # the original ``open(...).read()`` left that to the garbage collector.
        with open(pkey_path, 'rb') as pkey_file:
            pkey_data = pkey_file.read()
        self.pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, pkey_data, pkey_passphrase)
    except IOError:
        raise eStreamerKeyError("Unable to locate key file {}".format(pkey_path))
    except crypto.Error:
        raise eStreamerKeyError("Invalid key file or bad passphrase {}".format(pkey_path))

    try:
        with open(cert_path, 'rb') as cert_file:
            cert_data = cert_file.read()
        self.cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_data)
    except IOError:
        raise eStreamerCertError("Unable to locate cert file {}".format(cert_path))
    except crypto.Error:
        raise eStreamerCertError("Invalid certificate {}".format(cert_path))

    self.verify = verify
    # Not populated here; initialised to None as placeholders.
    self.ctx = None
    self.sock = None
    self._bytes = None
def from_string(key_pem, is_x509_cert):
    """Build an OpenSSLVerifier from PEM-encoded key material.

    Args:
        key_pem: string, key material in PEM format. It is converted with
            ``_to_bytes`` before being parsed.
        is_x509_cert: bool, selects the parser: ``crypto.load_certificate``
            when true, ``crypto.load_privatekey`` otherwise.

    Returns:
        OpenSSLVerifier wrapping the parsed object.

    Raises:
        OpenSSL.crypto.Error: if the key_pem can't be parsed.
    """
    pem_bytes = _to_bytes(key_pem)
    parse = crypto.load_certificate if is_x509_cert else crypto.load_privatekey
    parsed = parse(crypto.FILETYPE_PEM, pem_bytes)
    return OpenSSLVerifier(parsed)
def from_string(key_pem, is_x509_cert):
    """Create an OpenSSLVerifier from a PEM string.

    Args:
        key_pem: string, key material in PEM format.
        is_x509_cert: bool, True if key_pem is an X509 certificate;
            otherwise it is parsed with ``crypto.load_privatekey``.

    Returns:
        OpenSSLVerifier instance.

    Raises:
        OpenSSL.crypto.Error if the key_pem can't be parsed.
    """
    if is_x509_cert:
        return OpenSSLVerifier(
            crypto.load_certificate(crypto.FILETYPE_PEM, key_pem))
    return OpenSSLVerifier(
        crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem))
def gen_pkcs12(self, cert_pem=None, key_pem=None, ca_pem=None, friendly_name=None):
    """
    Build a PKCS12 object from whichever PEM components are supplied,
    asserting along the way that each setter returns None.
    """
    bundle = PKCS12()

    if cert_pem:
        outcome = bundle.set_certificate(load_certificate(FILETYPE_PEM, cert_pem))
        self.assertEqual(outcome, None)

    if key_pem:
        outcome = bundle.set_privatekey(load_privatekey(FILETYPE_PEM, key_pem))
        self.assertEqual(outcome, None)

    if ca_pem:
        # The CA certificate is handed over as a one-element tuple.
        outcome = bundle.set_ca_certificates(
            (load_certificate(FILETYPE_PEM, ca_pem),))
        self.assertEqual(outcome, None)

    if friendly_name:
        outcome = bundle.set_friendlyname(friendly_name)
        self.assertEqual(outcome, None)

    return bundle
def test_replace(self):
    """
    L{PKCS12.set_certificate}, L{PKCS12.set_privatekey} and
    L{PKCS12.set_ca_certificates} each replace the corresponding component
    of an already populated PKCS12 cluster.
    """
    p12 = self.gen_pkcs12(client_cert_pem, client_key_pem, root_cert_pem)

    # Swap the client certificate/key pair for the server pair.
    server_cert = load_certificate(FILETYPE_PEM, server_cert_pem)
    p12.set_certificate(server_cert)
    server_key = load_privatekey(FILETYPE_PEM, server_key_pem)
    p12.set_privatekey(server_key)

    root_cert = load_certificate(FILETYPE_PEM, root_cert_pem)
    client_cert = load_certificate(FILETYPE_PEM, client_cert_pem)

    # A list (rather than a tuple) is passed here.
    single = [root_cert]
    p12.set_ca_certificates(single)
    self.assertEqual(1, len(p12.get_ca_certificates()))
    self.assertEqual(root_cert, p12.get_ca_certificates()[0])

    pair = [client_cert, root_cert]
    p12.set_ca_certificates(pair)
    self.assertEqual(2, len(p12.get_ca_certificates()))
    self.assertEqual(client_cert, p12.get_ca_certificates()[0])
    self.assertEqual(root_cert, p12.get_ca_certificates()[1])
def _server(self, sock):
    """
    Build a server-side SSL L{Connection} around C{sock} and put it in
    accept state.
    """
    # Context setup: TLSv1, peer verification, the server key pair and the
    # root certificate in the trust store.
    options = OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE
    verify_mode = VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT | VERIFY_CLIENT_ONCE

    ctx = Context(TLSv1_METHOD)
    ctx.set_options(options)
    ctx.set_verify(verify_mode, verify_cb)
    store = ctx.get_cert_store()

    ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
    ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
    ctx.check_privatekey()
    store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))

    # The Connection itself. If None is passed as the 2nd parameter, it
    # indicates a memory BIO should be created.
    conn = Connection(ctx, sock)
    conn.set_accept_state()
    return conn
def _client(self, sock):
    """
    Build a client-side SSL L{Connection} around C{sock} and put it in
    connect state.
    """
    # Context setup: TLSv1, peer verification, the client key pair and the
    # root certificate in the trust store.
    options = OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE
    verify_mode = VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT | VERIFY_CLIENT_ONCE

    ctx = Context(TLSv1_METHOD)
    ctx.set_options(options)
    ctx.set_verify(verify_mode, verify_cb)
    store = ctx.get_cert_store()

    ctx.use_privatekey(load_privatekey(FILETYPE_PEM, client_key_pem))
    ctx.use_certificate(load_certificate(FILETYPE_PEM, client_cert_pem))
    ctx.check_privatekey()
    store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))

    conn = Connection(ctx, sock)
    conn.set_connect_state()
    return conn
def test_set_multiple_ca_list(self):
    """
    When L{Context.set_client_ca_list} is given a list with more than one
    X509Name, those names are sent to the client, and
    L{Connection.get_client_ca_list} reports them on both the server and
    client sides once the connection is set up.
    """
    secert = load_certificate(FILETYPE_PEM, server_cert_pem)
    # NOTE(review): loaded from server_cert_pem, same as secert -- confirm
    # that this is intended.
    clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

    sedesc = secert.get_subject()
    cldesc = clcert.get_subject()

    def multiple_ca(ctx):
        # The very list object handed to the context is also the
        # expected result.
        names = [sedesc, cldesc]
        ctx.set_client_ca_list(names)
        return names

    self._check_client_ca_list(multiple_ca)
def test_reset_ca_list(self):
    """
    Only the X509Names from the last call to
    L{Context.set_client_ca_list} are used to configure the CA names sent
    to the client when it is called several times.
    """
    cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
    secert = load_certificate(FILETYPE_PEM, server_cert_pem)
    # NOTE(review): loaded from server_cert_pem, same as secert -- confirm
    # that this is intended.
    clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

    cadesc = cacert.get_subject()
    sedesc = secert.get_subject()
    cldesc = clcert.get_subject()

    def changed_ca(ctx):
        first_names = [sedesc, cldesc]
        final_names = [cadesc]
        ctx.set_client_ca_list(first_names)
        ctx.set_client_ca_list(final_names)
        # A fresh list is returned as the expected value.
        return [cadesc]

    self._check_client_ca_list(changed_ca)
def test_mutated_ca_list(self):
    """
    If the list passed to L{Context.set_client_ca_list} is mutated
    afterwards, this does not affect the list of CA names sent to the
    client.
    """
    cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
    secert = load_certificate(FILETYPE_PEM, server_cert_pem)

    cadesc = cacert.get_subject()
    sedesc = secert.get_subject()

    def mutated_ca(ctx):
        L = [cadesc]
        # Pass the SAME list object that is mutated below. Previously a
        # separate ``[cadesc]`` literal was passed, so the append could
        # never be observed by the context and the test was vacuous.
        ctx.set_client_ca_list(L)
        L.append(sedesc)
        # Expected: only the name present at the time of the call.
        return [cadesc]

    self._check_client_ca_list(mutated_ca)
def test_set_and_add_client_ca(self):
    """
    Calling L{Context.set_client_ca_list} and then
    L{Context.add_client_ca} yields the CA names from the first call
    followed by the CA name from the second call.
    """
    cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
    secert = load_certificate(FILETYPE_PEM, server_cert_pem)
    # NOTE(review): loaded from server_cert_pem, same as secert -- confirm
    # that this is intended.
    clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

    cadesc = cacert.get_subject()
    sedesc = secert.get_subject()
    cldesc = clcert.get_subject()

    def mixed_set_add_ca(ctx):
        initial_names = [cadesc, sedesc]
        ctx.set_client_ca_list(initial_names)
        # add_client_ca takes the certificate itself, not its subject.
        ctx.add_client_ca(clcert)
        return [cadesc, sedesc, cldesc]

    self._check_client_ca_list(mixed_set_add_ca)
def test_set_after_add_client_ca(self):
    """
    Calling L{Context.set_client_ca_list} after L{Context.add_client_ca}
    replaces the CA name specified by the former call with the names
    specified by the latter call.
    """
    cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
    secert = load_certificate(FILETYPE_PEM, server_cert_pem)
    # NOTE(review): loaded from server_cert_pem, same as secert -- confirm
    # that this is intended.
    clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

    cadesc = cacert.get_subject()
    sedesc = secert.get_subject()
    # Kept as in the original although it is not used below.
    cldesc = clcert.get_subject()

    def set_replaces_add_ca(ctx):
        ctx.add_client_ca(clcert)
        replacement = [cadesc]
        ctx.set_client_ca_list(replacement)
        ctx.add_client_ca(secert)
        return [cadesc, sedesc]

    self._check_client_ca_list(set_replaces_add_ca)
def from_string(key_pem, is_x509_cert):
    """Construct an OpenSSLVerifier instance from a string.

    Args:
        key_pem: string, key material in PEM format; converted with
            ``_helpers._to_bytes`` before parsing.
        is_x509_cert: bool, True if key_pem is an X509 cert; otherwise it
            is parsed with ``crypto.load_privatekey``.

    Returns:
        OpenSSLVerifier instance.

    Raises:
        OpenSSL.crypto.Error: if the key_pem can't be parsed.
    """
    raw_pem = _helpers._to_bytes(key_pem)
    if is_x509_cert:
        loader = crypto.load_certificate
    else:
        loader = crypto.load_privatekey
    return OpenSSLVerifier(loader(crypto.FILETYPE_PEM, raw_pem))
def read_client_identity():
    '''Load the private key and certificate from the client identity PEM file.

    The file at ``config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH`` is read
    once and parsed twice: once as a certificate, once as a private key.

    Returns:
        None if the identity file does not exist. Otherwise a
        ``(key, cert)`` pair, in which either element is None when that
        item could not be parsed (an error is printed for each failure).
    '''
    common.print_info("Loading identity file...")
    identity_path = config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH

    # Check for missing client identity:
    if not os.path.exists(identity_path):
        common.print_error("No client identity file found at %s." % identity_path)
        return None

    # Read the PKI material; ``with`` closes the handle even if read() fails,
    # which the previous open/read/close sequence did not guarantee.
    with open(identity_path, 'r') as file_object:
        file_contents = file_object.read()

    try:
        cert = crypto.load_certificate(crypto.FILETYPE_PEM, file_contents)
    except crypto.Error:
        common.print_error("Could not read the certificate from %s." % identity_path)
        cert = None

    try:
        key = crypto.load_privatekey(crypto.FILETYPE_PEM, file_contents)
    except crypto.Error:
        common.print_error("Could not read the private key from %s." % identity_path)
        key = None

    # Return PKI materials:
    return key, cert
def prepara_certificado_txt(self, cert_txt):
    """Normalise a PEM certificate and extract its basic metadata.

    The certificate body is re-wrapped into lines of 64 characters (the
    original author notes this is needed for xmlsec to read it) and
    stored in ``self.certificado``. The result is then parsed with
    ``crypto.load_certificate`` to fill ``self.emissor`` (issuer
    components), ``self.proprietario`` (subject components),
    ``self.data_inicio_validade`` and ``self.data_fim_validade``
    (notBefore / notAfter parsed as ``%Y%m%d%H%M%SZ``).

    Args:
        cert_txt: PEM text of the certificate, with or without the
            BEGIN/END armor lines.
    """
    cabecalho = u'-----BEGIN CERTIFICATE-----'
    rodape = u'-----END CERTIFICATE-----'

    # Remove the armor lines first (they contain spaces), then drop ALL
    # whitespace. Stripping only '\n' left stray '\r' characters inside
    # the re-wrapped lines for input with Windows line endings.
    corpo = cert_txt.replace(cabecalho, u'').replace(rodape, u'')
    corpo = u''.join(corpo.split())

    linhas_certificado = [cabecalho + u'\n']
    linhas_certificado.extend(
        corpo[i:i + 64] + u'\n' for i in range(0, len(corpo), 64))
    linhas_certificado.append(rodape + u'\n')

    self.certificado = u''.join(linhas_certificado)

    cert_openssl = crypto.load_certificate(crypto.FILETYPE_PEM, self.certificado)

    self.emissor = dict(cert_openssl.get_issuer().get_components())
    self.proprietario = dict(cert_openssl.get_subject().get_components())

    self.data_inicio_validade = datetime.strptime(cert_openssl.get_notBefore().decode('utf-8'), '%Y%m%d%H%M%SZ')
    self.data_fim_validade = datetime.strptime(cert_openssl.get_notAfter().decode('utf-8'), '%Y%m%d%H%M%SZ')
def from_string(key_pem, is_x509_cert):
    """Create a Verifier from a PEM string.

    Args:
        key_pem: string, key material in PEM format.
        is_x509_cert: bool, True if key_pem is an X509 certificate;
            otherwise it is parsed with ``crypto.load_privatekey``.

    Returns:
        Verifier instance.

    Raises:
        OpenSSL.crypto.Error if the key_pem can't be parsed.
    """
    load = crypto.load_certificate if is_x509_cert else crypto.load_privatekey
    parsed_key = load(crypto.FILETYPE_PEM, key_pem)
    return Verifier(parsed_key)
def _get_cert_from_file(self, filename):
with open(filename, 'r') as f:
cert_pem = f.read()
if not cert_pem:
raise nsxlib_exceptions.CertificateError(
msg=_("Failed to read certificate from %s") % filename)
# validate correct crypto
try:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
except crypto.Error:
raise nsxlib_exceptions.CertificateError(
msg=_("Failed to import client certificate"))
return cert
def get_cert_and_key(self):
    """Return the ``(cert, key)`` pair, loading it from storage if needed.

    When both ``self._cert`` and ``self._key`` are truthy they are returned
    as is. Otherwise the PEM data comes from ``self._load_from_storage()``;
    ``(None, None)`` is returned if no certificate PEM is stored. Parsed
    objects are returned to the caller; this method does not assign them
    to the instance. A ``crypto.Error`` during parsing is re-raised as
    ``nsxlib_exceptions.CertificateError``.
    """
    if self._cert and self._key:
        return self._cert, self._key

    stored = self._load_from_storage()
    cert_pem, key_pem = stored
    if cert_pem is None:
        return None, None

    try:
        loaded_cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
        loaded_key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
    except crypto.Error:
        raise nsxlib_exceptions.CertificateError(
            msg="Failed to load client certificate")

    return loaded_cert, loaded_key
def from_string(key_pem, is_x509_cert):
    """Create an OpenSSLVerifier from a PEM string.

    Args:
        key_pem: string, key material in PEM format.
        is_x509_cert: bool, True if key_pem is an X509 certificate;
            otherwise it is parsed with ``crypto.load_privatekey``.

    Returns:
        OpenSSLVerifier instance.

    Raises:
        OpenSSL.crypto.Error if the key_pem can't be parsed.
    """
    # Imported inside the function, as in the original.
    from OpenSSL import crypto

    if is_x509_cert:
        return OpenSSLVerifier(
            crypto.load_certificate(crypto.FILETYPE_PEM, key_pem))
    return OpenSSLVerifier(
        crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem))
def from_string(key_pem, is_x509_cert):
    """Build an OpenSSLVerifier from PEM-encoded key material.

    Args:
        key_pem: string, key material in PEM format.
        is_x509_cert: bool, selects the parser: ``crypto.load_certificate``
            when true, ``crypto.load_privatekey`` otherwise.

    Returns:
        OpenSSLVerifier instance.

    Raises:
        OpenSSL.crypto.Error if the key_pem can't be parsed.
    """
    # Imported inside the function, as in the original.
    from OpenSSL import crypto

    if is_x509_cert:
        parser = crypto.load_certificate
    else:
        parser = crypto.load_privatekey
    parsed = parser(crypto.FILETYPE_PEM, key_pem)
    return OpenSSLVerifier(parsed)
def from_string(key_pem, is_x509_cert):
    """Construct an OpenSSLVerifier instance from a string.

    Args:
        key_pem: string, key material in PEM format; converted with
            ``_helpers._to_bytes`` before parsing.
        is_x509_cert: bool, True if key_pem is an X509 cert; otherwise it
            is parsed with ``crypto.load_privatekey``.

    Returns:
        OpenSSLVerifier instance.

    Raises:
        OpenSSL.crypto.Error: if the key_pem can't be parsed.
    """
    pem_data = _helpers._to_bytes(key_pem)
    if is_x509_cert:
        return OpenSSLVerifier(
            crypto.load_certificate(crypto.FILETYPE_PEM, pem_data))
    return OpenSSLVerifier(
        crypto.load_privatekey(crypto.FILETYPE_PEM, pem_data))
def gen_pkcs12(self, cert_pem=None, key_pem=None, ca_pem=None,
               friendly_name=None):
    """
    Build a PKCS12 object from whichever PEM components are supplied,
    asserting along the way that each setter returns None.
    """
    bundle = PKCS12()

    if cert_pem:
        certificate = load_certificate(FILETYPE_PEM, cert_pem)
        outcome = bundle.set_certificate(certificate)
        assert outcome is None

    if key_pem:
        private_key = load_privatekey(FILETYPE_PEM, key_pem)
        outcome = bundle.set_privatekey(private_key)
        assert outcome is None

    if ca_pem:
        # The CA certificate is handed over as a one-element tuple.
        ca_certs = (load_certificate(FILETYPE_PEM, ca_pem),)
        outcome = bundle.set_ca_certificates(ca_certs)
        assert outcome is None

    if friendly_name:
        outcome = bundle.set_friendlyname(friendly_name)
        assert outcome is None

    return bundle
def test_replace(self):
    """
    `PKCS12.set_certificate`, `PKCS12.set_privatekey` and
    `PKCS12.set_ca_certificates` each replace the corresponding component
    of an already populated PKCS12 cluster.
    """
    p12 = self.gen_pkcs12(client_cert_pem, client_key_pem, root_cert_pem)

    # Swap the client certificate/key pair for the server pair.
    server_cert = load_certificate(FILETYPE_PEM, server_cert_pem)
    p12.set_certificate(server_cert)
    server_key = load_privatekey(FILETYPE_PEM, server_key_pem)
    p12.set_privatekey(server_key)

    root_cert = load_certificate(FILETYPE_PEM, root_cert_pem)
    client_cert = load_certificate(FILETYPE_PEM, client_cert_pem)

    # A list (rather than a tuple) is passed here.
    single = [root_cert]
    p12.set_ca_certificates(single)
    assert 1 == len(p12.get_ca_certificates())
    assert root_cert == p12.get_ca_certificates()[0]

    pair = [client_cert, root_cert]
    p12.set_ca_certificates(pair)
    assert 2 == len(p12.get_ca_certificates())
    assert client_cert == p12.get_ca_certificates()[0]
    assert root_cert == p12.get_ca_certificates()[1]
def test_sign_verify_ecdsa(self):
    """
    A signature produced by `sign` with an EC private key is accepted by
    `verify` using the matching certificate. ECDSA signatures in the X9.62
    format may have variable length, different from the length of the
    private key.
    """
    digest = "sha1"
    raw_text = (
        b"It was a bright cold day in April, and the clocks were striking "
        b"thirteen. Winston Smith, his chin nuzzled into his breast in an "
        b"effort to escape the vile wind, slipped quickly through the "
        b"glass doors of Victory Mansions, though not quickly enough to "
        b"prevent a swirl of gritty dust from entering along with him."
    )
    # The content is handed to sign/verify as decoded text, not bytes.
    content = raw_text.decode("ascii")

    signing_key = load_privatekey(FILETYPE_PEM, ec_root_key_pem)
    signer_cert = load_certificate(FILETYPE_PEM, ec_root_cert_pem)

    signature = sign(signing_key, content, digest)
    verify(signer_cert, signature, content, digest)
def test_shutdown_truncated(self):
    """
    `Connection.shutdown` raises an `Error` if the underlying connection
    is truncated.
    """
    server_ctx = Context(TLSv1_METHOD)
    client_ctx = Context(TLSv1_METHOD)

    server_key = load_privatekey(FILETYPE_PEM, server_key_pem)
    server_ctx.use_privatekey(server_key)
    server_cert = load_certificate(FILETYPE_PEM, server_cert_pem)
    server_ctx.use_certificate(server_cert)

    # Passing None as the socket gives memory-BIO connections (assumed
    # from the sibling _server helper's comment -- TODO confirm).
    server = Connection(server_ctx, None)
    client = Connection(client_ctx, None)
    handshake_in_memory(client, server)

    # First call returns a falsy value; the second one wants to read.
    first_result = server.shutdown()
    assert not first_result
    with pytest.raises(WantReadError):
        server.shutdown()

    # After bio_shutdown() a further shutdown attempt is an error.
    server.bio_shutdown()
    with pytest.raises(Error):
        server.shutdown()
def _server(self, sock):
    """
    Build a server-side SSL `Connection` around `sock` and put it in
    accept state.
    """
    # Context setup: TLSv1, peer verification, the server key pair and the
    # root certificate in the trust store.
    options = OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE
    verify_mode = (
        VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT | VERIFY_CLIENT_ONCE
    )

    ctx = Context(TLSv1_METHOD)
    ctx.set_options(options)
    ctx.set_verify(verify_mode, verify_cb)
    store = ctx.get_cert_store()

    ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
    ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
    ctx.check_privatekey()
    store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))

    # The Connection itself. If None is passed as the 2nd parameter, it
    # indicates a memory BIO should be created.
    conn = Connection(ctx, sock)
    conn.set_accept_state()
    return conn
def test_set_multiple_ca_list(self):
    """
    When `Context.set_client_ca_list` is given a list with more than one
    X509Name, those names are sent to the client, and
    `Connection.get_client_ca_list` reports them on both the server and
    client sides once the connection is set up.
    """
    secert = load_certificate(FILETYPE_PEM, server_cert_pem)
    # NOTE(review): loaded from server_cert_pem, same as secert -- confirm
    # that this is intended.
    clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

    sedesc = secert.get_subject()
    cldesc = clcert.get_subject()

    def multiple_ca(ctx):
        # The very list object handed to the context is also the
        # expected result.
        names = [sedesc, cldesc]
        ctx.set_client_ca_list(names)
        return names

    self._check_client_ca_list(multiple_ca)
def test_reset_ca_list(self):
    """
    Only the X509Names from the last call to `Context.set_client_ca_list`
    are used to configure the CA names sent to the client when it is
    called several times.
    """
    cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
    secert = load_certificate(FILETYPE_PEM, server_cert_pem)
    # NOTE(review): loaded from server_cert_pem, same as secert -- confirm
    # that this is intended.
    clcert = load_certificate(FILETYPE_PEM, server_cert_pem)

    cadesc = cacert.get_subject()
    sedesc = secert.get_subject()
    cldesc = clcert.get_subject()

    def changed_ca(ctx):
        first_names = [sedesc, cldesc]
        final_names = [cadesc]
        ctx.set_client_ca_list(first_names)
        ctx.set_client_ca_list(final_names)
        # A fresh list is returned as the expected value.
        return [cadesc]

    self._check_client_ca_list(changed_ca)
def test_mutated_ca_list(self):
    """
    If the list passed to `Context.set_client_ca_list` is mutated
    afterwards, this does not affect the list of CA names sent to the
    client.
    """
    cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
    secert = load_certificate(FILETYPE_PEM, server_cert_pem)

    cadesc = cacert.get_subject()
    sedesc = secert.get_subject()

    def mutated_ca(ctx):
        L = [cadesc]
        # Pass the SAME list object that is mutated below. Previously a
        # separate ``[cadesc]`` literal was passed, so the append could
        # never be observed by the context and the test was vacuous.
        ctx.set_client_ca_list(L)
        L.append(sedesc)
        # Expected: only the name present at the time of the call.
        return [cadesc]

    self._check_client_ca_list(mutated_ca)