def occursIn(sub, super):
    """Does the first AST occur as a subtree of the second?

    Args:
        sub: the AST node to look for.
        super: the object to search in; anything that is not an ``ast.AST``
            instance can never contain ``sub``.

    Returns:
        True if ``super`` itself, or any node below it, has the same type as
        ``sub`` and ``compareASTs`` reports them equal; False otherwise.
    """
    if not isinstance(super, ast.AST):
        return False
    if type(sub) == type(super) and compareASTs(sub, super, checkEquality=True) == 0:
        return True
    # Node classes whose children may be statements.  They are resolved by
    # name because some of them (e.g. ast.Suite) do not exist in every Python
    # version; a direct attribute reference would raise AttributeError.
    holderNames = ("Module", "Interactive", "Suite",
                   "FunctionDef", "ClassDef", "For",
                   "While", "If", "With", "Try",
                   "ExceptHandler")
    superStatementTypes = tuple(getattr(ast, name) for name in holderNames
                                if hasattr(ast, name))
    # we know that a statement can never occur in an expression
    # (or in a non-statement-holding statement), so cut the search off now to save time.
    if isStatement(sub) and type(super) not in superStatementTypes:
        return False
    return any(occursIn(sub, child) for child in ast.iter_child_nodes(super))
python类While()的实例源码
def add_vars_conditional(self, JOIN, cfg_node):
    """Union into JOIN the lattice bit vectors of the variables a condition uses.

    For a ``while`` node only the loop test is inspected; for a node that
    ``self.is_output`` accepts the whole AST node is inspected; for an ``if``
    node only its test is inspected.  Any other node contributes no
    variables and JOIN is returned unchanged (previously this case raised
    TypeError by iterating ``None``).

    Args:
        JOIN: the value to extend; combined with ``|``.
        cfg_node: object exposing the AST node as ``ast_node``.

    Returns:
        JOIN or-ed with ``self.lattice.el2bv[var]`` for every collected var.
    """
    ast_node = cfg_node.ast_node
    # The order of these checks matters: an output node that is also an
    # ``if`` is visited as a whole, exactly as before.
    if isinstance(ast_node, ast.While):
        target = ast_node.test
    elif self.is_output(cfg_node):
        target = ast_node
    elif isinstance(ast_node, ast.If):
        target = ast_node.test
    else:
        return JOIN
    visitor = VarsVisitor()
    visitor.visit(target)
    for var in visitor.result:
        JOIN = JOIN | self.lattice.el2bv[var]
    return JOIN
def varse(node):
    """Collect the variables that VarsVisitor finds for a CFG node.

    Function and class definitions yield an empty list.  For ``while`` and
    ``if`` nodes only the test expression is visited; otherwise the whole
    AST node is visited, and an AttributeError during that visit yields an
    empty list.  For an AssignmentNode, variables contained in
    ``node.left_hand_side`` are filtered out of the result.
    """
    visitor = VarsVisitor()
    if isinstance(node.ast_node, (ast.FunctionDef, ast.ClassDef)):
        return []
    if isinstance(node.ast_node, (ast.While, ast.If)):
        visitor.visit(node.ast_node.test)
    else:
        try:
            visitor.visit(node.ast_node)
        except AttributeError:
            return []
    if not isinstance(node, AssignmentNode):
        return visitor.result
    # Keep only the variables that are not part of the assignment target.
    return [name for name in visitor.result if name not in node.left_hand_side]
def get_compound_bodies(node):
    """Returns a list of bodies of a compound statement node.

    Args:
        node: AST node.

    Returns:
        A list of bodies of the node. If the given node does not represent
        a compound statement, an empty list is returned.
    """
    if isinstance(node, (ast.Module, ast.FunctionDef, ast.ClassDef, ast.With)):
        return [node.body]
    elif isinstance(node, (ast.If, ast.While, ast.For)):
        return [node.body, node.orelse]
    elif PY2 and isinstance(node, ast.TryFinally):
        return [node.body, node.finalbody]
    elif PY2 and isinstance(node, ast.TryExcept):
        return [node.body, node.orelse] + [h.body for h in node.handlers]
    elif PY3 and isinstance(node, ast.Try):
        return ([node.body, node.orelse, node.finalbody]
                + [h.body for h in node.handlers])
    # Not a compound statement.  (A stray ``end`` token used to sit here and
    # raised NameError before this return could be reached.)
    return []
def get_coverable_nodes(cls):
    """Return the set of ``ast`` statement classes that count as coverable.

    The classes are looked up by name so that node types missing from the
    running interpreter (``TryExcept``/``TryFinally`` on newer versions,
    ``Try``/``Nonlocal`` on older ones) are skipped instead of raising
    AttributeError.
    """
    names = (
        "Assert",
        "Assign",
        "AugAssign",
        "Break",
        "Continue",
        "Delete",
        "Expr",
        "Global",
        "Import",
        "ImportFrom",
        "Nonlocal",
        "Pass",
        "Raise",
        "Return",
        "FunctionDef",
        "ClassDef",
        "TryExcept",
        "TryFinally",
        # Single node class that represents try statements where the two
        # classes above are not available.
        "Try",
        "ExceptHandler",
        "If",
        "For",
        "While",
    )
    return {getattr(ast, name) for name in names if hasattr(ast, name)}
def get_coverable_nodes(cls):
    """Return the set of ``ast`` statement classes that count as coverable."""
    simple_statements = (
        ast.Assert, ast.Assign, ast.AugAssign, ast.Break, ast.Continue,
        ast.Delete, ast.Expr, ast.Global, ast.Import, ast.ImportFrom,
        ast.Nonlocal, ast.Pass, ast.Raise, ast.Return,
    )
    compound_statements = (
        ast.ClassDef, ast.FunctionDef, ast.Try, ast.ExceptHandler,
        ast.If, ast.For, ast.While,
    )
    return set(simple_statements) | set(compound_statements)
def _build_node_cfg(node):
    """Build the CFG for ``node`` with the builder registered for its exact type.

    Node types without a dedicated builder fall back to
    ``_build_statement_cfg``.
    """
    builders_by_type = {
        ast.If: _build_if_cfg,
        ast.For: _build_loop_cfg,
        ast.While: _build_loop_cfg,
        ast.With: _build_with_cfg,
        ast.Break: _build_break_cfg,
        ast.Continue: _build_continue_cfg,
        ast.Return: _build_return_cfg,
        ast.Try: _build_try_cfg,
    }
    build = builders_by_type.get(type(node), _build_statement_cfg)
    return build(node)
def _translate_body(self, body, types, allow_loose_in_edges=False, allow_loose_out_edges=False):
    """Translate a list of statements into one CFG.

    Annotated assignments and expression statements are added to the basic
    block under construction.  ``if``/``while``/``break``/``continue`` close
    that block and append the sub-CFG produced by ``self.visit``.  ``pass``
    appends a dummy CFG unless a block is still incomplete.  Any other
    statement raises NotImplementedError.  Unless allowed by the flags,
    loose in/out edges of the result are capped with a dummy CFG.
    """
    factory = CFGFactory(self._id_gen)
    # Statements that end the running basic block and bring their own CFG.
    own_cfg_statements = (ast.If, ast.While, ast.Break, ast.Continue)
    for child in body:
        if isinstance(child, (ast.AnnAssign, ast.Expr)):
            factory.add_stmts(self.visit(child, types))
        elif isinstance(child, own_cfg_statements):
            factory.complete_basic_block()
            sub_cfg = self.visit(child, types)
            factory.append_cfg(sub_cfg)
        elif isinstance(child, ast.Pass):
            if not factory.incomplete_block():
                factory.append_cfg(_dummy_cfg(self._id_gen))
        else:
            raise NotImplementedError(f"The statement {str(type(child))} is not yet translatable to CFG!")
    factory.complete_basic_block()

    if not allow_loose_in_edges and factory.cfg and factory.cfg.loose_in_edges:
        factory.prepend_cfg(_dummy_cfg(self._id_gen))
    if not allow_loose_out_edges and factory.cfg and factory.cfg.loose_out_edges:
        factory.append_cfg(_dummy_cfg(self._id_gen))
    return factory.cfg
def staticVars(l, vars):
    """Determines whether the given lines leave the given variables unchanged.

    Returns False as soon as a statement assigns to (or loops over) one of
    the variables, or uses a variable whose ``type`` attribute is not one of
    int, float, str or bool; returns True otherwise.
    """
    # Variables without a known primitive type might be modified in place.
    primitiveTypes = [int, float, str, bool]
    mutableVars = [var for var in vars
                   if not (hasattr(var, "type") and (var.type in primitiveTypes))]

    for stmt in l:
        kind = type(stmt)
        if kind == ast.Assign:
            if any(var.id in allVariableNamesUsed(stmt.targets[0]) for var in vars):
                return False
        elif kind == ast.AugAssign:
            if any(var.id in allVariableNamesUsed(stmt.target) for var in vars):
                return False
        elif kind in [ast.If, ast.While]:
            if not staticVars(stmt.body, vars) or not staticVars(stmt.orelse, vars):
                return False
        elif kind == ast.For:
            if any(var.id in allVariableNamesUsed(stmt.target) for var in vars):
                return False
            if not staticVars(stmt.body, vars) or not staticVars(stmt.orelse, vars):
                return False
        elif kind in [ast.FunctionDef, ast.ClassDef, ast.Try, ast.With]:
            log("transformations\tstaticVars\tMissing type: " + str(kind), "bug")

        # If a mutable variable is used, we can't trust it
        if any(var.id in allVariableNamesUsed(stmt) for var in mutableVars):
            return False
    return True
def isStatement(a):
    """Determine whether the given node is a statement (vs an expression).

    Returns True only when the exact type of ``a`` is one of the listed
    ``ast`` classes.  The classes are resolved by name because some of them
    (e.g. ``ast.Suite``) are not present in every Python version; a direct
    attribute reference would raise AttributeError on each call.
    """
    statementNames = ("Module", "Interactive", "Expression", "Suite",
                      "FunctionDef", "ClassDef", "Return", "Delete",
                      "Assign", "AugAssign", "For", "While",
                      "If", "With", "Raise", "Try",
                      "Assert", "Import", "ImportFrom", "Global",
                      "Expr", "Pass", "Break", "Continue")
    statementTypes = tuple(getattr(ast, name) for name in statementNames
                           if hasattr(ast, name))
    return type(a) in statementTypes
def is_condition(self, cfg_node):
    """Return True for ``if``/``while`` nodes and for nodes ``self.is_output`` accepts."""
    if isinstance(cfg_node.ast_node, (ast.If, ast.While)):
        return True
    return bool(self.is_output(cfg_node))
def get_vars(node):
    """Yield, without duplicates, the variables VarsVisitor finds for a CFG node.

    ``while``/``if`` nodes contribute only their test expression; function
    and class definitions contribute nothing.  When the node exposes
    ``right_hand_side_variables`` only variables listed there are yielded;
    when it does not, every collected variable is yielded.
    """
    visitor = VarsVisitor()
    if isinstance(node.ast_node, (ast.While, ast.If)):
        visitor.visit(node.ast_node.test)
    elif isinstance(node.ast_node, (ast.FunctionDef, ast.ClassDef)):
        return list()
    else:
        try:
            visitor.visit(node.ast_node)
        except AttributeError:  # the visit failed on a missing attribute
            visitor.result = list()

    # Drop duplicates before filtering.
    visitor.result = set(visitor.result)

    for candidate in visitor.result:
        try:
            # Assignment-like nodes: keep right-hand-side variables only.
            if candidate in node.right_hand_side_variables:
                yield candidate
        except AttributeError:
            # No right_hand_side_variables attribute: keep everything.
            yield candidate
def visit_If(self, node):
    """
    Swap an ``if`` statement for a ``while`` statement with the same
    test, body and else-branch (children are visited first).
    https://greentreesnakes.readthedocs.io/en/latest/nodes.html#If
    """
    self.generic_visit(node)
    shared_fields = {field: getattr(node, field) for field in ("test", "body", "orelse")}
    return ast.While(**shared_fields)
def visit_While(self, node):
    """
    Replace a ``while`` statement with an ``if`` statement that has the
    same test, body and else-branch (children are visited first).
    https://greentreesnakes.readthedocs.io/en/latest/nodes.html#While

    Returns:
        The new ``ast.If`` node.  The replacement used to be built but never
        returned; in an ``ast.NodeTransformer`` a ``None`` result removes
        the statement instead of replacing it.
    """
    self.generic_visit(node)
    # NOTE(review): a ``break``/``continue`` inside the body is no longer
    # inside a loop after this replacement - confirm callers accept that.
    return ast.If(test=node.test, body=node.body, orelse=node.orelse)
def test_while(self):
    """Pass three malformed ``while`` nodes to ``self.stmt`` with their expected messages."""
    no_body = ast.While(ast.Num(3), [], [])
    self.stmt(no_body, "empty body on While")

    store_in_test = ast.While(ast.Name("x", ast.Store()), [ast.Pass()], [])
    self.stmt(store_in_test, "must have Load context")

    store_in_orelse = ast.While(ast.Num(3), [ast.Pass()],
                                [ast.Expr(ast.Name("x", ast.Store()))])
    self.stmt(store_in_orelse, "must have Load context")
def _visit_if_while(self, node):
if not self.config.remove_dead_code:
return
if not isinstance(node.test, ast.Constant):
return
test_true = bool(node.test.value)
if test_true:
if isinstance(node, ast.While):
# while of 'while 1: ...' must not be removed
return
new_nodes = node.body
removed_nodes = node.orelse
reason = "test always true"
else:
new_nodes = node.orelse
removed_nodes = node.body
reason = "test always false"
if not can_remove(removed_nodes):
return
self.log_node_removal("Remove dead code (%s)" % reason,
removed_nodes)
return self._replace_node(node, new_nodes)
def add_to_block(self, node):
    ''' Record a statement node in the current block.

    We want every try statement to be in its own block.

    Nothing happens when there is no current block or when the node's line
    number is not past ``self.current_line_num``. Loop nodes first get a
    fresh block if the current one is not empty. A statement found inside an
    F_BLOCK_EXCEPT frame also gets exits to that frame's entries and, unless
    it is a loop, to a new following block. '''
    if not self.current_block:
        return
    # We only want the 'top level' statements
    if self.current_line_num >= node.lineno:
        return
    # Special cases - test must be in its own block
    if isinstance(node, ast.While) or isinstance(node, ast.For):
        if not self.is_empty_block(self.current_block):
            # Start a new block for the loop and link it as an exit of the
            # block that was being filled.
            test_block = self.new_block()
            self.current_block.exit_blocks.append(test_block)
            self.use_next_block(test_block)
    self.current_line_num = node.lineno
    # Walk the frame blocks in reverse order and stop at the first
    # F_BLOCK_EXCEPT entry (presumably the innermost enclosing try - confirm).
    for f_block_type, f_block in reversed(self.frame_blocks):
        if f_block_type == F_BLOCK_EXCEPT:
            # Statement is in a try - set exits to next statement and
            # excepts
            self.current_block.statements.append(node)
            for handler in f_block:
                self.current_block.exit_blocks.append(handler)
            # Special case
            # Loops do not get a following block created here.
            if isinstance(node, ast.While) or isinstance(node, ast.For):
                break
            next_statement_block = self.new_block()
            self.current_block.exit_blocks.append(next_statement_block)
            self.use_next_block(next_statement_block)
            break
    else:
        # No F_BLOCK_EXCEPT frame was found: just record the statement.
        self.current_block.statements.append(node)
def do_Loop(self, node):
    ''' For and While loops are treated the same. The only difference is
        the possibility of iterators in a For loop.
        The loop body always returns to test unless there is a break or
        return.
        The else body is entered when the test is false but not when there
        is a break or an exception.
        The next block of the test could in theory be the else or after.
        But when we access it for the breaks we want it to be the after. '''
    # Put the test in its own block
    # The current block is reused as the test block and tagged as a loop header.
    test_block = self.current_block

    test_block.tag = Block.LOOP_HEADER
    self.push_frame_block(F_BLOCK_LOOP, test_block)

    after_loop_block = self.new_block()
    loop_body_block = self.new_block()
    self.add_to_exits(test_block, loop_body_block)
    # ``next`` of the test block is the after-loop block (see docstring).
    test_block.next = after_loop_block
    self.use_block(loop_body_block)
    for z in node.body:
        self.visit(z)
    # Connect whatever block the body ended in back to the test block.
    self.check_child_exits(self.current_block, test_block)
    self.pop_frame_block(F_BLOCK_LOOP, test_block)

    if node.orelse:
        # The else body is an exit of the test and leads to the after-loop block.
        else_body = self.new_block()
        self.add_to_exits(test_block, else_body)
        self.use_block(else_body)
        else_body.next = after_loop_block
        for z in node.orelse:
            self.visit(z)
        self.check_child_exits(self.current_block, after_loop_block)
    else:
        # Without an else body the test exits straight to the after-loop block.
        self.add_to_exits(test_block, after_loop_block)

    self.use_next_block(after_loop_block)
def handleNodeDelete(self, node):
    """Process the deletion of the name that ``node`` refers to.

    Nothing is done for nodes without a name or for nodes nested in an
    ``if``/``while``/conditional expression.  Otherwise the name is removed
    from the function scope's globals or deleted from the current scope; a
    name missing from the scope is reported as ``messages.UndefinedName``.
    """

    def inside_conditional():
        """
        Return `True` if an ancestor of node (followed through the
        `parent` attribute) is a conditional construct.
        """
        conditional_types = (ast.If, ast.While, ast.IfExp)
        ancestor = getattr(node, 'parent', None)
        while ancestor:
            if isinstance(ancestor, conditional_types):
                return True
            ancestor = getattr(ancestor, 'parent', None)
        return False

    name = getNodeName(node)
    # A conditional branch may never run, so its deletions are ignored.
    if not name or inside_conditional():
        return

    if isinstance(self.scope, FunctionScope) and name in self.scope.globals:
        self.scope.globals.remove(name)
        return

    try:
        del self.scope[name]
    except KeyError:
        self.report(messages.UndefinedName, node, name)
def test_while(self):
    """Pass three malformed ``while`` nodes to ``self.stmt`` with their expected messages."""
    no_body = ast.While(ast.Num(3), [], [])
    self.stmt(no_body, "empty body on While")

    store_in_test = ast.While(ast.Name("x", ast.Store()), [ast.Pass()], [])
    self.stmt(store_in_test, "must have Load context")

    store_in_orelse = ast.While(ast.Num(3), [ast.Pass()],
                                [ast.Expr(ast.Name("x", ast.Store()))])
    self.stmt(store_in_orelse, "must have Load context")
def _translate_body(self, body, allow_loose_in_edges=False, allow_loose_out_edges=False):
    """Translate a list of statements into one CFG.

    Assignments, augmented assignments and expression statements are added
    to the basic block under construction.  ``if``/``while``/``break``/
    ``continue`` close that block and append the sub-CFG produced by
    ``self.visit``.  ``pass`` appends a dummy CFG unless a block is still
    incomplete.  Any other statement raises NotImplementedError.  Unless
    allowed by the flags, loose in/out edges are capped with a dummy CFG.
    """
    factory = CfgFactory(self._id_gen)
    # Statements that end the running basic block and bring their own CFG.
    own_cfg_statements = (ast.If, ast.While, ast.Break, ast.Continue)
    for child in body:
        if isinstance(child, (ast.Assign, ast.AugAssign, ast.Expr)):
            factory.add_stmts(self.visit(child))
        elif isinstance(child, own_cfg_statements):
            factory.complete_basic_block()
            sub_cfg = self.visit(child)
            factory.append_cfg(sub_cfg)
        elif isinstance(child, ast.Pass):
            if not factory.incomplete_block():
                factory.append_cfg(_dummy_cfg(self._id_gen))
        else:
            raise NotImplementedError(f"The statement {str(type(child))} is not yet translatable to CFG!")
    factory.complete_basic_block()

    if not allow_loose_in_edges and factory.cfg and factory.cfg.loose_in_edges:
        factory.prepend_cfg(_dummy_cfg(self._id_gen))
    if not allow_loose_out_edges and factory.cfg and factory.cfg.loose_out_edges:
        factory.append_cfg(_dummy_cfg(self._id_gen))
    return factory.cfg
def infer(node, context, solver):
    """Infer the type of a statement node by dispatching on its AST class.

    Args:
        node: the statement AST node.
        context: passed through unchanged to the specific inference helper.
        solver: passed through to the helpers; also provides
            ``solver.z3_types.none``.

    Returns:
        The result of the matching helper.  A bare ``return``, an expression
        statement (whose value is still inferred) and any statement without
        a dedicated helper give ``solver.z3_types.none``.
    """
    # AsyncFor / AsyncWith are introduced in Python 3.5.  The version is
    # compared as a tuple: checking major and minor separately would be
    # false for a later major version with a minor number below 5.
    has_async_nodes = sys.version_info >= (3, 5)

    if isinstance(node, ast.Assign):
        return _infer_assign(node, context, solver)
    elif isinstance(node, ast.AugAssign):
        return _infer_augmented_assign(node, context, solver)
    elif isinstance(node, ast.Return):
        if not node.value:
            return solver.z3_types.none
        return expr.infer(node.value, context, solver)
    elif isinstance(node, ast.Delete):
        return _infer_delete(node, context, solver)
    elif isinstance(node, (ast.If, ast.While)):
        return _infer_control_flow(node, context, solver)
    elif isinstance(node, ast.For):
        return _infer_for(node, context, solver)
    elif has_async_nodes and isinstance(node, ast.AsyncFor):
        return _infer_for(node, context, solver)
    elif isinstance(node, ast.With):
        return _infer_with(node, context, solver)
    elif has_async_nodes and isinstance(node, ast.AsyncWith):
        return _infer_with(node, context, solver)
    elif isinstance(node, ast.Try):
        return _infer_try(node, context, solver)
    elif isinstance(node, ast.FunctionDef):
        return _infer_func_def(node, context, solver)
    elif isinstance(node, ast.ClassDef):
        return _infer_class_def(node, context, solver)
    elif isinstance(node, ast.Expr):
        # The expression is inferred for its effect on the solver; the
        # statement itself falls through to the none type below.
        expr.infer(node.value, context, solver)
    elif isinstance(node, ast.Import):
        return _infer_import(node, context, solver)
    elif isinstance(node, ast.ImportFrom):
        return _infer_import_from(node, context, solver)
    return solver.z3_types.none
def handleNodeDelete(self, node):
    """Process the deletion of the name that ``node`` refers to.

    Nothing is done for nodes without a name or for nodes nested in an
    ``if``/``while``/conditional expression.  Otherwise the name is removed
    from the function scope's globals or deleted from the current scope; a
    name missing from the scope is reported as ``messages.UndefinedName``.
    """

    def inside_conditional():
        """
        Return `True` if an ancestor of node (followed through the
        `parent` attribute) is a conditional construct.
        """
        conditional_types = (ast.If, ast.While, ast.IfExp)
        ancestor = getattr(node, 'parent', None)
        while ancestor:
            if isinstance(ancestor, conditional_types):
                return True
            ancestor = getattr(ancestor, 'parent', None)
        return False

    name = getNodeName(node)
    # A conditional branch may never run, so its deletions are ignored.
    if not name or inside_conditional():
        return

    if isinstance(self.scope, FunctionScope) and name in self.scope.globals:
        self.scope.globals.remove(name)
        return

    try:
        del self.scope[name]
    except KeyError:
        self.report(messages.UndefinedName, node, name)
def get_markers_from_body_node(self, node):
    """Return the set of markers attributed to a body-level statement.

    ``if``/``while`` contribute their own marker plus the markers of the
    test; ``for`` adds those of the target and the iterable; function and
    class definitions return the included markers of the node without its
    body; every other node contributes only its own marker.
    """
    if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
        return self.get_included_markers(node, without=node.body)

    if isinstance(node, (ast.If, ast.While)):
        header_fields = ("test",)
    elif isinstance(node, ast.For):
        header_fields = ("target", "iter")
    else:
        header_fields = ()

    markers = {node.marker}
    for field in header_fields:
        markers = markers | self.get_included_markers(getattr(node, field))
    return markers
def test_while(self):
    """Pass three malformed ``while`` nodes to ``self.stmt`` with their expected messages."""
    no_body = ast.While(ast.Num(3), [], [])
    self.stmt(no_body, "empty body on While")

    store_in_test = ast.While(ast.Name("x", ast.Store()), [ast.Pass()], [])
    self.stmt(store_in_test, "must have Load context")

    store_in_orelse = ast.While(ast.Num(3), [ast.Pass()],
                                [ast.Expr(ast.Name("x", ast.Store()))])
    self.stmt(store_in_orelse, "must have Load context")
def handleNodeDelete(self, node):
    """Process the deletion of the name that ``node`` refers to.

    Nothing is done for nodes without a name or for nodes nested in an
    ``if``/``while``/conditional expression.  Otherwise the name is removed
    from the function scope's globals or deleted from the current scope; a
    name missing from the scope is reported as ``messages.UndefinedName``.
    """

    def inside_conditional():
        """
        Return `True` if an ancestor of node (followed through the
        `parent` attribute) is a conditional construct.
        """
        conditional_types = (ast.If, ast.While, ast.IfExp)
        ancestor = getattr(node, 'parent', None)
        while ancestor:
            if isinstance(ancestor, conditional_types):
                return True
            ancestor = getattr(ancestor, 'parent', None)
        return False

    name = getNodeName(node)
    # A conditional branch may never run, so its deletions are ignored.
    if not name or inside_conditional():
        return

    if isinstance(self.scope, FunctionScope) and name in self.scope.globals:
        self.scope.globals.remove(name)
        return

    try:
        del self.scope[name]
    except KeyError:
        self.report(messages.UndefinedName, node, name)
def _wrap_in_loop(gen_sym, body_nodes, return_name):
    """Turn a function body into statements with its returns replaced.

    An explicit ``return`` is appended when the body does not end with one,
    then ``_replace_returns`` rewrites the returns.  With a single return
    the rewritten code minus its last statement is used directly; otherwise
    the code is wrapped in a ``while`` loop, preceded by an assignment of
    FALSE_NODE to the return flag when returns occur inside nested loops.

    Returns:
        A ``(gen_sym, statements, new_bindings)`` tuple, where ``gen_sym`` is
        the second value produced by calling the incoming ``gen_sym`` and
        ``new_bindings`` is an empty dict.
    """
    new_bindings = {}
    return_flag, gen_sym = gen_sym('return_flag')

    # Make sure the body ends with an explicit return.
    if type(body_nodes[-1]) is not ast.Return:
        body_nodes = body_nodes + [ast.Return(value=NONE_NODE)]

    inlined_code, returns_ctr, returns_in_loops = _replace_returns(
        body_nodes, return_name, return_flag)

    if returns_ctr == 1:
        # Only the trailing return exists, so no loop is needed.
        return gen_sym, inlined_code[:-1], new_bindings

    # Several returns: the code runs inside a `while` loop.
    if returns_in_loops:
        # The flag's value is used to detect returns from nested loops.
        flag_target = ast.Name(return_flag, ast.Store())
        statements = [ast.Assign(targets=[flag_target], value=FALSE_NODE)]
    else:
        statements = []
    statements.append(ast.While(test=TRUE_NODE, body=inlined_code, orelse=[]))
    return gen_sym, statements, new_bindings