def verify_and_load_message(config, file, only_accept_cn=None):
    '''Verify an S/MIME signed message against our CA chain and load it.

    The message is verified by shelling out to ``openssl smime -verify``
    using ``config.ssl_cafile`` as the only trust anchor. The signer's
    certificate is written by openssl to a temporary file, which is then
    parsed to extract the subject common name (CN).

    Args:
        config: object providing ``ssl_cafile`` (path handed to ``-CAfile``)
            and ``logger``; it is also passed to ``IngestMessage``.
        file: path of the signed message to verify.
        only_accept_cn: when not ``None``, the signer certificate's CN must
            equal this value or the message is rejected.

    Returns:
        An ``IngestMessage`` loaded from the verified payload, or ``None``
        when openssl rejects the message or the signer's CN does not match
        ``only_accept_cn``.
    '''
    # Create the scratch file *before* entering the try block: if mkstemp()
    # itself fails there is nothing to clean up, and the finally clause must
    # not trip over an unassigned name and hide the real error.
    signer_fd, signer_pem = tempfile.mkstemp()
    try:
        # openssl writes to the path itself; we don't need the descriptor
        os.close(signer_fd)

        # CApath is required to force out any system CAs that might be in
        # the global CNF file
        ossl_verify_cmd = ["openssl", "smime", "-verify",
                           "-in", file, "-CAfile", config.ssl_cafile,
                           "-CApath", "/dev/nonexistant-dir",
                           "-text", "-signer", signer_pem]
        ossl_verify_proc = subprocess.run(
            args=ossl_verify_cmd, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, check=False)

        if ossl_verify_proc.returncode != 0:
            # Decode stderr so the log shows openssl's text rather than a
            # bytes repr; undecodable bytes are replaced, never fatal.
            config.logger.warning(
                "rejecting %s: %s", file,
                ossl_verify_proc.stderr.decode(errors="replace").strip())
            return None

        config.logger.info("%s passed openssl S/MIME verify", file)
        config.logger.debug("checking %s", signer_pem)

        with open(signer_pem, 'rb') as x509_signer:
            # NOW we can use cryptography to read the x509 certificates
            cert = x509.load_pem_x509_certificate(
                x509_signer.read(), default_backend())

        # A subject is not guaranteed to carry a CN; treat a missing one as
        # None so it can never satisfy an explicit only_accept_cn filter.
        cn_attributes = cert.subject.get_attributes_for_oid(
            x509.NameOID.COMMON_NAME)
        common_name = cn_attributes[0].value if cn_attributes else None
        config.logger.debug(
            "signed message came from common name: %s", common_name)

        # Make sure the CN is something we're willing to accept
        if only_accept_cn is not None and common_name != only_accept_cn:
            config.logger.error("rejecting message due to CN %s != %s",
                                common_name, only_accept_cn)
            # Actually reject: without this return the message would be
            # loaded and handed back despite the failed CN check.
            return None

        # Read it in: openssl's stdout holds the verified message content
        message = IngestMessage(config)
        message.load_from_yaml(ossl_verify_proc.stdout)
        return message
    finally:
        # and clean up after ourselves
        os.remove(signer_pem)
# (Web-page navigation residue, not part of the program:
#  "comment list" / "article table of contents".)