def visit_Num(self, node):
"""
WARNING: you cannot directly write constants in list. It will throw obscure
error like "TypeError: required field "lineno" missing from expr"
You MUST wrap all constants in ast_demo types, like ast_demo.Num(n=42) instead of raw 42
"""
n = node.n
if isinstance(n, int):
new_node = ast.Call(func=ast.Name(id='Fraction', ctx=ast.Load()),
args=[node, ast.Num(n=1)], keywords=[])
ast.copy_location(new_node, node)
# ast_demo.fix_missing_locations(new_node)
return new_node
return node
python类fix_missing_locations()的实例源码
def visit_Assert(self, node):
if isinstance(node.test, ast.Compare) and \
len(node.test.ops) == 1 and \
isinstance(node.test.ops[0], ast.Eq):
call = ast.Call(func=ast.Name(id='assert_equal', ctx=ast.Load()),
args=[node.test.left, node.test.comparators[0]],
keywords=[])
# Wrap the call in an Expr node, because the return value isn't used.
newnode = ast.Expr(value=call)
ast.copy_location(newnode, node)
ast.fix_missing_locations(newnode)
return newnode
# Return the original node if we don't want to change it.
return node
def apply_ast_transform(func, ast_transformer, *,
keep_original=True, globals_dict=None, debug=0):
"""
Apply the AST transform class to a function
Args:
keep_original: True to retain the old function in attribute `.f_original`
globals_dict: pass any external function in your NodeTransformer into this
"""
if (inspect.isclass(ast_transformer)
and issubclass(ast_transformer, DecoratorAST)):
ast_transformer = ast_transformer()
else:
assert isinstance(ast_transformer, DecoratorAST)
old_ast = get_func_ast(func)
# _, starting_line = inspect.getsourcelines(func)
if debug:
print("======= OLD AST =======")
ast_print(old_ast)
visitor = ast_transformer
new_ast = visitor.visit(old_ast)
if debug:
print("======= NEW AST =======")
ast_print(new_ast)
ast.fix_missing_locations(new_ast)
co = compile(new_ast, '<ast_demo>', 'exec')
fake_locals = {}
# exec will define the new function into fake_locals scope
# this is to avoid conflict with vars in the real locals()
# https://stackoverflow.com/questions/24733831/using-a-function-defined-in-an-execed-string-in-python-3
exec(co, globals_dict, fake_locals)
new_f = fake_locals[func.__name__]
new_f.f_original = func if keep_original else None
return new_f
def test_invalid_identitifer(self):
m = ast.Module([ast.Expr(ast.Name(42, ast.Load()))])
ast.fix_missing_locations(m)
with self.assertRaises(TypeError) as cm:
compile(m, "<test>", "exec")
self.assertIn("identifier must be of type str", str(cm.exception))
def test_invalid_string(self):
m = ast.Module([ast.Expr(ast.Str(42))])
ast.fix_missing_locations(m)
with self.assertRaises(TypeError) as cm:
compile(m, "<test>", "exec")
self.assertIn("string must be of type str", str(cm.exception))
def test_fix_missing_locations(self):
src = ast.parse('write("spam")')
src.body.append(ast.Expr(ast.Call(ast.Name('spam', ast.Load()),
[ast.Str('eggs')], [], None, None)))
self.assertEqual(src, ast.fix_missing_locations(src))
self.assertEqual(ast.dump(src, include_attributes=True),
"Module(body=[Expr(value=Call(func=Name(id='write', ctx=Load(), "
"lineno=1, col_offset=0), args=[Str(s='spam', lineno=1, "
"col_offset=6)], keywords=[], starargs=None, kwargs=None, "
"lineno=1, col_offset=0), lineno=1, col_offset=0), "
"Expr(value=Call(func=Name(id='spam', ctx=Load(), lineno=1, "
"col_offset=0), args=[Str(s='eggs', lineno=1, col_offset=0)], "
"keywords=[], starargs=None, kwargs=None, lineno=1, "
"col_offset=0), lineno=1, col_offset=0)])"
)
def mod(self, mod, msg=None, mode="exec", *, exc=ValueError):
mod.lineno = mod.col_offset = 0
ast.fix_missing_locations(mod)
with self.assertRaises(exc) as cm:
compile(mod, "<test>", mode)
if msg is not None:
self.assertIn(msg, str(cm.exception))
expr.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def visit(self, node, **kwargs):
if isinstance(node, string_types):
clean = self.preparser(node)
node = ast.fix_missing_locations(ast.parse(clean))
method = 'visit_' + node.__class__.__name__
visitor = getattr(self, method)
return visitor(node, **kwargs)
def add_input_indices(root, input_vars, index_var):
class AddInputIndicesVisitor(ast.NodeTransformer):
def visit_Subscript(self, node):
if get_var_name(node) in input_vars:
return extend_subscript_for_input(node, index_var)
return node
def visit_Name(self, node):
if node.id in input_vars:
return ast.Subscript(node, ast.Index(index_var), node.ctx)
return node
vis = AddInputIndicesVisitor()
root = vis.visit(root)
return ast.fix_missing_locations(root)
def __call__(self, *args, **kwargs):
if self.f is None:
source = inspect.getsourcelines(self.orig_f)[0]
unindent(source)
source = "".join(source)
self.ast = ast.parse(source)
rewriter = SchedulerRewriter(concurrent.functions.keys())
rewriter.visit(self.ast.body[0])
ast.fix_missing_locations(self.ast)
out = compile(self.ast, "<string>", "exec")
scope = dict(self.orig_f.__globals__)
exec(out, scope)
self.f = scope[self.orig_f.__name__]
return self.f(*args, **kwargs)
def transform_ast(self, node):
"""Apply the AST transformations from self.ast_transformers
Parameters
----------
node : ast.Node
The root node to be transformed. Typically called with the ast.Module
produced by parsing user input.
Returns
-------
An ast.Node corresponding to the node it was called with. Note that it
may also modify the passed object, so don't rely on references to the
original AST.
"""
for transformer in self.ast_transformers:
try:
node = transformer.visit(node)
except InputRejected:
# User-supplied AST transformers can reject an input by raising
# an InputRejected. Short-circuit in this case so that we
# don't unregister the transform.
raise
except Exception:
warn("AST transformer %r threw an error. It will be unregistered." % transformer)
self.ast_transformers.remove(transformer)
if self.ast_transformers:
ast.fix_missing_locations(node)
return node
def load_file(filename, env):
from utils import e
filename = e(filename)
g = GlobalsWrapper(env, filename)
with open(filename, 'r') as f:
tree = ast.parse(f.read(), filename)
t2 = ast.fix_missing_locations(AstTransformer().visit(tree))
exec(compile(t2, filename, 'exec'), g)
return g.dict
def transform_ast(self, node):
"""Apply the AST transformations from self.ast_transformers
Parameters
----------
node : ast.Node
The root node to be transformed. Typically called with the ast.Module
produced by parsing user input.
Returns
-------
An ast.Node corresponding to the node it was called with. Note that it
may also modify the passed object, so don't rely on references to the
original AST.
"""
for transformer in self.ast_transformers:
try:
node = transformer.visit(node)
except InputRejected:
# User-supplied AST transformers can reject an input by raising
# an InputRejected. Short-circuit in this case so that we
# don't unregister the transform.
raise
except Exception:
warn("AST transformer %r threw an error. It will be unregistered." % transformer)
self.ast_transformers.remove(transformer)
if self.ast_transformers:
ast.fix_missing_locations(node)
return node
def test_invalid_identitifer(self):
m = ast.Module([ast.Expr(ast.Name(u"x", ast.Load()))])
ast.fix_missing_locations(m)
with self.assertRaises(TypeError) as cm:
compile(m, "<test>", "exec")
if test_support.check_impl_detail():
self.assertIn("identifier must be of type str", str(cm.exception))
def test_invalid_string(self):
m = ast.Module([ast.Expr(ast.Str(43))])
ast.fix_missing_locations(m)
with self.assertRaises(TypeError) as cm:
compile(m, "<test>", "exec")
if test_support.check_impl_detail():
self.assertIn("string must be of type str or uni", str(cm.exception))
def test_fix_missing_locations(self):
src = ast.parse('write("spam")')
src.body.append(ast.Expr(ast.Call(ast.Name('spam', ast.Load()),
[ast.Str('eggs')], [], None, None)))
self.assertEqual(src, ast.fix_missing_locations(src))
self.assertEqual(ast.dump(src, include_attributes=True),
"Module(body=[Expr(value=Call(func=Name(id='write', ctx=Load(), "
"lineno=1, col_offset=0), args=[Str(s='spam', lineno=1, "
"col_offset=6)], keywords=[], starargs=None, kwargs=None, "
"lineno=1, col_offset=0), lineno=1, col_offset=0), "
"Expr(value=Call(func=Name(id='spam', ctx=Load(), lineno=1, "
"col_offset=0), args=[Str(s='eggs', lineno=1, col_offset=0)], "
"keywords=[], starargs=None, kwargs=None, lineno=1, "
"col_offset=0), lineno=1, col_offset=0)])"
)
def test_invalid_identitifer(self):
m = ast.Module([ast.Expr(ast.Name(42, ast.Load()))])
ast.fix_missing_locations(m)
with self.assertRaises(TypeError) as cm:
compile(m, "<test>", "exec")
self.assertIn("identifier must be of type str", str(cm.exception))
def test_invalid_string(self):
m = ast.Module([ast.Expr(ast.Str(42))])
ast.fix_missing_locations(m)
with self.assertRaises(TypeError) as cm:
compile(m, "<test>", "exec")
self.assertIn("string must be of type str", str(cm.exception))
def test_fix_missing_locations(self):
src = ast.parse('write("spam")')
src.body.append(ast.Expr(ast.Call(ast.Name('spam', ast.Load()),
[ast.Str('eggs')], [], None, None)))
self.assertEqual(src, ast.fix_missing_locations(src))
self.assertEqual(ast.dump(src, include_attributes=True),
"Module(body=[Expr(value=Call(func=Name(id='write', ctx=Load(), "
"lineno=1, col_offset=0), args=[Str(s='spam', lineno=1, "
"col_offset=6)], keywords=[], starargs=None, kwargs=None, "
"lineno=1, col_offset=0), lineno=1, col_offset=0), "
"Expr(value=Call(func=Name(id='spam', ctx=Load(), lineno=1, "
"col_offset=0), args=[Str(s='eggs', lineno=1, col_offset=0)], "
"keywords=[], starargs=None, kwargs=None, lineno=1, "
"col_offset=0), lineno=1, col_offset=0)])"
)
def mod(self, mod, msg=None, mode="exec", *, exc=ValueError):
mod.lineno = mod.col_offset = 0
ast.fix_missing_locations(mod)
with self.assertRaises(exc) as cm:
compile(mod, "<test>", mode)
if msg is not None:
self.assertIn(msg, str(cm.exception))