def __new__(typ, value):
if isinstance(value, basestring):
path = process.config_file(value)
if path is None:
log.warn("Certificate file '%s' is not readable" % value)
return None
try:
f = open(path, 'rt')
except:
log.warn("Certificate file '%s' could not be open" % value)
return None
try:
try:
return crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
except crypto.Error, e:
log.warn("Certificate file '%s' could not be loaded: %s" % (value, str(e)))
return None
finally:
f.close()
else:
raise TypeError, 'value should be a string'
python类Error()的实例源码
def sign_message(given_message,given_key):
'''Signs the (hash of the) given message with the given private key.
Returns the base64 encoded signature or or a blank string if something bad happened.'''
# Check for blank message:
if not given_message:
common.print_error("Cannot sign blank message.")
return None
# Sign the message by encrypting its hash with the private key:
try:
signature = crypto.sign(given_key,given_message,'sha512')
signature = base64.b64encode(signature)
except crypto.Error:
common.print_error("Error signing message!")
signature = ''
# Return signature:
return signature
def __init__(self, host, port, verify, cert_path, pkey_path, pkey_passphrase=''):
self.host = host
self.port = port
try:
self.pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, open(pkey_path, 'rb').read(), pkey_passphrase)
except IOError:
raise eStreamerKeyError("Unable to locate key file {}".format(pkey_path))
except crypto.Error:
raise eStreamerKeyError("Invalid key file or bad passphrase {}".format(pkey_path))
try:
self.cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_path, 'rb').read())
except IOError:
raise eStreamerCertError("Unable to locate cert file {}".format(cert_path))
except crypto.Error:
raise eStreamerCertError("Invalid certificate {}".format(cert_path))
self.verify = verify
self.ctx = None
self.sock = None
self._bytes = None
def verify(self, message, signature):
"""Verifies a message against a signature.
Args:
message: string or bytes, The message to verify. If string, will be
encoded to bytes as utf-8.
signature: string or bytes, The signature on the message. If string,
will be encoded to bytes as utf-8.
Returns:
True if message was signed by the private key associated with the
public key that this object was constructed with.
"""
message = _to_bytes(message, encoding='utf-8')
signature = _to_bytes(signature, encoding='utf-8')
try:
crypto.verify(self._pubkey, signature, message, 'sha256')
return True
except crypto.Error:
return False
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
is expected to be an RSA key in PEM format.
Returns:
Verifier instance.
Raises:
OpenSSL.crypto.Error: if the key_pem can't be parsed.
"""
key_pem = _to_bytes(key_pem)
if is_x509_cert:
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
else:
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
return OpenSSLVerifier(pubkey)
def from_string(key, password=b'notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PKCS12 or PEM format.
password: string, password for the private key file.
Returns:
Signer instance.
Raises:
OpenSSL.crypto.Error if the key can't be parsed.
"""
key = _to_bytes(key)
parsed_pem_key = _parse_pem_key(key)
if parsed_pem_key:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key)
else:
password = _to_bytes(password, encoding='utf-8')
pkey = crypto.load_pkcs12(key, password).get_privatekey()
return OpenSSLSigner(pkey)
def test_invalid_extension(self):
"""
L{X509Extension} raises something if it is passed a bad extension
name or value.
"""
self.assertRaises(
Error, X509Extension, 'thisIsMadeUp', False, 'hi')
self.assertRaises(
Error, X509Extension, 'basicConstraints', False, 'blah blah')
# Exercise a weird one (an extension which uses the r2i method). This
# exercises the codepath that requires a non-NULL ctx to be passed to
# X509V3_EXT_nconf. It can't work now because we provide no
# configuration database. It might be made to work in the future.
self.assertRaises(
Error, X509Extension, 'proxyCertInfo', True,
'language:id-ppl-anyLanguage,pathlen:1,policy:text:AB')
def test_key_only(self):
"""
A L{PKCS12} with only a private key can be exported using
L{PKCS12.export} and loaded again using L{load_pkcs12}.
"""
passwd = 'blah'
p12 = PKCS12()
pkey = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
p12.set_privatekey(pkey)
self.assertEqual(None, p12.get_certificate())
self.assertEqual(pkey, p12.get_privatekey())
try:
dumped_p12 = p12.export(passphrase=passwd, iter=2, maciter=3)
except Error:
# Some versions of OpenSSL will throw an exception
# for this nearly useless PKCS12 we tried to generate:
# [('PKCS12 routines', 'PKCS12_create', 'invalid null argument')]
return
p12 = load_pkcs12(dumped_p12, passwd)
self.assertEqual(None, p12.get_ca_certificates())
self.assertEqual(None, p12.get_certificate())
# OpenSSL fails to bring the key back to us. So sad. Perhaps in the
# future this will be improved.
self.assertTrue(isinstance(p12.get_privatekey(), (PKey, type(None))))
def test_load_without_mac(self):
"""
Loading a PKCS12 without a MAC does something other than crash.
"""
passwd = 'Lake Michigan'
p12 = self.gen_pkcs12(server_cert_pem, server_key_pem, root_cert_pem)
dumped_p12 = p12.export(maciter=-1, passphrase=passwd, iter=2)
try:
recovered_p12 = load_pkcs12(dumped_p12, passwd)
# The person who generated this PCKS12 should be flogged,
# or better yet we should have a means to determine
# whether a PCKS12 had a MAC that was verified.
# Anyway, libopenssl chooses to allow it, so the
# pyopenssl binding does as well.
self.assertTrue(isinstance(recovered_p12, PKCS12))
except Error:
# Failing here with an exception is preferred as some openssl
# versions do.
pass
def verify(self, message, signature):
"""Verifies a message against a signature.
Args:
message: string or bytes, The message to verify. If string, will be
encoded to bytes as utf-8.
signature: string or bytes, The signature on the message. If string,
will be encoded to bytes as utf-8.
Returns:
True if message was signed by the private key associated with the
public key that this object was constructed with.
"""
message = _helpers._to_bytes(message, encoding='utf-8')
signature = _helpers._to_bytes(signature, encoding='utf-8')
try:
crypto.verify(self._pubkey, signature, message, 'sha256')
return True
except crypto.Error:
return False
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
is expected to be an RSA key in PEM format.
Returns:
Verifier instance.
Raises:
OpenSSL.crypto.Error: if the key_pem can't be parsed.
"""
key_pem = _helpers._to_bytes(key_pem)
if is_x509_cert:
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
else:
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
return OpenSSLVerifier(pubkey)
def from_string(key, password=b'notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PKCS12 or PEM format.
password: string, password for the private key file.
Returns:
Signer instance.
Raises:
OpenSSL.crypto.Error if the key can't be parsed.
"""
key = _helpers._to_bytes(key)
parsed_pem_key = _helpers._parse_pem_key(key)
if parsed_pem_key:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key)
else:
password = _helpers._to_bytes(password, encoding='utf-8')
pkey = crypto.load_pkcs12(key, password).get_privatekey()
return OpenSSLSigner(pkey)
def read_client_identity():
'''Loads the private key and certificate objects as read
from the client identity PEM file. Returns a pair of objects
(key,cert) or None if something bad happened.'''
common.print_info("Loading identity file...")
# Check for missing client identity:
if not os.path.exists(config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH):
common.print_error("No client identity file found at %s." % config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH)
return None
# Read and load PKI material from the client identity:
file_object = open(config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH,'r')
file_contents = file_object.read()
file_object.close()
try:
cert = crypto.load_certificate(crypto.FILETYPE_PEM,file_contents)
except crypto.Error:
common.print_error("Could not read the certificate from %s." % config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH)
cert = None
try:
key = crypto.load_privatekey(crypto.FILETYPE_PEM,file_contents)
except crypto.Error:
common.print_error("Could not read the private key from %s." % config_paths.CLIENT_IDENTITY_INSTALLED_FILE_PATH)
key = None
# Return PKI materials:
return key,cert
def _get_cert_from_file(self, filename):
with open(filename, 'r') as f:
cert_pem = f.read()
if not cert_pem:
raise nsxlib_exceptions.CertificateError(
msg=_("Failed to read certificate from %s") % filename)
# validate correct crypto
try:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
except crypto.Error:
raise nsxlib_exceptions.CertificateError(
msg=_("Failed to import client certificate"))
return cert
def get_cert_and_key(self):
"""Load cert and key from storage"""
if self._cert and self._key:
return self._cert, self._key
cert_pem, key_pem = self._load_from_storage()
if cert_pem is None:
return None, None
try:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
except crypto.Error:
raise nsxlib_exceptions.CertificateError(
msg="Failed to load client certificate")
return cert, key
def verify(self, message, signature):
"""Verifies a message against a signature.
Args:
message: string or bytes, The message to verify. If string, will be
encoded to bytes as utf-8.
signature: string or bytes, The signature on the message. If string,
will be encoded to bytes as utf-8.
Returns:
True if message was signed by the private key associated with the
public key that this object was constructed with.
"""
message = _helpers._to_bytes(message, encoding='utf-8')
signature = _helpers._to_bytes(signature, encoding='utf-8')
try:
crypto.verify(self._pubkey, signature, message, 'sha256')
return True
except crypto.Error:
return False
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
is expected to be an RSA key in PEM format.
Returns:
Verifier instance.
Raises:
OpenSSL.crypto.Error: if the key_pem can't be parsed.
"""
key_pem = _helpers._to_bytes(key_pem)
if is_x509_cert:
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
else:
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
return OpenSSLVerifier(pubkey)
def from_string(key, password=b'notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PKCS12 or PEM format.
password: string, password for the private key file.
Returns:
Signer instance.
Raises:
OpenSSL.crypto.Error if the key can't be parsed.
"""
key = _helpers._to_bytes(key)
parsed_pem_key = _helpers._parse_pem_key(key)
if parsed_pem_key:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key)
else:
password = _helpers._to_bytes(password, encoding='utf-8')
pkey = crypto.load_pkcs12(key, password).get_privatekey()
return OpenSSLSigner(pkey)
def logs(self, log_lines=str(settings.LOG_LINES)):
"""Return aggregated log data for this application."""
try:
url = "http://{}:{}/{}?log_lines={}".format(settings.LOGGER_HOST, settings.LOGGER_PORT,
self.id, log_lines)
r = requests.get(url)
# Handle HTTP request errors
except requests.exceptions.RequestException as e:
logger.error("Error accessing deis-logger using url '{}': {}".format(url, e))
raise e
# Handle logs empty or not found
if r.status_code == 204 or r.status_code == 404:
logger.info("GET {} returned a {} status code".format(url, r.status_code))
raise EnvironmentError('Could not locate logs')
# Handle unanticipated status codes
if r.status_code != 200:
logger.error("Error accessing deis-logger: GET {} returned a {} status code"
.format(url, r.status_code))
raise EnvironmentError('Error accessing deis-logger')
return r.content
def test_sign(self):
"""
`X509Req.sign` succeeds when passed a private key object and a
valid digest function. `X509Req.verify` can be used to check
the signature.
"""
request = self.signable()
key = PKey()
key.generate_key(TYPE_RSA, 512)
request.set_pubkey(key)
request.sign(key, GOOD_DIGEST)
# If the type has a verify method, cover that too.
if getattr(request, 'verify', None) is not None:
pub = request.get_pubkey()
assert request.verify(pub)
# Make another key that won't verify.
key = PKey()
key.generate_key(TYPE_RSA, 512)
with pytest.raises(Error):
request.verify(key)
def test_load_without_mac(self):
"""
Loading a PKCS12 without a MAC does something other than crash.
"""
passwd = b"Lake Michigan"
p12 = self.gen_pkcs12(server_cert_pem, server_key_pem, root_cert_pem)
dumped_p12 = p12.export(maciter=-1, passphrase=passwd, iter=2)
try:
recovered_p12 = load_pkcs12(dumped_p12, passwd)
# The person who generated this PCKS12 should be flogged,
# or better yet we should have a means to determine
# whether a PCKS12 had a MAC that was verified.
# Anyway, libopenssl chooses to allow it, so the
# pyopenssl binding does as well.
assert isinstance(recovered_p12, PKCS12)
except Error:
# Failing here with an exception is preferred as some openssl
# versions do.
pass
def verify(self, message, signature):
"""Verifies a message against a signature.
Args:
message: string or bytes, The message to verify. If string, will be
encoded to bytes as utf-8.
signature: string or bytes, The signature on the message. If string,
will be encoded to bytes as utf-8.
Returns:
True if message was signed by the private key associated with the
public key that this object was constructed with.
"""
message = _helpers._to_bytes(message, encoding='utf-8')
signature = _helpers._to_bytes(signature, encoding='utf-8')
try:
crypto.verify(self._pubkey, signature, message, 'sha256')
return True
except crypto.Error:
return False
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
is expected to be an RSA key in PEM format.
Returns:
Verifier instance.
Raises:
OpenSSL.crypto.Error: if the key_pem can't be parsed.
"""
key_pem = _helpers._to_bytes(key_pem)
if is_x509_cert:
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
else:
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
return OpenSSLVerifier(pubkey)
def from_string(key, password=b'notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PKCS12 or PEM format.
password: string, password for the private key file.
Returns:
Signer instance.
Raises:
OpenSSL.crypto.Error if the key can't be parsed.
"""
key = _helpers._to_bytes(key)
parsed_pem_key = _helpers._parse_pem_key(key)
if parsed_pem_key:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key)
else:
password = _helpers._to_bytes(password, encoding='utf-8')
pkey = crypto.load_pkcs12(key, password).get_privatekey()
return OpenSSLSigner(pkey)
def test_invalid_extension(self):
"""
L{X509Extension} raises something if it is passed a bad extension
name or value.
"""
self.assertRaises(
Error, X509Extension, 'thisIsMadeUp', False, 'hi')
self.assertRaises(
Error, X509Extension, 'basicConstraints', False, 'blah blah')
# Exercise a weird one (an extension which uses the r2i method). This
# exercises the codepath that requires a non-NULL ctx to be passed to
# X509V3_EXT_nconf. It can't work now because we provide no
# configuration database. It might be made to work in the future.
self.assertRaises(
Error, X509Extension, 'proxyCertInfo', True,
'language:id-ppl-anyLanguage,pathlen:1,policy:text:AB')
def test_key_only(self):
"""
A L{PKCS12} with only a private key can be exported using
L{PKCS12.export} and loaded again using L{load_pkcs12}.
"""
passwd = 'blah'
p12 = PKCS12()
pkey = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
p12.set_privatekey(pkey)
self.assertEqual(None, p12.get_certificate())
self.assertEqual(pkey, p12.get_privatekey())
try:
dumped_p12 = p12.export(passphrase=passwd, iter=2, maciter=3)
except Error:
# Some versions of OpenSSL will throw an exception
# for this nearly useless PKCS12 we tried to generate:
# [('PKCS12 routines', 'PKCS12_create', 'invalid null argument')]
return
p12 = load_pkcs12(dumped_p12, passwd)
self.assertEqual(None, p12.get_ca_certificates())
self.assertEqual(None, p12.get_certificate())
# OpenSSL fails to bring the key back to us. So sad. Perhaps in the
# future this will be improved.
self.assertTrue(isinstance(p12.get_privatekey(), (PKey, type(None))))
def test_load_without_mac(self):
"""
Loading a PKCS12 without a MAC does something other than crash.
"""
passwd = 'Lake Michigan'
p12 = self.gen_pkcs12(server_cert_pem, server_key_pem, root_cert_pem)
dumped_p12 = p12.export(maciter=-1, passphrase=passwd, iter=2)
try:
recovered_p12 = load_pkcs12(dumped_p12, passwd)
# The person who generated this PCKS12 should be flogged,
# or better yet we should have a means to determine
# whether a PCKS12 had a MAC that was verified.
# Anyway, libopenssl chooses to allow it, so the
# pyopenssl binding does as well.
self.assertTrue(isinstance(recovered_p12, PKCS12))
except Error:
# Failing here with an exception is preferred as some openssl
# versions do.
pass
def verify(self, message, signature):
"""Verifies a message against a signature.
Args:
message: string or bytes, The message to verify. If string, will be
encoded to bytes as utf-8.
signature: string or bytes, The signature on the message. If string,
will be encoded to bytes as utf-8.
Returns:
True if message was signed by the private key associated with the
public key that this object was constructed with.
"""
message = _to_bytes(message, encoding='utf-8')
signature = _to_bytes(signature, encoding='utf-8')
try:
crypto.verify(self._pubkey, signature, message, 'sha256')
return True
except crypto.Error:
return False
def from_string(key_pem, is_x509_cert):
"""Construct a Verified instance from a string.
Args:
key_pem: string, public key in PEM format.
is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
is expected to be an RSA key in PEM format.
Returns:
Verifier instance.
Raises:
OpenSSL.crypto.Error: if the key_pem can't be parsed.
"""
key_pem = _to_bytes(key_pem)
if is_x509_cert:
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, key_pem)
else:
pubkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
return OpenSSLVerifier(pubkey)
def from_string(key, password=b'notasecret'):
"""Construct a Signer instance from a string.
Args:
key: string, private key in PKCS12 or PEM format.
password: string, password for the private key file.
Returns:
Signer instance.
Raises:
OpenSSL.crypto.Error if the key can't be parsed.
"""
key = _to_bytes(key)
parsed_pem_key = _parse_pem_key(key)
if parsed_pem_key:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, parsed_pem_key)
else:
password = _to_bytes(password, encoding='utf-8')
pkey = crypto.load_pkcs12(key, password).get_privatekey()
return OpenSSLSigner(pkey)