def get_pkcs7_certificates(bundle):
    """Extract the certificates carried by a PKCS7 bundle.

    The certificate stack is read from the bundle's underlying ``_pkcs7``
    structure: the ``sign`` member for signed bundles, the
    ``signed_and_enveloped`` member for signed-and-enveloped bundles.
    For any other bundle type the stack pointer is left as NULL.

    Args:
        bundle: Object exposing ``type_is_signed()``,
            ``type_is_signedAndEnveloped()`` and a ``_pkcs7`` attribute
            (presumably an ``OpenSSL.crypto.PKCS7`` -- confirm with callers).

    Returns:
        A tuple of ``OpenSSL.crypto.X509`` objects, one per stack entry;
        an empty tuple when the stack yields no entries.
    """
    # Imported locally because these are pyOpenSSL private internals.
    from OpenSSL._util import ffi as _ffi, lib as _lib
    from OpenSSL.crypto import X509

    if bundle.type_is_signed():
        stack = bundle._pkcs7.d.sign.cert
    elif bundle.type_is_signedAndEnveloped():
        stack = bundle._pkcs7.d.signed_and_enveloped.cert
    else:
        stack = _ffi.NULL

    def _wrap(index):
        # Duplicate the entry so the Python object owns its own X509, and
        # register X509_free so the copy is released on garbage collection.
        wrapped = X509.__new__(X509)
        duplicate = _lib.X509_dup(_lib.sk_X509_value(stack, index))
        wrapped._x509 = _ffi.gc(duplicate, _lib.X509_free)
        return wrapped

    # NOTE(review): for a NULL stack this relies on sk_X509_num reporting
    # no entries (range() of a non-positive count is empty) -- confirm.
    return tuple(_wrap(index) for index in range(_lib.sk_X509_num(stack)))
# python类crypto()的实例源码 (Example source code using Python's crypto() class)
def display_item_info(item, filename, print_filename=False):
    """Print information about a loaded crypto object, chosen by its type.

    Args:
        item: The object to describe. Private keys and public keys
            (RSA, DSA, EC), ``OpenSSL.crypto.PKCS12``,
            ``OpenSSL.crypto.PKCS7`` and ``Certificate`` instances are
            each passed to their matching ``display_*`` helper.
        filename: Name shown in the banner line.
        print_filename: The banner is printed only when this is exactly
            ``True`` (other truthy values do not trigger it).

    Items of any other type are ignored without output.
    """
    if print_filename is True:
        print("#######[ {} ]#######".format(filename))

    # The checks run in a fixed order; the first matching type wins.
    if isinstance(item, (rsa.RSAPrivateKey,
                         dsa.DSAPrivateKey,
                         ec.EllipticCurvePrivateKey)):
        display_private_key(item)
        return
    if isinstance(item, (rsa.RSAPublicKey,
                         dsa.DSAPublicKey,
                         ec.EllipticCurvePublicKey)):
        display_public_key(item)
        return
    if isinstance(item, OpenSSL.crypto.PKCS12):
        display_pkcs12(item)
        return
    if isinstance(item, OpenSSL.crypto.PKCS7):
        display_pkcs7(item)
        return
    if isinstance(item, Certificate):
        display_x509_cert(item)
def test_is_certificate_valid_false_less_than_24h(vault_ca_obj, monkeypatch):
    """A certificate with less than 24 hours left is reported as not valid."""

    class FrozenClock(datetime.datetime):
        # now() is pinned to roughly 23 hours before the mocked notAfter.
        @classmethod
        def now(cls):
            return datetime.datetime(2000, 5, 9, 15, 16, 43, 100000)

    class ExpiringX509(OpenSSL.crypto.X509):
        # Always reports an expiry of 2000-05-10 14:16:43Z.
        def get_notAfter(self):
            return b'20000510141643Z'

    # NOTE(review): patching vault_ca.datetime with a class assumes
    # vault_ca binds the datetime *class* under that name -- confirm.
    monkeypatch.setattr(vault_ca, 'datetime', FrozenClock)
    monkeypatch.setattr(OpenSSL.crypto, 'X509', ExpiringX509)

    fixture_path = 'tests/fixtures/acomponent-test.test.org.pem.ok'
    assert not vault_ca_obj._is_certificate_valid(fixture_path)
def test_is_certificate_valid_true_more_than_24h(vault_ca_obj, monkeypatch):
    """A certificate with more than 24 hours left is reported as valid."""

    class FrozenClock(datetime.datetime):
        # now() is pinned to roughly 25 hours before the mocked notAfter.
        @classmethod
        def now(cls):
            return datetime.datetime(2000, 5, 9, 13, 16, 43, 100000)

    class ExpiringX509(OpenSSL.crypto.X509):
        # Always reports an expiry of 2000-05-10 14:16:43Z.
        def get_notAfter(self):
            return b'20000510141643Z'

    # NOTE(review): patching vault_ca.datetime with a class assumes
    # vault_ca binds the datetime *class* under that name -- confirm.
    monkeypatch.setattr(vault_ca, 'datetime', FrozenClock)
    monkeypatch.setattr(OpenSSL.crypto, 'X509', ExpiringX509)

    fixture_path = 'tests/fixtures/acomponent-test.test.org.pem.ok'
    assert vault_ca_obj._is_certificate_valid(fixture_path)
def sign_cert_request(ca_cert, ca_key, cert_request, serial):
    """Issue a certificate for a PEM certificate request, signed by a CA.

    The new certificate copies the subject and public key of the request,
    names ``ca_cert``'s subject as issuer, is valid from now for
    10 * 365 days, and is signed with ``ca_key`` using SHA-256.

    Args:
        ca_cert: CA certificate whose subject becomes the issuer.
        ca_key: CA private key used to sign the new certificate.
        cert_request: Certificate signing request in PEM form.
        serial: Serial number for the new certificate. ``None`` falls
            back to 1, the value that used to be hard-coded here.

    Returns:
        The signed certificate as a PEM dump.
    """
    req = OpenSSL.crypto.load_certificate_request(
        OpenSSL.crypto.FILETYPE_PEM, cert_request)

    cert = OpenSSL.crypto.X509()
    cert.set_subject(req.get_subject())
    # Honour the caller-supplied serial: a fixed serial of 1 would give
    # every certificate from the same CA an identical issuer/serial pair.
    cert.set_serial_number(1 if serial is None else int(serial))
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
    cert.set_issuer(ca_cert.get_subject())
    cert.set_pubkey(req.get_pubkey())
    cert.sign(ca_key, "sha256")

    return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
def openssl_to_cryptography_cert(cert):
    """Convert a pyOpenSSL certificate into a ``cryptography`` certificate.

    The certificate is dumped to ASN.1 (DER) bytes with pyOpenSSL and
    those bytes are parsed again with ``load_der_x509_certificate``.

    Args:
        cert: Certificate accepted by ``OpenSSL.crypto.dump_certificate``.

    Returns:
        The object produced by ``load_der_x509_certificate``.
    """
    crypto_backend = cryptography.hazmat.backends.default_backend()
    der_bytes = OpenSSL.crypto.dump_certificate(
        OpenSSL.crypto.FILETYPE_ASN1,
        cert,
    )
    return load_der_x509_certificate(der_bytes, crypto_backend)
# This function was modified from this PR to the pyopenssl project
# https://github.com/pyca/pyopenssl/pull/367/files
def load_binary_item(buffer):
    """Parse binary (DER / PKCS12) data by trying each known format in turn.

    Formats are attempted in this order, and the first loader that
    succeeds determines the result:

    1. PKCS#1/PKCS#8 private key in DER (no password)
    2. PKCS#1/PKCS#8 public key in DER
    3. X509 certificate in DER
    4. PKCS7 certificate bundle in DER
    5. PKCS12 bundle (no passphrase)

    Args:
        buffer: The raw bytes to parse.

    Returns:
        Whatever object the first successful loader returns.

    Raises:
        Exception: If none of the loaders accepts the data.
    """
    backend = cryptography.hazmat.backends.default_backend()

    # Each loader is wrapped in a lambda so that attribute lookups happen
    # inside the try block below, exactly like the calls themselves.
    loaders = (
        # PKCS#1/PKCS#8 private in DER
        lambda: cryptography.hazmat.primitives.serialization.load_der_private_key(
            buffer, password=None, backend=backend),
        # PKCS#1/PKCS#8 public in DER
        lambda: cryptography.hazmat.primitives.serialization.load_der_public_key(
            buffer, backend),
        # X509 certificate in DER
        lambda: load_der_x509_certificate(buffer, backend),
        # PKCS7 certificate bundle in DER
        lambda: OpenSSL.crypto.load_pkcs7_data(
            OpenSSL.crypto.FILETYPE_ASN1, buffer),
        # PKCS12 bundle (binary)
        lambda: OpenSSL.crypto.load_pkcs12(buffer),
    )

    for loader in loaders:
        try:
            return loader()
        except Exception:
            # A failed parse just means "not this format"; move on to the
            # next one. Unlike a bare ``except:``, this lets
            # KeyboardInterrupt and SystemExit propagate.
            continue

    raise Exception("Unknown file type for: {}".format(buffer))
def load_pem_file_item(buffer):
    """Parse PEM data by trying each known format in turn.

    Formats are attempted in this order, and the first loader that
    succeeds determines the result:

    1. PKCS#1/PKCS#8 private key in PEM (no password)
    2. PKCS#1/PKCS#8 public key in PEM
    3. X509 certificate in PEM
    4. PKCS7 certificate bundle in PEM

    Args:
        buffer: The PEM data to parse.

    Returns:
        Whatever object the first successful loader returns.

    Raises:
        Exception: If none of the loaders accepts the data.
    """
    backend = cryptography.hazmat.backends.default_backend()

    # Each loader is wrapped in a lambda so that attribute lookups happen
    # inside the try block below, exactly like the calls themselves.
    loaders = (
        # PKCS#1/PKCS#8 private in PEM
        lambda: cryptography.hazmat.primitives.serialization.load_pem_private_key(
            buffer, password=None, backend=backend),
        # PKCS#1/PKCS#8 public in PEM
        lambda: cryptography.hazmat.primitives.serialization.load_pem_public_key(
            buffer, backend),
        # X509 certificate in PEM
        lambda: load_pem_x509_certificate(buffer, backend),
        # PKCS7 certificate bundle in PEM
        lambda: OpenSSL.crypto.load_pkcs7_data(
            OpenSSL.crypto.FILETYPE_PEM, buffer),
    )

    for loader in loaders:
        try:
            return loader()
        except Exception:
            # A failed parse just means "not this format"; move on to the
            # next one. Unlike a bare ``except:``, this lets
            # KeyboardInterrupt and SystemExit propagate.
            continue

    raise Exception("Unknown file type for: {}".format(buffer))
def test_is_certificate_valid_true(vault_ca_obj, monkeypatch):
    """A certificate whose expiry lies well in the future is reported valid.

    The clock is frozen because the mocked notAfter is a fixed date
    (2022-05-10); compared against the real current time the test would
    start failing once that date has passed.
    """
    class MockX509(OpenSSL.crypto.X509):
        def get_notAfter(self):
            return b'20220510141643Z'

    class MockDatetime(datetime.datetime):
        @classmethod
        def now(cls):
            # Exactly one year before the mocked notAfter.
            return datetime.datetime(2021, 5, 10, 14, 16, 43)

    monkeypatch.setattr(OpenSSL.crypto, 'X509', MockX509)
    # Same clock-patching approach as the *_24h tests for this method.
    monkeypatch.setattr(vault_ca, 'datetime', MockDatetime)
    assert vault_ca_obj._is_certificate_valid('tests/fixtures/acomponent-test.test.org.pem.ok')
def test_is_certificate_valid_false(vault_ca_obj, monkeypatch):
    """A certificate whose notAfter is in the year 2000 is reported not valid."""

    class ExpiredX509(OpenSSL.crypto.X509):
        # Always reports an expiry of 2000-05-10 14:16:43Z.
        def get_notAfter(self):
            return b'20000510141643Z'

    monkeypatch.setattr(OpenSSL.crypto, 'X509', ExpiredX509)

    fixture_path = 'tests/fixtures/acomponent-test.test.org.pem.ok'
    assert not vault_ca_obj._is_certificate_valid(fixture_path)
def gen_cert_request(cname):
    """Generate a new RSA key and a certificate signing request for it.

    A 2048-bit RSA key is created, the request's subject CN is set to
    ``cname``, and the request is signed with that same key using SHA-512.

    Args:
        cname: Value assigned to the subject's CN field.

    Returns:
        A ``(request, key)`` tuple: the PEM dump of the signing request
        and the PEM dump of the private key (dumped without any cipher
        or passphrase argument).
    """
    crypto = OpenSSL.crypto

    key_pair = crypto.PKey()
    key_pair.generate_key(crypto.TYPE_RSA, 2048)

    csr = crypto.X509Req()
    subject = csr.get_subject()
    subject.CN = cname
    csr.set_pubkey(key_pair)
    csr.sign(key_pair, 'sha512')

    request_pem = crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr)
    private_key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, key_pair)
    return request_pem, private_key_pem
def test_pkcs12_format(self):
    """
    Build a PKCS12 container from a freshly generated EC key and certificate.

    The test makes no assertions; it passes as long as every conversion
    and the final export complete without raising.
    """
    logging.debug("----- PKCS12 Test Start -----")

    # Generate an EC key pair on the SECP256K1 curve.
    pri_key = ec.generate_private_key(ec.SECP256K1(), default_backend())
    pub_key = pri_key.public_key()
    logging.debug("Key_Type : %s", type(pri_key))

    # Create a certificate for the public key, issued with the same private key.
    cert = self._generate_cert(pub_key=pub_key, issuer_key=pri_key, subject_name="test")

    # Serialize both the certificate and the key as DER bytes.
    cert_der = cert.public_bytes(encoding=serialization.Encoding.DER)
    key_der = pri_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )

    # Load the DER bytes into pyOpenSSL certificate / key objects.
    crypto = OpenSSL.crypto
    openssl_cert = crypto.load_certificate(type=crypto.FILETYPE_ASN1, buffer=cert_der)
    openssl_key = crypto.load_privatekey(
        type=crypto.FILETYPE_ASN1,
        buffer=key_der,
        passphrase=None,
    )
    logging.debug("Key_Type : %s", type(openssl_key))

    # Put the key and certificate into a PKCS12 container and export it.
    container = crypto.PKCS12()
    container.set_privatekey(openssl_key)
    container.set_certificate(openssl_cert)
    exported = container.export()

    # Log the exported container as base64 text.
    logging.debug("%s", base64.b64encode(exported, altchars=None))
def cert_collect_pkcs12(request, token):
    """Return the certificate for ``token`` as a PKCS12 download.

    Looks up the certificate request matching the token (taken from the
    browser), checks its issue/approval state and answers accordingly:

    * 404 "Not found" when no request matches the token
    * 410 "Cert already issued" when the request is marked issued
    * 403 "Pending approval" when the request is not approved
    * 201 with a PKCS12 attachment (``client_cert.p12``) otherwise

    Args:
        request: The incoming HTTP request (not otherwise used here).
        token: Token identifying the certificate request.

    Returns:
        An ``HttpResponse`` as described above.
    """
    certs = Cert_request.objects.filter(token=token)
    try:
        cert_request = certs[0]
    except IndexError:
        # Only "no matching row" means 404; any other failure (for
        # example a database error) now propagates instead of being
        # reported as "Not found".
        return HttpResponse(status=404, content="Not found")

    logger.info("retrieved cert %s", cert_request.common_name)

    # The explicit comparisons against True/False are kept on purpose so
    # that non-boolean values (e.g. None) take the same branch as before.
    if cert_request.issued == True:  # noqa: E712
        logger.info("Certificate already issued %s", cert_request.common_name)
        response = HttpResponse(status=410, content="Cert already issued")
    elif cert_request.approved == False:  # noqa: E712
        logger.debug("Certificate pending approval %s", cert_request.common_name)
        response = HttpResponse(status=403, content="Pending approval")
    else:
        logger.info("Generating cert %s", cert_request.common_name)
        # NOTE(review): both the certificate and the private key are parsed
        # from the same data, so generateNewX509 presumably returns PEM
        # text containing both -- confirm.
        datastream = generateNewX509(cert_request)
        crypto = OpenSSL.crypto
        cert = crypto.load_certificate(crypto.FILETYPE_PEM, datastream)
        key = crypto.load_privatekey(crypto.FILETYPE_PEM, datastream)

        p12 = crypto.PKCS12()
        p12.set_privatekey(key)
        p12.set_certificate(cert)
        p12_data = p12.export()

        response = HttpResponse(p12_data, content_type='application/x-pkcs12', status=201)
        response['Content-Disposition'] = 'attachment; filename="client_cert.p12"'

        # TODO: marking the request as issued was deliberately disabled by
        # the original author; until it is re-enabled the same certificate
        # request can be collected repeatedly.
        # cert_request.issued = True
        cert_request.save()

    return response