def load_certificate(self, cert_name):
# Load the raw certificate file data.
path = os.path.join(self.cert_path, cert_name)
with open(path, 'rb') as cert_file:
data = cert_file.read()
# Convert the raw certificate data into a certificate object, first
# as a PEM-encoded certificate and, if that fails, then as a
# DER-encoded certificate. If both fail, the certificate cannot be
# loaded.
try:
return x509.load_pem_x509_certificate(data, default_backend())
except Exception:
try:
return x509.load_der_x509_certificate(data, default_backend())
except Exception:
raise exception.SignatureVerificationError(
"Failed to load certificate: %s" % path
)
python类load_der_x509_certificate()的实例源码
def verify_peer_token(self, peer_token, peer, peer_type):
token_time = peer_token[:16]
token_sign = peer_token[16:]
current_date = int(datetime.datetime.now().timestamp() * 1000)
token_date = int(token_time, 16)
if current_date > token_date:
return False
date = bytes.fromhex(token_time)
peer_info = b''.join([peer.peer_id.encode('utf-8'),
peer.target.encode('utf-8'),
peer.group_id.encode('utf-8')]) + bytes([peer_type])
peer_cert = x509.load_der_x509_certificate(bytes.fromhex(peer.cert), default_backend())
peer_pub = peer_cert.public_key().public_bytes(encoding=serialization.Encoding.DER,
format=PublicFormat.SubjectPublicKeyInfo)
token_bytes = peer_info + date + peer_pub
logging.debug("TBS Token(V) : %s", token_bytes.hex())
signature = bytes.fromhex(token_sign)
return self.verify_data_with_cert(cert=self.__ca_cert, data=token_bytes, signature=signature)
def load_certificate(cert_file=None, cert_bytes=None):
if cert_file:
with open(cert_file, 'rb') as f:
data = f.read()
else:
data = cert_bytes
if '-----BEGIN'.encode() in data:
cert = x509.load_pem_x509_certificate(
data=data,
backend=default_backend()
)
else:
cert = x509.load_der_x509_certificate(
data=data,
backend=default_backend()
)
return cert
def get_certificate(self, filename):
"""
Return a certificate object by giving the name in the apk file
"""
pkcs7message = self.get_file(filename)
message, _ = decode(pkcs7message)
cert = encode(message[1][3])
# Remove the first identifier·
# byte 0 == identifier, skip
# byte 1 == length. If byte1 & 0x80 > 1, we have long format
# The length of to read bytes is then coded
# in byte1 & 0x7F
cert = cert[2 + (cert[1] & 0x7F) if cert[1] & 0x80 > 1 else 2:]
certificate = x509.load_der_x509_certificate(cert, default_backend())
return certificate
def get_certificate_chain(self, certificate):
cert = requests.get(certificate)
if cert.status_code != 200:
raise ClientError('Certificate fetch failed: {}'.format(
cert.json()['detail']))
if 'up' not in cert.links:
return
chain_url = urllib.parse.urljoin(certificate, cert.links['up']['url'])
chain = requests.get(chain_url)
if chain.status_code != 200:
raise ClientError('Certificate chain fetch failed: {}'.format(
cert.json()['detail']))
return x509.load_der_x509_certificate(chain.content, backend)
def load_der_certificate(data):
"""
Loads a DER X.509 certificate.
"""
return x509.load_der_x509_certificate(data, default_backend())
def _ValidatePubkeyGeneric(self, signing_cert, digest_alg, payload,
enc_digest):
cert = x509.load_der_x509_certificate(der_encoder.encode(signing_cert), default_backend())
pubkey = cert.public_key()
if isinstance(pubkey, RSAPublicKey):
verifier = pubkey.verifier(enc_digest, padding.PKCS1v15(), cert.signature_hash_algorithm)
else:
verifier = pubkey.verifier(enc_digest, cert.signature_hash_algorithm)
verifier.update(payload)
try:
verifier.verify()
return True
except:
return False
def ValidateCertificateSignature(self, signed_cert, signing_cert):
"""Given a cert signed by another cert, validates the signature."""
# First the naive way -- note this does not check expiry / use etc.
signed = x509.load_der_x509_certificate(der_encoder.encode(signed_cert), default_backend())
signing = x509.load_der_x509_certificate(der_encoder.encode(signing_cert), default_backend())
verifier = signing.public_key().verifier(signed.signature, padding.PKCS1v15(), signed.signature_hash_algorithm)
verifier.update(signed.tbs_certificate_bytes)
try:
verifier.verify()
except Exception as e:
raise Asn1Error('1: Validation of cert signature failed: {}'.format(e))
def convert_x509cert(self, cert_bytes):
"""???? ???? x509 ???? ??
:param cert_bytes: ??? bytes
:return: x509 ???
"""
return x509.load_der_x509_certificate(cert_bytes, default_backend())
def verify_data_with_dercert(self, cert_der, data: bytes, signature: bytes) -> bool:
"""
?? ? ??? ??
:param cert_der: ???(der bytes)
:param data: ?? ??
:param signature: ??
:return: ?? ??(True/False)
"""
cert = x509.load_der_x509_certificate(cert_der, default_backend())
return self.verify_data_with_cert(cert=cert, data=data, signature=signature)
def verify_certificate_der(self, der_cert):
"""
??? ??
:param der_cert: DER ??? ?????
:return: ?? ??
"""
cert = x509.load_der_x509_certificate(der_cert, default_backend())
return self.verify_certificate(cert)
def generate_peer_token(self, peer_sign, peer_cert, peer_id, peer_target,
group_id, peer_type, rand_key, token_interval):
peer_info = b''.join([peer_id.encode('utf-8'),
peer_target.encode('utf-8'),
group_id.encode('utf-8')]) + bytes([peer_type])
data = peer_info + rand_key
cert = x509.load_der_x509_certificate(peer_cert, default_backend())
if self.verify_data_with_cert(cert=cert, data=data, signature=peer_sign):
time = datetime.datetime.now() + datetime.timedelta(minutes=token_interval)
date = int(time.timestamp() * 1000).to_bytes(length=8, byteorder='big')
peer_pub = cert.public_key().public_bytes(encoding=serialization.Encoding.DER,
format=PublicFormat.SubjectPublicKeyInfo)
# token_bytes = peer_id || peer_target || group_id || peer_type || peer_pub
token_bytes = peer_info + date + peer_pub
logging.debug("TBS Token[%s]", token_bytes.hex())
# token = date || CA_Sign(token_bytes)
signed_token = self.sign_data(token_bytes)
token = b''.join([date, signed_token]).hex()
return token
else:
logging.debug('?? ?? ?? ?? ??? ?? ??')
return None
def _ValidatePubkeyGeneric(self, signing_cert, digest_alg, payload,
enc_digest):
cert = x509.load_der_x509_certificate(der_encoder.encode(signing_cert), default_backend())
pubkey = cert.public_key()
if isinstance(pubkey, RSAPublicKey):
verifier = pubkey.verifier(enc_digest, padding.PKCS1v15(), cert.signature_hash_algorithm)
else:
verifier = pubkey.verifier(enc_digest, cert.signature_hash_algorithm)
verifier.update(payload)
try:
verifier.verify()
return True
except:
return False
def ValidateCertificateSignature(self, signed_cert, signing_cert):
"""Given a cert signed by another cert, validates the signature."""
# First the naive way -- note this does not check expiry / use etc.
signed = x509.load_der_x509_certificate(der_encoder.encode(signed_cert), default_backend())
signing = x509.load_der_x509_certificate(der_encoder.encode(signing_cert), default_backend())
verifier = signing.public_key().verifier(signed.signature, padding.PKCS1v15(), signed.signature_hash_algorithm)
verifier.update(signed.tbs_certificate_bytes)
try:
verifier.verify()
except Exception as e:
raise Asn1Error('1: Validation of cert signature failed: {}'.format(e))
signature_utils.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def get_certificate(context, signature_certificate_uuid):
"""Create the certificate object from the retrieved certificate data.
:param context: the user context for authentication
:param signature_certificate_uuid: the uuid to use to retrieve the
certificate
:returns: the certificate cryptography object
:raises: SignatureVerificationError if the retrieval fails or the format
is invalid
"""
keymgr_api = key_manager.API()
try:
# The certificate retrieved here is a castellan certificate object
cert = keymgr_api.get(context, signature_certificate_uuid)
except KeyManagerError as e:
# The problem encountered may be backend-specific, since castellan
# can use different backends. Rather than importing all possible
# backends here, the generic "Exception" is used.
msg = (_LE("Unable to retrieve certificate with ID %(id)s: %(e)s")
% {'id': signature_certificate_uuid,
'e': encodeutils.exception_to_unicode(e)})
LOG.error(msg)
raise exception.SignatureVerificationError(
reason=_('Unable to retrieve certificate with ID: %s')
% signature_certificate_uuid)
if cert.format not in CERTIFICATE_FORMATS:
raise exception.SignatureVerificationError(
reason=_('Invalid certificate format: %s') % cert.format)
if cert.format == X_509:
# castellan always encodes certificates in DER format
cert_data = cert.get_encoded()
certificate = x509.load_der_x509_certificate(cert_data,
default_backend())
# verify the certificate
verify_certificate(certificate)
return certificate
def load_der_certificate(data):
"""
Loads a DER X.509 certificate.
"""
return x509.load_der_x509_certificate(data, default_backend())
def get_certificate(self, certificate):
cert = requests.get(certificate)
if cert.status_code != 200:
raise ClientError('Certificate fetch failed: {}'.format(
cert.json()['detail']))
return x509.load_der_x509_certificate(cert.content, backend)
def get_certificate(context, signature_certificate_uuid):
"""Create the certificate object from the retrieved certificate data.
:param context: the user context for authentication
:param signature_certificate_uuid: the uuid to use to retrieve the
certificate
:returns: the certificate cryptography object
:raises: SignatureVerificationError if the retrieval fails or the format
is invalid
"""
keymgr_api = key_manager.API()
try:
# The certificate retrieved here is a castellan certificate object
cert = keymgr_api.get(context, signature_certificate_uuid)
except ManagedObjectNotFoundError as e:
raise exception.SignatureVerificationError(
reason=_('Certificate not found with ID: %s')
% signature_certificate_uuid)
except KeyManagerError as e:
# The problem encountered may be backend-specific, since castellan
# can use different backends. Rather than importing all possible
# backends here, the generic "Exception" is used.
msg = (_LE("Unable to retrieve certificate with ID %(id)s: %(e)s")
% {'id': signature_certificate_uuid,
'e': encodeutils.exception_to_unicode(e)})
LOG.error(msg)
raise exception.SignatureVerificationError(
reason=_('Unable to retrieve certificate with ID: %s')
% signature_certificate_uuid)
if cert.format not in CERTIFICATE_FORMATS:
raise exception.SignatureVerificationError(
reason=_('Invalid certificate format: %s') % cert.format)
if cert.format == X_509:
# castellan always encodes certificates in DER format
cert_data = cert.get_encoded()
certificate = x509.load_der_x509_certificate(cert_data,
default_backend())
return certificate
def _get_client_identity(self):
certificate_data = self._connection.getpeercert(binary_form=True)
try:
certificate = x509.load_der_x509_certificate(
certificate_data,
backends.default_backend()
)
except Exception:
# This should never get raised "in theory," as the ssl socket
# should fail to connect non-TLS connections before the session
# gets created. This is a failsafe in case that protection fails.
raise exceptions.PermissionDenied(
"Failure loading the client certificate from the session "
"connection. Could not retrieve client identity."
)
try:
extended_key_usage = certificate.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.EXTENDED_KEY_USAGE
).value
except x509.ExtensionNotFound:
raise exceptions.PermissionDenied(
"The extended key usage extension is missing from the client "
"certificate. Session client identity unavailable."
)
if x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH in extended_key_usage:
client_identities = certificate.subject.get_attributes_for_oid(
x509.oid.NameOID.COMMON_NAME
)
if len(client_identities) > 0:
if len(client_identities) > 1:
self._logger.warning(
"Multiple client identities found. Using the first "
"one processed."
)
client_identity = client_identities[0].value
self._logger.info(
"Session client identity: {0}".format(client_identity)
)
return client_identity
else:
raise exceptions.PermissionDenied(
"The client certificate does not define a subject common "
"name. Session client identity unavailable."
)
raise exceptions.PermissionDenied(
"The extended key usage extension is not marked for client "
"authentication in the client certificate. Session client "
"identity unavailable."
)
def _get_client_identity(self):
certificate_data = self._connection.getpeercert(binary_form=True)
try:
certificate = x509.load_der_x509_certificate(
certificate_data,
backends.default_backend()
)
except Exception:
# This should never get raised "in theory," as the ssl socket
# should fail to connect non-TLS connections before the session
# gets created. This is a failsafe in case that protection fails.
raise exceptions.PermissionDenied(
"Failure loading the client certificate from the session "
"connection. Could not retrieve client identity."
)
try:
extended_key_usage = certificate.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.EXTENDED_KEY_USAGE
).value
except x509.ExtensionNotFound:
raise exceptions.PermissionDenied(
"The extended key usage extension is missing from the client "
"certificate. Session client identity unavailable."
)
if x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH in extended_key_usage:
client_identities = certificate.subject.get_attributes_for_oid(
x509.oid.NameOID.COMMON_NAME
)
if len(client_identities) > 0:
if len(client_identities) > 1:
self._logger.warning(
"Multiple client identities found. Using the first "
"one processed."
)
client_identity = client_identities[0].value
self._logger.info(
"Session client identity: {0}".format(client_identity)
)
return client_identity
else:
raise exceptions.PermissionDenied(
"The client certificate does not define a subject common "
"name. Session client identity unavailable."
)
raise exceptions.PermissionDenied(
"The extended key usage extension is not marked for client "
"authentication in the client certificate. Session client "
"identity unavailable."
)
def fetch_details(self, crtsh_ids):
rows = self._engine.execute("""
SELECT
c.id, c.certificate, array_agg(DISTINCT cc.ca_owner)
FROM certificate c
INNER JOIN
ca_certificate cac ON c.issuer_ca_id = cac.ca_id
INNER JOIN
ccadb_certificate cc ON cac.certificate_id = cc.certificate_id
WHERE c.id IN %s
GROUP BY c.id, c.certificate
""", [(tuple(crtsh_ids),)]).fetchall()
details = []
for row in rows:
cert = x509.load_der_x509_certificate(
bytes(row[1]), default_backend()
)
subject_cn = cert.subject.get_attributes_for_oid(
x509.NameOID.COMMON_NAME
)
issuer_cn = cert.issuer.get_attributes_for_oid(
x509.NameOID.COMMON_NAME
)
try:
san = cert.extensions.get_extension_for_class(
x509.SubjectAlternativeName
)
except x509.ExtensionNotFound:
san_domains = None
else:
san_domains = san.value.get_values_for_type(x509.DNSName)
details.append(RawCertificateDetails(
crtsh_id=row[0],
common_name=", ".join(a.value for a in subject_cn) if subject_cn else None,
san_dns_names=san_domains,
ccadb_owners=[o for o in row[2] if o is not None],
issuer_common_name=", ".join(a.value for a in issuer_cn) if issuer_cn else None,
expiration_date=cert.not_valid_after,
))
return details
def _issue_cert(self, client, server_name):
"""
Issue a new cert for a particular name.
"""
log.info(
'Requesting a certificate for {server_name!r}.',
server_name=server_name)
key = self._generate_key()
objects = [
Key(key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()))]
def answer_and_poll(authzr):
def got_challenge(stop_responding):
return (
poll_until_valid(authzr, self._clock, client)
.addBoth(tap(lambda _: stop_responding())))
return (
answer_challenge(authzr, client, self._responders)
.addCallback(got_challenge))
def got_cert(certr):
objects.append(
Certificate(
x509.load_der_x509_certificate(
certr.body, default_backend())
.public_bytes(serialization.Encoding.PEM)))
return certr
def got_chain(chain):
for certr in chain:
got_cert(certr)
log.info(
'Received certificate for {server_name!r}.',
server_name=server_name)
return objects
return (
client.request_challenges(fqdn_identifier(server_name))
.addCallback(answer_and_poll)
.addCallback(lambda ign: client.request_issuance(
CertificateRequest(
csr=csr_for_names([server_name], key))))
.addCallback(got_cert)
.addCallback(client.fetch_chain)
.addCallback(got_chain)
.addCallback(partial(self.cert_store.store, server_name)))