def visit_Lambda(self, node):
    """
    Lambda(arguments args, expr body)

    Render a Python lambda expression as a Ruby ``lambda{|...| ...}`` literal.
    The parameter list and the body are each rendered through ``self.visit``
    (parameters first, then the body) and substituted into the template.

    [Lambda Definition] :
    <Python> lambda x,y :x*y
    <Ruby>   lambda{|x,y| x*y}

    <Python> lambda *x: print(x)
    <Ruby>   lambda {|*x| print(x)}

    <Python> def foo(x, y):
                 x(y)
             foo(lambda x: print(a), a)
    <Ruby>   def foo(x, y)
                 x.(y)
             end
             foo(lambda{|x| print(a)}, a)

    :param node: the lambda node; its ``args`` and ``body`` attributes are visited
    :return: the Ruby source text for the lambda
    """
    params = self.visit(node.args)
    body = self.visit(node.body)
    return "lambda{|%s| %s}" % (params, body)
# Example source code for the Python class Lambda()
def visit_Compare(self, node):
    """
    Turn an ``args > body`` comparison into a lambda expression.

    The comparison is treated as a lambda definition only when it has exactly
    one operator, that operator is ``>``, and its left operand is either a
    bare name or a tuple whose elements are all names. Any other comparison
    is returned unchanged.

    :type node: _ast.Compare
    :return: a new ``ast.Lambda`` placed at the comparison's location, or
        ``node`` itself when it does not have the lambda shape
    """
    # Exactly one operator, and it must be ``>``.
    if len(node.ops) != 1 or type(node.ops[0]) is not _ast.Gt:
        return node

    params = node.left
    if type(params) is not _ast.Tuple and type(params) is not _ast.Name:
        return node

    # A bare Name has no ``elts``; a Tuple must contain names only.
    for element in getattr(params, 'elts', []):
        if type(element) != _ast.Name:
            return node

    return ast.Lambda(
        args=_transform_function_arguments(params),
        body=node.comparators[0],
        lineno=node.lineno,
        col_offset=node.col_offset,
    )
def run_call(args, node, process, get_func, **kwargs):
    """
    Build the tree for calling ``node`` and hand it to ``get_func``.

    :param args: either a call string such as ``"f(1, 2)"`` or an argument
        list/dict that is normalised with ``fix_format``
    :param node: the ``ast.FunctionDef`` or ``ast.Lambda`` to call
    :param process: forwarded to ``get_func`` as ``process``
    :param get_func: callable invoked with keyword arguments ``process``,
        ``tree`` (and ``call`` when ``args`` is not a string) plus ``kwargs``
    :return: whatever ``get_func`` returns
    :raises TypeError: if ``node`` is neither a function definition nor a lambda
    """
    # The callee expression: a lambda is used as-is, a def is referenced by name.
    if isinstance(node, ast.Lambda):
        callee = node
    elif isinstance(node, ast.FunctionDef):
        callee = ast.Name(id=node.name, ctx=ast.Load())
    else:
        raise TypeError("Only function definition or lambda may be called")

    if not isinstance(args, str):
        # e.g. list -> {args: [...], kwargs: {}}
        formatted = fix_format(args)
        ast.fix_missing_locations(callee)
        return get_func(process=process, tree=callee, call=formatted, **kwargs)

    # Parse the call string and swap its function expression for the callee.
    call_node = ast.parse(args).body[0].value
    call_node.func = callee
    ast.fix_missing_locations(call_node)
    return get_func(process=process, tree=call_node, **kwargs)
def make_wrapper(new_variables, value):
    """
    Define a set of new variables by creating a call to a lambda function.

    The value becomes the body of the lambda.
    The names of the new_variables become the formal parameters of the lambda.
    The values of the new_variables become the actual arguments of the call.

    The returned FavaCode also receives ``[value]`` followed by every value of
    ``new_variables`` as its second argument.
    """
    bindings = list(new_variables.iteritems())

    # Built in the same order as the nested expression would evaluate them:
    # formal parameters, lambda body, then the actual arguments.
    formal_params = [Name(id=name, ctx=Param()) for name, _ in bindings]
    lambda_body = value.get_ast()
    actual_args = [bound.get_ast() for _, bound in bindings]

    signature = arguments(args=formal_params, vararg=None, kwarg=None, defaults=[])
    wrapper_call = Call(
        func=Lambda(args=signature, body=lambda_body),
        args=actual_args,
        keywords=[],
        starargs=None,
        kwargs=None)
    return FavaCode(wrapper_call, [value] + [bound for _, bound in bindings])
def visit_Expr(self, node):
    """
    Rewrite expression statements that involve concurrent calls.

    * If the statement itself is a concurrent call, it is registered through
      ``self.encounter_call`` and returned unchanged.
    * If exactly one positional argument of the call is a concurrent call,
      the statement ``f(..., conc(args), ...)`` is rewritten to
      ``conc.call(lambda __value__: f(..., __value__, ...), args)``.
    * Everything else is delegated to ``self.generic_visit``.

    :param node: the ``ast.Expr`` statement being visited
    :return: the original node, the rewritten ``ast.Expr``, or the result of
        ``generic_visit``
    :raises: the error built by ``self.not_implemented_error`` when more than
        one positional argument is a concurrent call
    """
    call = node.value
    if type(call) is not ast.Call:
        return self.generic_visit(node)

    if self.is_concurrent_call(call):
        self.encounter_call(call)
        return node

    # Classify each positional argument once.
    conc_args = [(i, arg) for i, arg in enumerate(call.args)
                 if self.is_concurrent_call(arg)]
    if not conc_args:
        return self.generic_visit(node)
    if len(conc_args) > 1:
        raise self.not_implemented_error(call, "Functions with multiple @concurrent parameters are unsupported")

    index, conc_call = conc_args[0]
    self.encounter_call(conc_call)
    # The concurrent argument is replaced by the lambda's parameter.
    call.args[index] = ast.Name("__value__", ast.Load())

    if sys.version_info >= (3, 0):
        params = [ast.arg("__value__", None)]
    else:
        params = [ast.Name("__value__", ast.Param())]
    # ``posonlyargs`` is a required field of ast.arguments on Python 3.8+;
    # older interpreters simply store it as an extra attribute.
    call_lambda = ast.Lambda(
        ast.arguments(posonlyargs=[], args=params, defaults=[],
                      kwonlyargs=[], kw_defaults=[]),
        call)

    copy_location_kwargs = {
        "func": ast.Attribute(conc_call.func, 'call', ast.Load()),
        "args": [call_lambda] + conc_call.args,
        "keywords": conc_call.keywords
    }
    if sys.version_info < (3, 0):
        copy_location_kwargs["kwargs"] = conc_call.kwargs
    return copy_location(ast.Expr(ast.Call(**copy_location_kwargs)), node)
def visit_Lambda(self, node):
    # type: (ast.Lambda) -> None
    """Deliberately skip lambda expressions.

    A lambda introduces a new child namespace (reachable via
    ``.get_children()``), but that namespace does not show up in the current
    symbol table via ``.lookup()``. Until that is handled, lambdas are
    ignored and their bodies are not visited.
    """
    return None
def test_lambda(self):
a = ast.arguments([], None, None, [], None, None, [], [])
self.expr(ast.Lambda(a, ast.Name("x", ast.Store())),
"must have Load context")
def fac(args):
return ast.Lambda(args, ast.Name("x", ast.Load()))
self._check_arguments(fac, self.expr)
def _make_list_func(tree, field_names_dict, filename=None, add_args=None):
    """
    Compile an expression tree into a function of a ``data`` list.

    :param tree: parsed expression tree; it is passed through
        ``RewriteName('data', field_names_dict)`` and its ``body`` becomes
        the lambda body
    :param field_names_dict: handed to ``RewriteName``
    :param filename: name reported for the compiled code
        (``'<generated>'`` when falsy)
    :param add_args: optional names of extra parameters following ``data``
    :return: the compiled lambda ``(data, *add_args) -> value``
    """
    if not filename:
        filename = '<generated>'
    list_name = 'data'
    rewriter = RewriteName(list_name, field_names_dict)

    # ``data`` always comes first, followed by any additional parameters.
    param_names = [list_name] + list(add_args or [])
    lambda_args = [
        ast.Name(id=name, ctx=ast.Param(), lineno=0, col_offset=0)
        for name in param_names
    ]

    rewritten = rewriter.visit(tree)
    signature = ast.arguments(
        args=lambda_args,
        vararg=None,
        kwarg=None,
        defaults=[],
    )
    wrapper = ast.Lambda(
        lineno=0,
        col_offset=0,
        body=rewritten.body,
        args=signature,
    )
    # The evaluated object is the lambda built from the tree above.
    code = compile(ast.Expression(body=wrapper), filename, 'eval')
    return eval(code)
def test_lambda(self):
a = ast.arguments([], None, [], [], None, [])
self.expr(ast.Lambda(a, ast.Name("x", ast.Store())),
"must have Load context")
def fac(args):
return ast.Lambda(args, ast.Name("x", ast.Load()))
self._check_arguments(fac, self.expr)
def maximum_function_args(self):
    """Get the maximum number of function arguments appearing in the AST.

    Both ``self.all_nodes`` and ``self.stub_nodes`` are scanned for function
    definitions and lambdas, counting the entries of ``args.args``.
    The result is never below 1, because a default one-argument ``__init__``
    is added to classes that do not define one.
    """
    function_like = (ast.FunctionDef, ast.Lambda)
    widest = 1
    for nodes in (self.all_nodes, self.stub_nodes):
        for candidate in nodes:
            if isinstance(candidate, function_like):
                arity = len(candidate.args.args)
                if arity > widest:
                    widest = arity
    return widest
def parse_node(cls, node):
    """
    Summarise a function definition or lambda node as a dictionary.

    :param node: a node with an ``args`` attribute (function definition or
        ``ast.Lambda``)
    :return: dict with the keys ``node``, ``name`` (``None`` when the node
        has no name), ``args`` (IndexedDict of argument parts keyed by their
        ``'name'``), ``_spec1_args``, ``*args``, ``**kwargs`` and ``body``
        (the body node together with its target variables)
    """
    signature = node.args

    positional = cls.get_arg_tuples(signature.args, signature.defaults)
    # The keyword-only tuples are computed but not used further; the call is
    # kept so the sequence of calls on ``cls`` stays the same.
    cls.get_arg_tuples(signature.kwonlyargs, signature.kw_defaults)
    # TODO: all single args should be tuples like this
    vararg = cls.get_arg(signature.vararg)
    kwarg = cls.get_arg(signature.kwarg)

    # create context variables
    target_vars = [entry[0] for entry in positional]
    for extra in (vararg, kwarg):
        if extra:
            target_vars.append(extra)

    args = cls.get_arg_parts(signature.args, signature.defaults, 'arg')
    kw_args = cls.get_arg_parts(signature.kwonlyargs, signature.kw_defaults, 'kwonly')
    varargs = cls.get_arg_part(signature.vararg, None, 'vararg')
    kwargs = cls.get_arg_part(signature.kwarg, None, 'kwarg')
    present_parts = [
        part for part in [*args, varargs, *kw_args, kwargs] if part is not None
    ]

    # A lambda body is a single expression; a def body is a statement list
    # that is wrapped in a Module and run through FunctionBodyTransformer.
    if isinstance(node, ast.Lambda):
        body_node = node.body
    else:
        body_node = FunctionBodyTransformer().visit(ast.Module(node.body))

    return {
        "node": node,
        "name": getattr(node, 'name', None),
        "args": IndexedDict([(part['name'], part) for part in present_parts]),
        # TODO: arg is the node counterpart to target_vars
        "_spec1_args": args,
        "*args": varargs,
        "**kwargs": kwargs,
        "body": {'node': body_node,
                 'target_vars': TargetVars(target_vars)}
    }
def make_rule_function(rule_body):
    """
    Create a rule function as a lambda expression.

    The lambda takes a single parameter named ``shared`` and its body is the
    AST of ``rule_body``. The returned FavaCode wraps the resulting
    Expression and receives ``[rule_body]`` as its second argument.
    """
    shared_param = Name(id='shared', ctx=Param())
    signature = arguments(
        args=[shared_param],
        vararg=None, kwarg=None, defaults=[])
    rule_lambda = Lambda(args=signature, body=rule_body.get_ast())
    return FavaCode(Expression(body=rule_lambda), [rule_body])
def test_lambda(self):
a = ast.arguments([], None, [], [], None, [])
self.expr(ast.Lambda(a, ast.Name("x", ast.Store())),
"must have Load context")
def fac(args):
return ast.Lambda(args, ast.Name("x", ast.Load()))
self._check_arguments(fac, self.expr)
def translate_lambda(compiler, lambda_):
    """Translate a source-language lambda into a Python ``Lambda`` node.

    :param compiler: object whose ``translate`` method converts the lambda body
    :param lambda_: node exposing ``params`` (an iterable of parameter names)
        and ``body``
    :return: ``python_ast.Lambda`` with one un-annotated positional parameter
        per name and no defaults, varargs or keyword-only parameters
    """
    params = [
        python_ast.arg(arg=name, annotation=None)
        for name in lambda_.params
    ]
    signature = python_ast.arguments(
        args=params,
        vararg=None,
        kwonlyargs=[],
        kw_defaults=[],
        kwarg=None,
        defaults=[]
    )
    return python_ast.Lambda(args=signature, body=compiler.translate(lambda_.body))
def has_nested_definitions(function):
    """Report via ``has_nodes`` whether the function's tree body contains a
    function definition, class definition or lambda."""
    definition_types = (ast.FunctionDef, ast.ClassDef, ast.Lambda)
    return has_nodes(function.tree.body, definition_types)
def _labeled_translation(idx_mapping, arg_translation):
lambda_translation = ast.Lambda(
args=ast.arguments(
args=[ast.Name(id='x', ctx=ast.Param())],
vararg=None,
kwarg=None,
defaults=[]),
body=ast.Tuple(
elts=list(
ast.Subscript(
value=ast.Name(
id='x',
ctx=ast.Load()),
slice=ast.Index(
value=ast.Num(n=n)),
ctx=ast.Load())
for n in idx_mapping
),
ctx=ast.Load()
)
)
return ast.Call(
func=lambda_translation,
args=[arg_translation],
keywords=[],
starargs=[],
kwargs=None
)
def translate_Lambda(self, ctx, e):
    """Translate a lambda, naming each parameter by its ``uniq_id``.

    :param ctx: object whose ``translate`` method converts the lambda body
    :param e: lambda node; every entry of ``e.args.args`` must expose
        ``uniq_id``
    :return: a new ``ast.Lambda`` with the renamed parameters and the
        translated body
    """
    params = []
    for arg in e.args.args:
        params.append(ast.Name(id=arg.uniq_id))
    signature = ast.arguments(
        args=params,
        vararg=None,
        kwarg=None,
        defaults=[])
    translated_body = ctx.translate(e.body)
    return ast.Lambda(args=signature, body=translated_body)
def visit_Assign(self, node):
    """
    Assign(expr* targets, expr value)

    Write the Ruby form of a single-target assignment through ``self.write``.
    Handled targets: tuple/list unpacking, index assignment, slice
    assignment, plain names and attributes. Any other target raises
    RubyError.
    """
    assert len(node.targets) == 1
    target = node.targets[0]
    # The right-hand side is rendered first, whatever the target kind is.
    value = self.visit(node.value)

    if isinstance(target, (ast.Tuple, ast.List)):
        # multiassign.py:  x, y, z = [1, 2, 3]
        names = [self.visit(element) for element in target.elts]
        self.write("%s = %s" % (','.join(names), value))
        return

    if isinstance(target, ast.Subscript) and isinstance(target.slice, ast.Index):
        # Index assignment:  a[0] = xx
        name = self.visit(target.value)
        kwargs_marker = "**%s" % name
        # When the container is a ``**name`` function argument, the
        # string-symbol flag is raised while the index is rendered.
        if any(arg == kwargs_marker for arg in self._function_args):
            self._is_string_symbol = True
        self.write("%s[%s] = %s" % (name, self.visit(target.slice), value))
        self._is_string_symbol = False
        return

    if isinstance(target, ast.Subscript) and isinstance(target.slice, ast.Slice):
        # Slice assignment
        container = self.visit(target.value)
        lower = self.visit(target.slice.lower)
        upper = self.visit(target.slice.upper)
        self.write("%s[%s...%s] = %s" % (container, lower, upper, value))
        return

    if isinstance(target, ast.Name):
        var = self.visit(target)
        if var not in self._scope:
            self._scope.append(var)
        rhs = node.value
        # A variable bound to ``KnownClass(...)`` inherits that class's
        # recorded self-function arguments.
        if (isinstance(rhs, ast.Call)
                and isinstance(rhs.func, ast.Name)
                and rhs.func.id in self._class_names):
            self._classes_self_functions_args[var] = self._classes_self_functions_args[rhs.func.id]
        # set lambda functions
        if isinstance(rhs, ast.Lambda):
            self._lambda_functions.append(var)
        self.write("%s = %s" % (var, value))
        return

    if isinstance(target, ast.Attribute):
        var = self.visit(target)
        # [instance variable] :
        # <Python> self.foo = hoge
        # <Ruby>   @foo = hoge
        if var == 'self':
            self.write("@%s = %s" % (str(target.attr), value))
            self._class_self_variables.append(str(target.attr))
        else:
            self.write("%s = %s" % (var, value))
        return

    raise RubyError("Unsupported assignment type")
def visit_FunctionDef(self, node):
    """
    Rewrite lambda-style assignments found directly in a function body.

    Direct children of the form ``target = <name or tuple of names> <op> body``
    are replaced in place: when ``_is_multiline_lambda`` accepts the body, a
    new FunctionDef is inserted at the start of the function body and the
    assignment is pointed at it by name; otherwise the assignment value
    becomes an ``ast.Lambda``. All remaining children are passed through
    ``CompareNodeVisitor``.

    :type node: _ast.FunctionDef
    :return: the same ``node``, with its body modified in place
    """
    children = node.body
    # Assignments with one target whose value is a comparison with a Name, or
    # a tuple made only of Names, on its left-hand side.
    # NOTE(review): unlike visit_Compare, this filter does not require a
    # single ``>`` operator, so e.g. ``x = a < b`` also matches -- confirm
    # whether that is intended.
    lambda_assign_children = [child for child in children
                              if type(child) == _ast.Assign
                              and len(child.targets) == 1
                              and type(child.value) == _ast.Compare
                              and (type(child.value.left) == _ast.Tuple or type(child.value.left) == _ast.Name)
                              and all(map(lambda t: type(t) == _ast.Name, getattr(child.value.left, 'elts', [])))]
    # Support single line lambdas outside of assigns
    other_children = [child for child in children if child not in lambda_assign_children]
    for child in other_children:
        # The return value is discarded here.
        CompareNodeVisitor().visit(child)
    for assign_type_child in lambda_assign_children:
        arguments = _transform_function_arguments(assign_type_child.value.left)
        function_body = assign_type_child.value.comparators[0]
        if _is_multiline_lambda(function_body):
            # The elements of the body are the statements; the last one is
            # handled as the return statement.
            all_statements = function_body.elts
            return_statement = all_statements[-1]
            statements = all_statements[0:len(all_statements) - 1]
            statements = _transform_multiline_assignment_statements(statements)
            return_statement = _transform_multiline_return_statement(return_statement)
            assign_target = assign_type_child.targets[0]
            # The generated function is named after the assignment target
            # (the attribute name for ``obj.attr`` targets).
            if type(assign_target) is _ast.Attribute:
                function_name = assign_target.attr
            else:
                function_name = assign_target.id
            all_transformed_statements = statements + [return_statement]
            functiondef_object = ast.FunctionDef(args = arguments,
                                                 body=all_transformed_statements,
                                                 lineno=assign_type_child.lineno,
                                                 name=function_name,
                                                 col_offset=assign_type_child.col_offset,
                                                 decorator_list=[])
            # ``children`` is ``node.body`` itself, so this inserts the new
            # definition at the start of the enclosing function's body.
            children.insert(0, functiondef_object)
            # The assignment now refers to the generated function by name.
            assign_type_child.value = ast.Name(id=functiondef_object.name,
                                               col_offset=functiondef_object.col_offset,
                                               lineno=functiondef_object.lineno,
                                               ctx=ast.Load())
        else:
            # Single-expression body: replace the comparison with a lambda.
            lambda_ast_transform = ast.Lambda(args=arguments,
                                              body=function_body,
                                              lineno=assign_type_child.lineno,
                                              col_offset = assign_type_child.col_offset)
            assign_type_child.value = lambda_ast_transform
    return node
def make_grp_list_func(expr_str, field_names_dict, global_names_dict=None, filename=None):
    """
    Compile a grouping expression into a function of a ``data`` list.

    The expression is parsed, passed through ``RewriteCntFuncs`` and
    ``RewriteGrpFuncs(field_names_dict)``, and wrapped in a lambda taking
    ``data``. When ``global_names_dict`` is given, the lambda takes a second
    parameter ``globaldata`` and the tree is additionally passed through
    ``RewriteName('globaldata', global_names_dict)``.

    >>> func = make_grp_list_func('sum(a)/cnt()', {'a': 0})
    >>> func([(1,), (2,), (3,)])
    2
    >>> func = make_grp_list_func('1+sum(a)+sum(b**2)', {'a':0, 'b':1})
    >>> func([(10, 1), (20, 2)])
    36
    >>> func = make_grp_list_func('b+sum(a)/sum(c)', {'a': 0, 'c': 1}, {'b': 0})
    >>> func([(10, 1), (20, 2), (30, 3)], [1])
    11
    >>> func = make_grp_list_func('sum(a)', {'a': 0})
    >>> func([(1,), (2,), (3,)])
    6
    """
    if not filename:
        filename = '<generated>'

    def param(name):
        # Lambda parameter node with a dummy source position.
        return ast.Name(id=name, ctx=ast.Param(), lineno=0, col_offset=0)

    list_name = 'data'
    lambda_args = [param(list_name)]

    expr_tree = ast.parse(expr_str, filename, 'eval')
    expr_tree = RewriteCntFuncs().visit(expr_tree)
    expr_tree = RewriteGrpFuncs(field_names_dict).visit(expr_tree)
    if global_names_dict:
        lambda_args.append(param('globaldata'))
        expr_tree = RewriteName('globaldata', global_names_dict).visit(expr_tree)

    signature = ast.arguments(
        args=lambda_args,
        vararg=None,
        kwarg=None,
        defaults=[],
    )
    wrapper = ast.Lambda(
        body=expr_tree.body,
        args=signature,
        lineno=0,
        col_offset=0,
    )
    # The evaluated object is the lambda built from the rewritten tree.
    code = compile(ast.Expression(body=wrapper), filename, 'eval')
    return eval(code)
def infer(node, context, solver, from_call=False):
    """Infer the type of a given AST node.

    Dispatches on the node class to the matching ``infer_*`` helper; string,
    formatted-string and bytes literals map directly onto the solver's
    ``z3_types``.

    :param node: the expression node to type
    :param context: passed through to the helpers that need it
    :param solver: object exposing ``z3_types``; passed through to the helpers
    :param from_call: forwarded to ``infer_attribute`` only
    :return: the inferred type, as produced by the selected helper
    :raises NotImplementedError: if the node class is not handled
    """
    if isinstance(node, ast.Num):
        return infer_numeric(node, solver)
    if isinstance(node, ast.Str):
        return solver.z3_types.string
    # Formatted strings were introduced in Python 3.6. The version is
    # compared as a tuple so the gate stays true on any later release
    # (a component-wise check would be false on, e.g., a future X.0).
    if (sys.version_info >= (3, 6)
            and isinstance(node, (ast.FormattedValue, ast.JoinedStr))):
        return solver.z3_types.string
    if isinstance(node, ast.Bytes):
        return solver.z3_types.bytes
    if isinstance(node, ast.List):
        return infer_list(node, context, solver)
    if isinstance(node, ast.Dict):
        return infer_dict(node, context, solver)
    if isinstance(node, ast.Tuple):
        return infer_tuple(node, context, solver)
    if isinstance(node, ast.NameConstant):
        return infer_name_constant(node, solver)
    if isinstance(node, ast.Set):
        return infer_set(node, context, solver)
    if isinstance(node, ast.BinOp):
        return infer_binary_operation(node, context, solver)
    if isinstance(node, ast.BoolOp):
        return infer_boolean_operation(node, context, solver)
    if isinstance(node, ast.UnaryOp):
        return infer_unary_operation(node, context, solver)
    if isinstance(node, ast.IfExp):
        return infer_if_expression(node, context, solver)
    if isinstance(node, ast.Subscript):
        return infer_subscript(node, context, solver)
    # Await and Async were introduced in Python 3.5
    if sys.version_info >= (3, 5) and isinstance(node, ast.Await):
        return infer(node.value, context, solver)
    if isinstance(node, ast.Yield):
        return infer(node.value, context, solver)
    if isinstance(node, ast.Compare):
        return infer_compare(node, context, solver)
    if isinstance(node, ast.Name):
        return infer_name(node, context)
    if isinstance(node, ast.ListComp):
        return infer_sequence_comprehension(node, solver.z3_types.list, context, solver)
    if isinstance(node, ast.SetComp):
        return infer_sequence_comprehension(node, solver.z3_types.set, context, solver)
    if isinstance(node, ast.DictComp):
        return infer_dict_comprehension(node, context, solver)
    if isinstance(node, ast.Call):
        return infer_func_call(node, context, solver)
    if isinstance(node, ast.Attribute):
        return infer_attribute(node, context, from_call, solver)
    if isinstance(node, ast.Lambda):
        return _infer_lambda(node, context, solver)
    raise NotImplementedError("Inference for expression {} is not implemented yet.".format(type(node).__name__))