def checkGlobalIds(a, l):
if not isinstance(a, ast.AST):
return
elif type(a) in [ ast.Load, ast.Store, ast.Del, ast.AugLoad, ast.AugStore, ast.Param ]:
return
if not hasattr(a, "global_id"):
addedNodes = ["propagatedVariable", "orderedBinOp",
"augAssignVal", "augAssignBinOp",
"combinedConditional", "combinedConditionalOp",
"multiCompPart", "multiCompOp",
"second_global_id", "moved_line",
# above this line has individualize functions. below does not.
"addedNot", "addedNotOp", "addedOther", "addedOtherOp",
"collapsedExpr", "removedLines",
"helperVar", "helperReturn",
"typeCastFunction", ]
for t in addedNodes:
if hasattr(a, t):
break
else: # only enter the else if none of the provided types are an attribute of a
log("canonicalize\tcheckGlobalIds\tNo global id: " + str(l) + "," + str(a.__dict__) + "," + printFunction(a, 0), "bug")
for f in ast.iter_child_nodes(a):
checkGlobalIds(f, l + [type(a)])
python类Param()的实例源码
def _transform_function_arguments(left):
if type(left) is ast.Name:
names = [left]
else:
names = left.elts
# Python3
if hasattr(_ast, 'arg'):
args = [_ast.arg(annotation=None, arg=name.id, col_offset = name.col_offset, lineno=name.lineno) for name in names]
return ast.arguments(args=args, defaults=[], kwonlyargs=[], kw_defaults=[])
# Python 2
arguments = ast.arguments(args=names, defaults=[])
for argument in arguments.args:
argument.ctx = ast.Param()
return arguments
def to_source_any(n):
"""
Convert AST node to string, handling all node types, without fixing comments.
"""
try:
return astor.to_source(n)
except AttributeError:
pass
cls = n.__class__
if cls in astor.misc.all_symbols:
return astor.misc.all_symbols[cls]
def wrap(s):
return '___' + s + '___'
extra_d = {ast.Load: wrap('load'),
ast.Store: wrap('store'),
ast.Del: wrap('del'),
ast.AugLoad: wrap('augload'),
ast.AugStore: wrap('augstore'),
ast.Param: wrap('param'),
ast.keyword: wrap('keyword')}
if cls in extra_d:
return extra_d[cls]
raise AttributeError('unknown node type {}'.format(cls))
def make_plugin_module(rule_body):
return FavaCode(
Module(
body=[
FunctionDef(
name='report',
args=arguments(
args=[
Name(id='shared', ctx=Param()),
],
vararg=None,
kwarg=None,
defaults=[]),
body=[
Return(
value=rule_body.get_ast())],
decorator_list=[
Call(
func=Name(id='fava_rule', ctx=Load()),
args=[Name(id=each, ctx=Load()) for each in rule_body.get_parsers()],
keywords=[],
starargs=None, kwargs=None),
])
]),
[rule_body])
def make_wrapper(new_variables, value):
"""
Define a set of new variables by createing a call to a lambda function.
The value becomes the body of the lambda.
The names of the new_variables become the names of the formal parameters to the lambda.
The values of the new_variables become the values of the actual arguments to the call.
"""
return FavaCode(
Call(
func=Lambda(
args=arguments(
args=[Name(id=key, ctx=Param()) for key, val in new_variables.iteritems()],
vararg=None, kwarg=None, defaults=[]),
body=value.get_ast()),
args=[val.get_ast() for key, val in new_variables.iteritems()],
keywords=[],
starargs=None,
kwargs=None),
[value] + [val for key, val in new_variables.iteritems()])
def pythoneval(self, args, debug=False):
refs = [ast.Name("x{0}".format(i), ast.Load()) for i in xrange(len(args))]
if sys.version_info[0] <= 2:
params = ast.arguments([ast.Name("x{0}".format(i), ast.Param()) for i in xrange(len(args))], None, None, [])
fcn = ast.FunctionDef("tmp", params, [ast.Return(self.pythonast(refs))], [])
else:
params = ast.arguments([ast.arg("x{0}".format(i), None) for i in xrange(len(args))], None, [], [], None, [])
fcn = ast.FunctionDef("tmp", params, [ast.Return(self.pythonast(refs))], [], None)
moduleast = ast.Module([fcn])
fakeLineNumbers(moduleast)
if debug:
print(astToSource(fcn))
modulecomp = compile(moduleast, "Femtocode", "exec")
out = {"$importlib": importlib, "$math": math}
exec(modulecomp, out)
return out["tmp"](*args)
def _make_fn(name, chain_fn, args, defaults):
args_with_self = ['_self'] + list(args)
arguments = [_ast.Name(id=arg, ctx=_ast.Load()) for arg in args_with_self]
defs = [_ast.Name(id='_def{0}'.format(idx), ctx=_ast.Load()) for idx, _ in enumerate(defaults)]
if _PY2:
parameters = _ast.arguments(args=[_ast.Name(id=arg, ctx=_ast.Param()) for arg in args_with_self],
defaults=defs)
else:
parameters = _ast.arguments(args=[_ast.arg(arg=arg) for arg in args_with_self],
kwonlyargs=[],
defaults=defs,
kw_defaults=[])
module_node = _ast.Module(body=[_ast.FunctionDef(name=name,
args=parameters,
body=[_ast.Return(value=_ast.Call(func=_ast.Name(id='_chain', ctx=_ast.Load()),
args=arguments,
keywords=[]))],
decorator_list=[])])
module_node = _ast.fix_missing_locations(module_node)
# compile the ast
code = compile(module_node, '<string>', 'exec')
# and eval it in the right context
globals_ = {'_chain': chain_fn}
locals_ = dict(('_def{0}'.format(idx), value) for idx, value in enumerate(defaults))
eval(code, globals_, locals_)
# extract our function from the newly created module
return locals_[name]
########################################################################
# Produce a docstring for the class.
def visit_Expr(self, node):
if type(node.value) is ast.Call:
call = node.value
if self.is_concurrent_call(call):
self.encounter_call(call)
return node
elif any([self.is_concurrent_call(arg) for arg in call.args]):
conc_args = [(i, arg) for i, arg in enumerate(call.args) if self.is_concurrent_call(arg)]
if len(conc_args) > 1:
raise self.not_implemented_error(call, "Functions with multiple @concurrent parameters are unsupported")
conc_call = conc_args[0][1]
self.encounter_call(conc_call)
call.args[conc_args[0][0]] = ast.Name("__value__", ast.Load())
if sys.version_info >= (3, 0):
args = [ast.arg("__value__", None)]
else:
args = [ast.Name("__value__", ast.Param())]
call_lambda = ast.Lambda(ast.arguments(args = args, defaults = [], kwonlyargs = [], kw_defaults = []), call)
copy_location_kwargs = {
"func": ast.Attribute(conc_call.func, 'call', ast.Load()),
"args": [call_lambda] + conc_call.args,
"keywords": conc_call.keywords
}
if(sys.version_info < (3, 0)):
copy_location_kwargs["kwargs"] = conc_call.kwargs
return copy_location(ast.Expr(ast.Call(**copy_location_kwargs)), node)
return self.generic_visit(node)
def generate_name(self, node, ext_info={'is_arg': False}):
is_arg = ext_info.get('is_arg', False)
if isinstance(node.ctx, ast.Store) or isinstance(node.ctx, ast.Param):
return 'export ' + node.id if self.is_global else 'local ' + node.id
else:
if is_arg and self.get_type(node).is_list:
return node.id + '[@]'
else:
return '$' + node.id
def visit_Name(self, node):
if (isinstance(node.ctx, ast.Load) and
node.id not in IGNORED_VARIABLE_NAMES):
self.used_names.add(node.id)
elif isinstance(node.ctx, (ast.Param, ast.Store)):
self._define_variable(node.id, node)
def _make_list_func(tree, field_names_dict, filename=None, add_args=None):
filename = filename or '<generated>'
list_name = 'data'
rewriter = RewriteName(list_name, field_names_dict)
lambda_args = [ast.Name(id=list_name, ctx=ast.Param(), lineno=0, col_offset=0)]
for arg in (add_args or []):
lambda_args.append(
ast.Name(id=arg, ctx=ast.Param(), lineno=0, col_offset=0)
)
tree = rewriter.visit(tree)
tree = ast.Expression(
body = ast.Lambda(
lineno = 0,
col_offset = 0,
body = tree.body,
args = ast.arguments(
args = lambda_args,
vararg = None,
kwarg = None,
defaults = [],
)
),
)
code = compile(tree, filename, 'eval')
func = eval(code)
return func
def make_rule_function(rule_body):
"""
Create a rule function as a lambda expression.
"""
return FavaCode(
Expression(
body=Lambda(
args=arguments(
args=[Name(id='shared', ctx=Param())],
vararg=None, kwarg=None, defaults=[]),
body=rule_body.get_ast())),
[rule_body])
def _labeled_translation(idx_mapping, arg_translation):
lambda_translation = ast.Lambda(
args=ast.arguments(
args=[ast.Name(id='x', ctx=ast.Param())],
vararg=None,
kwarg=None,
defaults=[]),
body=ast.Tuple(
elts=list(
ast.Subscript(
value=ast.Name(
id='x',
ctx=ast.Load()),
slice=ast.Index(
value=ast.Num(n=n)),
ctx=ast.Load())
for n in idx_mapping
),
ctx=ast.Load()
)
)
return ast.Call(
func=lambda_translation,
args=[arg_translation],
keywords=[],
starargs=[],
kwargs=None
)
def make_grp_list_func(expr_str, field_names_dict, global_names_dict=None, filename=None):
"""
>>> func = make_grp_list_func('sum(a)/cnt()', {'a': 0})
>>> func([(1,), (2,), (3,)])
2
>>> func = make_grp_list_func('1+sum(a)+sum(b**2)', {'a':0, 'b':1})
>>> func([(10, 1), (20, 2)])
36
>>> func = make_grp_list_func('b+sum(a)/sum(c)', {'a': 0, 'c': 1}, {'b': 0})
>>> func([(10, 1), (20, 2), (30, 3)], [1])
11
>>> func = make_grp_list_func('sum(a)', {'a': 0})
>>> func([(1,), (2,), (3,)])
6
"""
filename = filename or '<generated>'
list_name = 'data'
lambda_args = [ast.Name(id=list_name, ctx=ast.Param(), lineno=0, col_offset=0)]
tree = ast.parse(expr_str, filename, 'eval')
tree = RewriteCntFuncs().visit(tree)
tree = RewriteGrpFuncs(field_names_dict).visit(tree)
if global_names_dict:
lambda_args.append(ast.Name(id='globaldata', ctx=ast.Param(), lineno=0, col_offset=0))
tree = RewriteName('globaldata', global_names_dict).visit(tree)
tree = ast.Expression(
body = ast.Lambda(
body=tree.body,
args=ast.arguments(
args=lambda_args,
vararg=None,
kwarg=None,
defaults=[],
),
lineno=0,
col_offset=0,
),
)
#print ast.dump(tree)
code = compile(tree, filename, 'eval')
func = eval(code)
return func