def ast(self):
    """Create Python AST of this predicate.

    The returned node is a call of the predicate function by name using keyword
    arguments only, in this order:

    * ``message=db.get(<task name>)`` - only if the predicate requires a message
    * ``node_args=node_args`` - only if the predicate requires node arguments
    * one keyword per entry of ``self._args``, each value wrapped in a constant node

    :return: AST representation of predicate
    :rtype: ast.Call
    """
    # This method shares its name with the stdlib module, so bind the module locally to keep
    # every ``ast.`` reference below unambiguous regardless of the enclosing scope.
    import ast
    import sys

    # ast.Str is deprecated since Python 3.8 (where it already builds an ast.Constant), emits
    # DeprecationWarning since 3.12 and is removed in 3.14. Both accept the value positionally.
    const_node = ast.Constant if sys.version_info >= (3, 8) else ast.Str

    def make_call(func, args, keywords):
        """Build an ast.Call that still exposes the legacy starargs/kwargs attributes."""
        node = ast.Call(func=func, args=args, keywords=keywords)
        # These have not been ast.Call fields since Python 3.5, and passing unknown keywords to
        # the node constructor is deprecated since 3.13. Set them as plain attributes instead so
        # the node carries the same values as before.
        # NOTE(review): presumably kept for a code generator that reads them - confirm.
        node.starargs = None
        node.kwargs = None
        return node

    # we could directly use db[task] in predicates, but predicates should not handle database errors,
    # so leave them on higher level (selinon) and index database before predicate is being called
    kwargs = []

    # we want to avoid querying to database if possible, if a predicate does not require message,
    # do not ask for it
    if self.requires_message():
        # this can raise an exception if check was not run, since we are accessing storage that can be None
        db_get = ast.Attribute(value=ast.Name(id='db', ctx=ast.Load()), attr='get', ctx=ast.Load())
        kwargs.append(ast.keyword(arg='message',
                                  value=make_call(db_get, [const_node(self._task_str_name())], [])))

    if self.requires_node_args():
        kwargs.append(ast.keyword(arg='node_args', value=ast.Name(id='node_args', ctx=ast.Load())))

    # user supplied predicate arguments are passed through as constants
    kwargs.extend(ast.keyword(arg=k, value=const_node(v)) for k, v in self._args.items())

    return make_call(ast.Name(id=self._func.__name__, ctx=ast.Load()), [], kwargs)
评论列表
文章目录