ingest_message.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:ndr 作者: SecuredByTHEM 项目源码 文件源码
def verify_and_load_message(config, file, only_accept_cn=None):
        '''Verify a signed S/MIME message and load its payload.

        The message is checked with ``openssl smime -verify`` against
        ``config.ssl_cafile``. The signer certificate that openssl extracts is
        then parsed to obtain its common name (CN), which can optionally be
        pinned to a single accepted value.

        Args:
            config: object providing ``ssl_cafile`` (path handed to openssl as
                ``-CAfile``) and ``logger``; it is also passed to
                ``IngestMessage``.
            file: path of the signed message to verify.
            only_accept_cn: when not None, the signer's CN must equal this
                value or the message is rejected.

        Returns:
            An ``IngestMessage`` populated via ``load_from_yaml`` from the
            verified payload, or None when the message is rejected (openssl
            verification failed, the signer certificate has no CN, or the CN
            does not match ``only_accept_cn``).
        '''
        # Create the temporary file *before* the try block: if mkstemp() itself
        # fails there is nothing to clean up, and the finally clause must not
        # trip over an unbound signer_pem and mask the real error.
        msg_fd, signer_pem = tempfile.mkstemp()

        try:
            # We only need the path; openssl writes the signer PEM into it
            os.close(msg_fd)

            # CApath is required to force out any system CAs that might be in
            # the global CNF file
            ossl_verify_cmd = ["openssl", "smime", "-verify",
                               "-in", file, "-CAfile", config.ssl_cafile,
                               "-CApath", "/dev/nonexistant-dir",
                               "-text", "-signer", signer_pem]

            ossl_verify_proc = subprocess.run(
                args=ossl_verify_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                check=False)

            if ossl_verify_proc.returncode != 0:
                config.logger.warning(
                    "rejecting %s: %s", file, str(ossl_verify_proc.stderr))
                return None

            config.logger.info("%s passed openssl S/MIME verify", file)

            config.logger.debug("checking %s", signer_pem)
            with open(signer_pem, 'rb') as x509_signer:
                # NOW we can use cryptography to read the x509 certificates
                cert = x509.load_pem_x509_certificate(
                    x509_signer.read(), default_backend())

            cn_attributes = cert.subject.get_attributes_for_oid(
                x509.NameOID.COMMON_NAME)
            if not cn_attributes:
                # A signer without a CN cannot be identified; reject it rather
                # than crash with an IndexError.
                config.logger.error(
                    "rejecting %s: signer certificate has no common name", file)
                return None

            common_name = cn_attributes[0].value
            config.logger.debug(
                "signed message came from common name: %s", common_name)

            # Make sure the CN is something we're willing to accept. The
            # rejection must actually stop processing, not just be logged.
            if only_accept_cn is not None and common_name != only_accept_cn:
                config.logger.error("rejecting message due to CN %s != %s",
                                    common_name, only_accept_cn)
                return None

            # Read it in: openssl wrote the verified payload to stdout
            message = IngestMessage(config)
            message.load_from_yaml(ossl_verify_proc.stdout)

            return message

        # and clean up after ourselves
        finally:
            os.remove(signer_pem)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号