exec.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:munch-core 作者: crunchmail 项目源码 文件源码
def apply(self, envelope):
        """Run each configured ephemeral queue policy against ``envelope``.

        Every path listed in ``settings.TRANSACTIONAL['EXEC_QUEUE_POLICIES']``
        is read and executed as Python source; the ``apply`` callable that
        source defines is then called with ``envelope``. The only names made
        available to a policy are ``settings``, ``cache``, ``logger``,
        ``print`` and the modules named in
        ``settings.TRANSACTIONAL['EXEC_QUEUE_POLICIES_CONTEXT_BUILTINS']``.

        Policies are best-effort: a missing file, or any ``Exception`` raised
        while reading or running a policy, is logged as a warning and the
        remaining policies still run.

        NOTE(security): ``exec`` runs the policy with the privileges of this
        process, and a trimmed ``__builtins__`` is not a sandbox. Policy
        paths must only ever come from trusted configuration.

        :param envelope: object handed to each policy; its ``headers`` are
            read here only to tag log messages with a message id.
        """
        def message_id():
            # Resolved lazily so the headers are only read when a warning
            # is actually emitted.
            header_name = settings.TRANSACTIONAL.get(
                'X_MESSAGE_ID_HEADER', 'NO-MESSAGE-ID')
            # The placeholder must be the default of the *header* lookup,
            # otherwise a message without that header is logged as "[None]".
            return envelope.headers.get(header_name, 'NO-MESSAGE-ID')

        # ``or ()`` keeps an unset (None) setting from raising TypeError.
        policy_paths = settings.TRANSACTIONAL.get('EXEC_QUEUE_POLICIES') or ()
        extra_modules = settings.TRANSACTIONAL.get(
            'EXEC_QUEUE_POLICIES_CONTEXT_BUILTINS') or ()

        for path in policy_paths:
            if not os.path.exists(path):
                logger.warning(
                    "[{}] Following ephemeral policy doesn't "
                    "exists: {}".format(message_id(), path))
                continue

            with open(path) as module:
                # Fresh namespace per policy so one policy cannot leak
                # definitions into the next.
                namespace = {'__builtins__': {
                    'settings': settings,
                    'cache': cache,
                    'logger': logger,
                    'print': print,
                }}
                for mod in extra_modules:
                    namespace['__builtins__'][mod] = __import__(mod)

                try:
                    # A single dict serves as both globals and locals. With
                    # two separate dicts exec behaves like a class body, and
                    # the policy's functions could not see the file's own
                    # top-level names (helpers, constants).
                    exec(module.read(), namespace)
                    policy_apply = namespace.get('apply')
                    if not callable(policy_apply):
                        # Clearer than "'NoneType' object is not callable".
                        raise TypeError(
                            'policy does not define a callable "apply"')
                    policy_apply(envelope)
                except Exception as err:
                    logger.warning(
                        '[{}] Failed to execute "{}" ephemeral '
                        'policy: {}'.format(message_id(), path, err))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号