def _extract_x509_certificates(x509_certificates):
    """Convert a ``kid -> certificate`` mapping into a list of RSA key objects.

    A value that starts with ``jwk.PREFIX`` is treated as a PEM certificate:
    it is converted to DER and handed to ``jwk.der2rsa``. Any other value is
    passed to ``jwk.import_rsa_key`` unchanged. Each resulting key is wrapped
    in a ``jwk.RSAKey`` whose ``kid`` is set to the mapping key.

    Only the public key is pulled out of each certificate. No signature,
    issuer or validity-period check is visible in this function, so the
    returned keys are only as trustworthy as the source of the mapping.

    Raises:
        UnauthenticatedException: if a certificate cannot be decoded/imported.
    """
    rsa_keys = []
    for kid, certificate in x509_certificates.iteritems():
        try:
            if not certificate.startswith(jwk.PREFIX):
                imported = jwk.import_rsa_key(certificate)
            else:
                # PEM-encoded certificate: drop the armor, then parse the DER.
                imported = jwk.der2rsa(ssl.PEM_cert_to_DER_cert(certificate))
        except Exception as exception:
            # Any decoding failure is reported as an authentication failure.
            raise UnauthenticatedException(u"Cannot load X.509 certificate",
                                           exception)
        loaded = jwk.RSAKey().load_key(imported)
        loaded.kid = kid
        rsa_keys.append(loaded)
    return rsa_keys
python类PEM_cert_to_DER_cert()的实例源码
def _loadPublicKey(fname):
    """Read a PEM certificate file and return its serial number and RSA key.

    The result is a dynamically created ``Certificate`` *class* (not an
    instance) with two class attributes: ``serial`` (lower-case hex, encoded
    to bytes) and ``key`` (the imported RSA public key).

    The certificate is only decoded, never verified: no signature, chain or
    validity check happens here, so the file must come from a trusted place.
    """
    with open(fname, 'r') as f:
        pem_text = f.read()
    # Strips the PEM armor and base64-decodes; does not parse the certificate.
    der_bytes = ssl.PEM_cert_to_DER_cert(pem_text)

    outer = Crypto.Util.asn1.DerSequence()
    outer.decode(der_bytes)
    tbs = Crypto.Util.asn1.DerSequence()
    tbs.decode(outer[0])

    # Positional heuristic (the author calls it shaky): first try the layout
    # where the key sits at index 6 and the serial at index 1; if index 6 does
    # not exist, fall back to serial at index 0 and key at index 5.
    # NOTE(review): presumably the two layouts are v3 (explicit version field)
    # and v1 (version omitted) -- confirm against the certificates in use.
    try:
        spki = tbs[6]
        serial = "%x" % tbs[1]
    except IndexError:
        serial = "%x" % tbs[0]
        spki = tbs[5]

    rsa_key = Crypto.PublicKey.RSA.importKey(spki)
    attributes = {'serial': serial.encode(), 'key': rsa_key}
    return type('Certificate', (object,), attributes)
def get_pubkey(pem):
    """Return the subjectPublicKeyInfo element of a PEM-encoded X.509 cert.

    The value returned is element 6 of the decoded tbsCertificate sequence,
    as produced by ``DerSequence``. The certificate is not verified in any
    way (signature, issuer, validity), only decoded.
    """
    # PEM -> DER: strips the armor and base64-decodes, nothing more.
    der_bytes = ssl.PEM_cert_to_DER_cert(pem)

    # Certificate ::= SEQUENCE { tbsCertificate, ... } (see RFC3280)
    outer = DerSequence()
    outer.decode(der_bytes)
    tbs = DerSequence()
    tbs.decode(outer[0])

    # NOTE(review): the fixed index 6 matches the layout with an explicit
    # version field; a certificate that omits it would shift the fields and
    # is expected to raise IndexError or yield another element -- confirm.
    return tbs[6]
def load_PEMfile(self, certificate_path):
    """Read a PEM certificate from disk and keep its DER bytes on the instance.

    Calls ``_init_data()`` first, records *certificate_path* in ``_filepath``
    and stores the decoded bytes in ``_data``.

    ``ssl.PEM_cert_to_DER_cert`` only strips the PEM armor and base64-decodes
    the body (ValueError on a bad header or footer). It neither parses nor
    verifies the certificate, so ``_data`` is still unvalidated input for
    whatever consumes it next.
    """
    self._init_data()
    self._filepath = certificate_path
    with open(self._filepath, "r") as pem_file:
        # PEM text -> raw DER bytes.
        self._data = ssl.PEM_cert_to_DER_cert(pem_file.read())
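def init():
    """Resolve the 'an2linux' paths, create its directories and check the key pair.

    The configuration directory is ``$XDG_CONFIG_HOME/an2linux``, falling back
    to ``~/.config/an2linux`` when the variable is unset or empty. If the
    certificate or private key file is missing, or the existing pair cannot
    be loaded, a new pair is generated over the old files.

    Returns:
        tuple: ``(CONF_FILE_PATH, CERTIFICATE_PATH, RSA_PRIVATE_KEY_PATH,
        AUTHORIZED_CERTS_PATH, DHPARAM_PATH, TMP_DIR_PATH)``.
    """
    # An unset and an empty XDG_CONFIG_HOME are both falsy, so one `or` covers
    # the two cases the original tested separately.
    XDG_CONFIG_HOME = (os.environ.get('XDG_CONFIG_HOME')
                       or os.path.join(os.path.expanduser('~'), '.config'))

    CONF_DIR_PATH = os.path.join(XDG_CONFIG_HOME, 'an2linux')
    CONF_FILE_PATH = os.path.join(CONF_DIR_PATH, 'config')
    CERTIFICATE_PATH = os.path.join(CONF_DIR_PATH, 'certificate.pem')
    RSA_PRIVATE_KEY_PATH = os.path.join(CONF_DIR_PATH, 'rsakey.pem')
    AUTHORIZED_CERTS_PATH = os.path.join(CONF_DIR_PATH, 'authorized_certs')
    DHPARAM_PATH = os.path.join(CONF_DIR_PATH, 'dhparam.pem')
    TMP_DIR_PATH = os.path.join(tempfile.gettempdir(), 'an2linux')

    # exist_ok removes the window between an exists() check and makedirs().
    # The directory holds the RSA private key, so a newly created one is
    # owner-only. An already existing directory keeps whatever mode it has.
    os.makedirs(CONF_DIR_PATH, mode=0o700, exist_ok=True)

    # NOTE(review): this directory has a fixed name inside the shared temp
    # directory and an existing one is accepted as-is, with no ownership or
    # symlink check. Consider a per-user location or tempfile.mkdtemp().
    os.makedirs(TMP_DIR_PATH, exist_ok=True)

    if not os.path.isfile(CERTIFICATE_PATH) or not os.path.isfile(RSA_PRIVATE_KEY_PATH):
        generate_server_private_key_and_certificate(CERTIFICATE_PATH, RSA_PRIVATE_KEY_PATH)
    else:
        # Check that the stored pair is usable: load_cert_chain rejects a key
        # that does not match the certificate, and PEM_cert_to_DER_cert
        # rejects a certificate file without proper PEM armor.
        try:
            ssl.SSLContext(protocol=ssl.PROTOCOL_TLSv1_2).load_cert_chain(CERTIFICATE_PATH,
                                                                          RSA_PRIVATE_KEY_PATH)
            # Context manager so the file handle is closed even on ValueError.
            with open(CERTIFICATE_PATH, 'r') as certificate_file:
                ssl.PEM_cert_to_DER_cert(certificate_file.read())
        except (ssl.SSLError, ValueError) as e:
            # Regenerating replaces the server identity; anything that stored
            # the old certificate will no longer match it.
            print_with_timestamp('Something went wrong trying to load your private key and certificate: {}'.format(e))
            print_with_timestamp('Will generate new key overwriting old key and certificate')
            generate_server_private_key_and_certificate(CERTIFICATE_PATH, RSA_PRIVATE_KEY_PATH)

    return CONF_FILE_PATH, CERTIFICATE_PATH, RSA_PRIVATE_KEY_PATH, AUTHORIZED_CERTS_PATH, DHPARAM_PATH, TMP_DIR_PATH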
def load_PEMfile(self, certificate_path):
    """Load a PEM certificate file and store its DER encoding in ``_data``.

    The instance is reset with ``_init_data()`` and *certificate_path* is
    remembered in ``_filepath`` before the file is read.

    The conversion is a pure re-encoding: ``ssl.PEM_cert_to_DER_cert`` checks
    the PEM header and footer and base64-decodes the body, raising ValueError
    when the armor is wrong. The certificate itself is not parsed or verified
    at this point.
    """
    self._init_data()
    self._filepath = certificate_path
    with open(self._filepath, "r") as pem_file:
        pem_text = pem_file.read()
    der_bytes = ssl.PEM_cert_to_DER_cert(pem_text)
    self._data = der_bytes