def fetch_details(self, crtsh_ids):
    """Load and parse the certificates for the given crt.sh ids.

    Runs one query against ``certificate`` (inner-joined through
    ``ca_certificate`` to ``ccadb_certificate``) and parses every returned
    DER blob with ``x509.load_der_x509_certificate``.

    Args:
        crtsh_ids: Iterable of ``certificate.id`` values to look up.

    Returns:
        A list with one ``RawCertificateDetails`` per row returned by the
        query. Ids that match no row are simply absent; because the joins
        are inner joins, that includes certificates whose issuer has no
        ``ccadb_certificate`` row. An empty ``crtsh_ids`` yields ``[]``
        without touching the database.
    """
    ids = tuple(crtsh_ids)
    if not ids:
        # An empty tuple would be rendered as "IN ()", which is a syntax
        # error (at least on PostgreSQL, the presumed backend), so answer
        # directly instead of issuing the query.
        return []

    # The query text is kept verbatim; the ids are bound as a single tuple
    # parameter that the driver expands for the IN clause.
    rows = self._engine.execute("""
SELECT
c.id, c.certificate, array_agg(DISTINCT cc.ca_owner)
FROM certificate c
INNER JOIN
ca_certificate cac ON c.issuer_ca_id = cac.ca_id
INNER JOIN
ccadb_certificate cc ON cac.certificate_id = cc.certificate_id
WHERE c.id IN %s
GROUP BY c.id, c.certificate
""", [(ids,)]).fetchall()

    def joined_common_names(name):
        # A name may carry several CN attributes; join them, or report
        # None when there are none at all.
        attributes = name.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
        if not attributes:
            return None
        return ", ".join(attribute.value for attribute in attributes)

    details = []
    for crtsh_id, der_bytes, ca_owners in rows:
        cert = x509.load_der_x509_certificate(
            bytes(der_bytes), default_backend()
        )
        try:
            san = cert.extensions.get_extension_for_class(
                x509.SubjectAlternativeName
            )
        except x509.ExtensionNotFound:
            # No SAN extension: reported as None rather than an empty list.
            san_domains = None
        else:
            san_domains = san.value.get_values_for_type(x509.DNSName)
        details.append(RawCertificateDetails(
            crtsh_id=crtsh_id,
            common_name=joined_common_names(cert.subject),
            san_dns_names=san_domains,
            # array_agg can contain NULL owners; drop them.
            ccadb_owners=[owner for owner in ca_owners if owner is not None],
            issuer_common_name=joined_common_names(cert.issuer),
            expiration_date=cert.not_valid_after,
        ))
    return details
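# Non-code residue from the page this snippet appears to have been extracted from:
# "评论列表" (comment list), "文章目录" (article table of contents).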