def filterstr_to_filterfunc(filter_str: str, logged_in: bool) -> Callable[['Post'], bool]:
"""Takes an --only-if=... filter specification and makes a filter_func Callable out of it."""
# The filter_str is parsed, then all names occurring in its AST are replaced by loads to post.<name>. A
# function Post->bool is returned which evaluates the filter with the post as 'post' in its namespace.
class TransformFilterAst(ast.NodeTransformer):
def visit_Name(self, node: ast.Name):
# pylint:disable=invalid-name,no-self-use
if not isinstance(node.ctx, ast.Load):
raise InvalidArgumentException("Invalid filter: Modifying variables ({}) not allowed.".format(node.id))
if not hasattr(Post, node.id):
raise InvalidArgumentException("Invalid filter: Name {} is not defined.".format(node.id))
if node.id in Post.LOGIN_REQUIRING_PROPERTIES and not logged_in:
raise InvalidArgumentException("Invalid filter: Name {} requires being logged in.".format(node.id))
new_node = ast.Attribute(ast.copy_location(ast.Name('post', ast.Load()), node), node.id,
ast.copy_location(ast.Load(), node))
return ast.copy_location(new_node, node)
input_filename = '<--only-if parameter>'
compiled_filter = compile(TransformFilterAst().visit(ast.parse(filter_str, filename=input_filename, mode='eval')),
filename=input_filename, mode='eval')
def filterfunc(post: 'Post') -> bool:
# pylint:disable=eval-used
return bool(eval(compiled_filter, {'post': post}))
return filterfunc
python类NodeTransformer()的实例源码
def subs(root, **kwargs):
'''Substitute ast.Name nodes for numbers in root using the mapping in
kwargs. Returns a new copy of root.
'''
root = copy.deepcopy(root)
class Transformer(ast.NodeTransformer):
def visit_FunctionDef(self, node):
return node
def visit_Name(self, node):
if node.id in kwargs and not isinstance(node.ctx, ast.Store):
replacement = kwargs[node.id]
if isinstance(replacement, int):
return ast.copy_location(ast.Num(n=replacement), node)
else:
return copy.copy(replacement)
else:
return node
return Transformer().visit(root)
def sub_subscript(root, subs):
root = copy.deepcopy(root)
class Transformer(ast.NodeTransformer):
def visit_FunctionDef(self, node):
return node
def visit_Subscript(self, node):
self.generic_visit(node)
try:
node_tup = subscript_to_tuple(node)
if node_tup in subs:
return subs[node_tup]
else:
return node
except ValueError:
return node
return Transformer().visit(root)
def generic_visit(self, node):
super(NodeTransformer, self).generic_visit(node)
if hasattr(node, 'body') and type(node.body) is list:
returns = [i for i, child in enumerate(node.body) if type(child) is ast.Return]
if len(returns) > 0:
for wait in self.get_waits():
node.body.insert(returns[0], wait)
inserts = []
for i, child in enumerate(node.body):
if type(child) is ast.Expr and self.is_concurrent_call(child.value):
self.encounter_call(child.value)
elif self.is_valid_assignment(child):
call = child.value
self.encounter_call(call)
name = child.targets[0].value
self.arguments.add(SchedulerRewriter.top_level_name(name))
index = child.targets[0].slice.value
call.func = ast.Attribute(call.func, 'assign', ast.Load())
call.args = [ast.Tuple([name, index], ast.Load())] + call.args
node.body[i] = ast.Expr(call)
elif self.references_arg(child):
inserts.insert(0, i)
for index in inserts:
for wait in self.get_waits():
node.body.insert(index, wait)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def resolve_negative_literals(_ast):
class RewriteUnaryOp(ast.NodeTransformer):
def visit_UnaryOp(self, node):
if isinstance(node.op, ast.USub) and isinstance(node.operand, ast.Num):
node.operand.n = 0 - node.operand.n
return node.operand
else:
return node
return RewriteUnaryOp().visit(_ast)
# Make a getter for a variable. This function gives an output that
# contains lists of 4-tuples:
# (i) the tail of the function name for the getter
# (ii) the code for the arguments that the function takes
# (iii) the code for the return
# (iv) the output type
#
# Here is an example:
#
# Input: my_variable: {foo: num, bar: decimal[5]}
#
# Output:
#
# [('__foo', '', '.foo', 'num'),
# ('__bar', 'arg0: num, ', '.bar[arg0]', 'decimal')]
#
# The getters will have code:
# def get_my_variable__foo() -> num: return self.foo
# def get_my_variable__bar(arg0: nun) -> decimal: return self.bar[arg0]
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def apply_ast_transform(func, ast_transformer, *,
keep_original=True, globals_dict=None, debug=0):
"""
Apply the AST transform class to a function
Args:
keep_original: True to retain the old function in attribute `.f_original`
globals_dict: pass any external function in your NodeTransformer into this
"""
if (inspect.isclass(ast_transformer)
and issubclass(ast_transformer, DecoratorAST)):
ast_transformer = ast_transformer()
else:
assert isinstance(ast_transformer, DecoratorAST)
old_ast = get_func_ast(func)
# _, starting_line = inspect.getsourcelines(func)
if debug:
print("======= OLD AST =======")
ast_print(old_ast)
visitor = ast_transformer
new_ast = visitor.visit(old_ast)
if debug:
print("======= NEW AST =======")
ast_print(new_ast)
ast.fix_missing_locations(new_ast)
co = compile(new_ast, '<ast_demo>', 'exec')
fake_locals = {}
# exec will define the new function into fake_locals scope
# this is to avoid conflict with vars in the real locals()
# https://stackoverflow.com/questions/24733831/using-a-function-defined-in-an-execed-string-in-python-3
exec(co, globals_dict, fake_locals)
new_f = fake_locals[func.__name__]
new_f.f_original = func if keep_original else None
return new_f
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)
def visit(self, node):
"""Ensure statement only contains allowed nodes."""
if not isinstance(node, self.ALLOWED):
raise SyntaxError('Not allowed in environment markers.\n%s\n%s' %
(self.statement,
(' ' * node.col_offset) + '^'))
return ast.NodeTransformer.visit(self, node)