def __init__(self, stmt, context):
    """Dispatch ``stmt`` to its handler and keep the result in ``lll_node``.

    ``stmt`` and ``context`` are stored on the instance, then a table that
    maps every supported ``ast`` statement class to the bound method
    handling it is built and consulted with the class of ``stmt``.

    A bare ``ast.Name`` whose id is ``"throw"`` has no handler method; it
    is turned directly into an ``['assert', 0]`` node.

    Raises:
        StructureException: when ``stmt`` matches neither the table nor
            the ``throw`` special case.
    """
    self.stmt = stmt
    self.context = context
    self.stmt_table = {
        ast.Expr: self.expr,
        ast.Pass: self.parse_pass,
        ast.AnnAssign: self.ann_assign,
        ast.Assign: self.assign,
        ast.If: self.parse_if,
        ast.Call: self.call,
        ast.Assert: self.parse_assert,
        ast.For: self.parse_for,
        ast.AugAssign: self.aug_assign,
        ast.Break: self.parse_break,
        ast.Return: self.parse_return,
    }

    # The table is keyed on the exact class, so subclasses do not match.
    handler = self.stmt_table.get(self.stmt.__class__)
    if handler is not None:
        self.lll_node = handler()
        return

    if isinstance(stmt, ast.Name) and stmt.id == "throw":
        self.lll_node = LLLnode.from_list(['assert', 0], typ=None, pos=getpos(stmt))
        return

    raise StructureException("Unsupported statement type", stmt)
# Example source code for the Python class ast.Break()
def _handle_ast_list(self, ast_list):
    """
    Find unreachable nodes in the given sequence of ast nodes.

    The statements are scanned in order. The first ``break``,
    ``continue``, ``raise`` or ``return`` that is followed by another
    statement is reported through ``self._define`` (spanning from the
    statement right after the jump to the last one in the list), and the
    scan stops there.
    """
    jump_types = (ast.Break, ast.Continue, ast.Raise, ast.Return)
    for position, stmt in enumerate(ast_list):
        if not isinstance(stmt, jump_types):
            continue
        next_position = position + 1
        if next_position >= len(ast_list):
            # The jump is the final statement, so nothing follows it.
            continue
        class_name = stmt.__class__.__name__.lower()
        self._define(
            self.unreachable_code,
            class_name,
            ast_list[next_position],
            last_node=ast_list[-1],
            message="unreachable code after '{class_name}'".format(
                class_name=class_name),
            confidence=100)
        return
def CONTINUE(self, node):
    """Check that a ``continue`` (or ``break``) node sits inside a loop.

    The chain of ``parent`` attributes is followed upwards from ``node``:

    * reaching a loop whose ``orelse`` does not contain the node we came
      from means the statement is fine and nothing is reported;
    * reaching a function or class definition stops the search;
    * for ``continue`` only, arriving from the ``finalbody`` of an
      ancestor reports ``ContinueInFinally``.

    If the search ends without finding a loop, ``ContinueOutsideLoop`` or
    ``BreakOutsideLoop`` is reported depending on the node type.
    """
    is_continue = isinstance(node, ast.Continue)
    child = node
    while hasattr(child, 'parent'):
        ancestor = child.parent
        # Being in the loop's ``else`` clause does not count as being in
        # the loop itself.
        if isinstance(ancestor, LOOP_TYPES) and child not in ancestor.orelse:
            return
        if isinstance(ancestor, (ast.FunctionDef, ast.ClassDef)):
            break
        # ``finalbody`` exists on Try (>= 3.3) and TryFinally (< 3.3).
        if (is_continue and hasattr(ancestor, 'finalbody')
                and child in ancestor.finalbody):
            self.report(messages.ContinueInFinally, node)
            return
        child = ancestor

    if is_continue:
        message = messages.ContinueOutsideLoop
    else:  # ast.Break
        message = messages.BreakOutsideLoop
    self.report(message, node)
def CONTINUE(self, node):
    """Report a ``continue``/``break`` statement that is misplaced.

    Ancestors are visited from the innermost outwards via ``parent``.
    An enclosing loop (reached through something other than its
    ``orelse``) makes the statement valid. A function or class
    definition ends the search. A ``continue`` reached through an
    ancestor's ``finalbody`` is reported as ``ContinueInFinally``.
    When no loop is found the matching "outside loop" message is
    reported.
    """
    handling_continue = isinstance(node, ast.Continue)
    current = node
    while hasattr(current, 'parent'):
        parent = current.parent
        if isinstance(parent, LOOP_TYPES):
            # Only the loop body counts, not its ``else`` clause.
            if current not in parent.orelse:
                return
        if isinstance(parent, (ast.FunctionDef, ast.ClassDef)):
            break
        # Covers both Try (Python >= 3.3) and TryFinally (Python < 3.3).
        if handling_continue and hasattr(parent, 'finalbody'):
            if current in parent.finalbody:
                self.report(messages.ContinueInFinally, node)
                return
        current = parent

    if handling_continue:
        self.report(messages.ContinueOutsideLoop, node)
        return
    # Anything else handled here is an ast.Break.
    self.report(messages.BreakOutsideLoop, node)
def get_coverable_nodes(cls):
    """Return the set of ``ast`` statement classes treated as coverable.

    The try-statement classes are looked up by name instead of being
    referenced directly: ``ast.TryExcept`` and ``ast.TryFinally`` were
    merged into ``ast.Try`` in Python 3.3, so an unguarded reference to
    the old names raises ``AttributeError`` on every later interpreter.
    On interpreters that still provide the old classes the result is
    unchanged.
    """
    nodes = {
        ast.Assert,
        ast.Assign,
        ast.AugAssign,
        ast.Break,
        ast.Continue,
        ast.Delete,
        ast.Expr,
        ast.Global,
        ast.Import,
        ast.ImportFrom,
        ast.Nonlocal,
        ast.Pass,
        ast.Raise,
        ast.Return,
        ast.FunctionDef,
        ast.ClassDef,
        ast.ExceptHandler,
        ast.If,
        ast.For,
        ast.While,
    }
    # Add whichever spelling(s) of the try statement this interpreter has.
    for name in ("TryExcept", "TryFinally", "Try"):
        try_node = getattr(ast, name, None)
        if try_node is not None:
            nodes.add(try_node)
    return nodes
def get_coverable_nodes(cls):
    """Return the set of ``ast`` statement classes treated as coverable.

    The classes are grouped by kind purely for readability; the returned
    value is one flat set.
    """
    simple_statements = {
        ast.Assert,
        ast.Assign,
        ast.AugAssign,
        ast.Break,
        ast.Continue,
        ast.Delete,
        ast.Expr,
        ast.Global,
        ast.Import,
        ast.ImportFrom,
        ast.Nonlocal,
        ast.Pass,
        ast.Raise,
        ast.Return,
    }
    definitions = {ast.ClassDef, ast.FunctionDef}
    compound_statements = {
        ast.Try,
        ast.ExceptHandler,
        ast.If,
        ast.For,
        ast.While,
    }
    return simple_statements | definitions | compound_statements
def _build_node_cfg(node):
    """Build the control-flow graph piece for a single statement node.

    The builder is chosen by the exact type of ``node``; any type without
    a dedicated builder is handled by ``_build_statement_cfg``.
    """
    specialised_builders = {
        ast.If: _build_if_cfg,
        ast.For: _build_loop_cfg,
        ast.While: _build_loop_cfg,
        ast.With: _build_with_cfg,
        ast.Break: _build_break_cfg,
        ast.Continue: _build_continue_cfg,
        ast.Return: _build_return_cfg,
        ast.Try: _build_try_cfg,
    }
    build = specialised_builders.get(type(node), _build_statement_cfg)
    return build(node)
def handle_Return(state, node, ctx, **_):
    """Replace a ``return`` statement with assignments followed by ``break``.

    The returned value is assigned to the variable named by
    ``ctx.return_var``. When ``state.loop_nesting_ctr`` is positive, the
    variable named by ``ctx.return_flag_var`` is additionally set to
    ``TRUE_NODE`` and the state is marked as having seen a return inside
    a loop. A ``break`` is always appended last.

    Returns:
        A pair ``(updated_state, replacement_nodes)``; ``updated_state``
        is whatever ``state.update`` returns for the collected changes
        (``returns_ctr`` is incremented by one).
    """
    changes = {'returns_ctr': state.returns_ctr + 1}

    store_result = ast.Assign(
        targets=[ast.Name(id=ctx.return_var, ctx=ast.Store())],
        value=node.value)
    replacement = [store_result]

    if state.loop_nesting_ctr > 0:
        raise_flag = ast.Assign(
            targets=[ast.Name(id=ctx.return_flag_var, ctx=ast.Store())],
            value=TRUE_NODE)
        replacement.append(raise_flag)
        changes['return_inside_a_loop'] = True
        changes['returns_in_loops'] = True

    replacement.append(ast.Break())
    return state.update(changes), replacement
def _translate_body(self, body, types, allow_loose_in_edges=False, allow_loose_out_edges=False):
    """Translate a list of statements into one control-flow graph.

    Annotated assignments and expression statements are visited and
    added to the block under construction. ``if``/``while``/``break``/
    ``continue`` first close that block and then append the sub-graph
    produced by visiting them. ``pass`` contributes a dummy graph only
    when no block is currently being built.

    Args:
        body: the statements to translate, in order.
        types: passed through unchanged to ``self.visit``.
        allow_loose_in_edges: when false, a dummy graph is prepended if
            the result has loose incoming edges.
        allow_loose_out_edges: when false, a dummy graph is appended if
            the result has loose outgoing edges.

    Returns:
        The ``cfg`` held by the factory after all statements are handled.

    Raises:
        NotImplementedError: for any other statement type.
    """
    straight_line_stmts = (ast.AnnAssign, ast.Expr)
    control_flow_stmts = (ast.If, ast.While, ast.Break, ast.Continue)

    cfg_factory = CFGFactory(self._id_gen)
    for child in body:
        if isinstance(child, straight_line_stmts):
            cfg_factory.add_stmts(self.visit(child, types))
        elif isinstance(child, control_flow_stmts):
            # Control flow always starts a fresh block.
            cfg_factory.complete_basic_block()
            sub_cfg = self.visit(child, types)
            cfg_factory.append_cfg(sub_cfg)
        elif isinstance(child, ast.Pass):
            if not cfg_factory.incomplete_block():
                cfg_factory.append_cfg(_dummy_cfg(self._id_gen))
        else:
            raise NotImplementedError(f"The statement {str(type(child))} is not yet translatable to CFG!")
    cfg_factory.complete_basic_block()

    if not allow_loose_in_edges:
        if cfg_factory.cfg and cfg_factory.cfg.loose_in_edges:
            cfg_factory.prepend_cfg(_dummy_cfg(self._id_gen))
    if not allow_loose_out_edges:
        if cfg_factory.cfg and cfg_factory.cfg.loose_out_edges:
            cfg_factory.append_cfg(_dummy_cfg(self._id_gen))

    return cfg_factory.cfg
def isStatement(a):
    """Determine whether the given node is a statement (vs an expression).

    The check is an exact type match against a fixed list of ``ast``
    classes; instances of subclasses, and node types missing from the
    list, yield ``False``.

    ``ast.Suite`` is looked up defensively: it is a deprecated leftover
    that the ``ast`` documentation says will be removed, and an unguarded
    reference would make every call fail with ``AttributeError`` once it
    is gone.
    """
    statement_types = [
        ast.Module, ast.Interactive, ast.Expression,
        ast.FunctionDef, ast.ClassDef, ast.Return, ast.Delete,
        ast.Assign, ast.AugAssign, ast.For, ast.While,
        ast.If, ast.With, ast.Raise, ast.Try,
        ast.Assert, ast.Import, ast.ImportFrom, ast.Global,
        ast.Expr, ast.Pass, ast.Break, ast.Continue,
    ]
    suite_type = getattr(ast, "Suite", None)
    if suite_type is not None:
        statement_types.append(suite_type)
    return type(a) in statement_types
def _translate_body(self, body, allow_loose_in_edges=False, allow_loose_out_edges=False):
    """Translate a list of statements into one control-flow graph.

    Assignments, augmented assignments and expression statements are
    visited and added to the block under construction. ``if``/``while``/
    ``break``/``continue`` first close that block and then append the
    sub-graph produced by visiting them. ``pass`` contributes a dummy
    graph only when no block is currently being built.

    Args:
        body: the statements to translate, in order.
        allow_loose_in_edges: when false, a dummy graph is prepended if
            the result has loose incoming edges.
        allow_loose_out_edges: when false, a dummy graph is appended if
            the result has loose outgoing edges.

    Returns:
        The ``cfg`` held by the factory after all statements are handled.

    Raises:
        NotImplementedError: for any other statement type.
    """
    straight_line_stmts = (ast.Assign, ast.AugAssign, ast.Expr)
    control_flow_stmts = (ast.If, ast.While, ast.Break, ast.Continue)

    cfg_factory = CfgFactory(self._id_gen)
    for child in body:
        if isinstance(child, straight_line_stmts):
            cfg_factory.add_stmts(self.visit(child))
        elif isinstance(child, control_flow_stmts):
            # Control flow always starts a fresh block.
            cfg_factory.complete_basic_block()
            sub_cfg = self.visit(child)
            cfg_factory.append_cfg(sub_cfg)
        elif isinstance(child, ast.Pass):
            if not cfg_factory.incomplete_block():
                cfg_factory.append_cfg(_dummy_cfg(self._id_gen))
        else:
            raise NotImplementedError(f"The statement {str(type(child))} is not yet translatable to CFG!")
    cfg_factory.complete_basic_block()

    if not allow_loose_in_edges:
        if cfg_factory.cfg and cfg_factory.cfg.loose_in_edges:
            cfg_factory.prepend_cfg(_dummy_cfg(self._id_gen))
    if not allow_loose_out_edges:
        if cfg_factory.cfg and cfg_factory.cfg.loose_out_edges:
            cfg_factory.append_cfg(_dummy_cfg(self._id_gen))

    return cfg_factory.cfg
def mutate_Continue(self, node):
    """Return a fresh ``break`` node to stand in for a ``continue`` node.

    ``node`` itself is not inspected or modified.
    """
    replacement = ast.Break()
    return replacement
def one_iteration(self, node):
    """Append a ``break`` to the body of ``node`` and return ``node``.

    The body list is modified in place.
    """
    trailing_break = ast.Break()
    node.body.append(trailing_break)
    return node
def zero_iteration(self, node):
    """Replace the body of ``node`` with a single ``break`` and return it.

    A new list is assigned to ``node.body``; the previous list object is
    left untouched.
    """
    only_break = ast.Break()
    node.body = [only_break]
    return node
def get_nontrivial_nodes(self):
    """Return ids of nodes that can possibly raise an exception.

    Every entry of ``self.nodes`` is kept, in iteration order, unless
    the exact type of its ``ast_node`` is ``Break``, ``Continue``,
    ``Pass`` or ``Try``.
    """
    trivial_types = (ast.Break, ast.Continue, ast.Pass, ast.Try)
    return [
        node_id
        for node_id, node_obj in self.nodes.items()
        if type(node_obj.ast_node) not in trivial_types
    ]
def _build_cfg(statements):
    """Chain the graphs of consecutive statements into one subgraph.

    Each statement's own graph is merged into the result, and every exit
    of the previous statement is connected to the entry of the current
    one. Processing stops after the first ``break``, ``continue`` or
    ``return``; statements after it are not included.

    ``statements`` must be non-empty: the ``id`` of its first element is
    used as the entry point.
    """
    entry = id(statements[0])
    pending_exits = [entry]
    graph = Graph()
    jumps = Jumps()
    terminators = (ast.Break, ast.Continue, ast.Return)

    for position, stmt in enumerate(statements):
        stmt_cfg = _build_node_cfg(stmt)
        graph.update(stmt_cfg.graph)

        # The first statement has no predecessor to link from.
        if position:
            for source in pending_exits:
                graph.add_edge(source, stmt_cfg.enter)

        pending_exits = stmt_cfg.exits
        jumps = jumps.join(stmt_cfg.jumps)

        if type(stmt) in terminators:
            # Issue a warning about unreachable code?
            break

    return ControlFlowSubgraph(graph, entry, exits=pending_exits, jumps=jumps)
def _handle_loop(node, state, ctx, visit_after, visiting_after, walk_field, **_):
    """Process a loop node in two phases, selected by ``visiting_after``.

    First phase (``visiting_after`` false): the loop body is walked with
    ``state.loop_nesting_ctr`` raised by one, the ``orelse`` part is
    walked with the counter restored, ``visit_after()`` is called, and
    the node is rebuilt with the walked fields.

    Second phase (``visiting_after`` true): if ``state.return_inside_a_loop``
    is set, the loop is followed by ``if <return flag>: break``. At nesting
    level zero the flag is reset in the returned state.

    Returns:
        A pair ``(state, result)`` where ``result`` is the rebuilt node
        (first phase), the unchanged node, or the list
        ``[node, conditional_break]`` (second phase).
    """
    if not visiting_after:
        # Need to traverse fields explicitly since for the purposes of _replace_returns(),
        # the body of `orelse` field is not inside a loop.
        state = state.update(loop_nesting_ctr=state.loop_nesting_ctr + 1)
        state, new_body = walk_field(state, node.body, block_context=True)
        state = state.update(loop_nesting_ctr=state.loop_nesting_ctr - 1)
        state, new_orelse = walk_field(state, node.orelse, block_context=True)
        visit_after()
        return state, replace_fields(node, body=new_body, orelse=new_orelse)

    # If there was a return inside a loop, append a conditional break
    # to propagate the return outside all nested loops
    if state.return_inside_a_loop:
        # The flag is read here, so the Name needs an explicit Load
        # context; without it the node is incomplete and compile() on
        # interpreters that do not default the field rejects it.
        flag_test = ast.Name(id=ctx.return_flag_var, ctx=ast.Load())
        new_nodes = [
            node,
            ast.If(
                test=flag_test,
                body=[ast.Break()],
                orelse=[])]
    else:
        new_nodes = node

    # if we are at root level, reset the return-inside-a-loop flag
    if state.loop_nesting_ctr == 0:
        state = state.update(return_inside_a_loop=False)
    return state, new_nodes
def handle_While(node, **_):
    """Rewrite a ``while`` loop as an ``if`` when its body always jumps out.

    The rewrite applies when the last statement of the body is a
    ``break``, ``raise`` or ``return`` and ``find_jumps(node.body)``
    equals 1 (presumably meaning that this is the only jump in the body;
    confirm against ``find_jumps``). A trailing ``break`` is dropped,
    because it has nothing to break out of once the loop is gone;
    ``raise``/``return`` are kept.

    If dropping the ``break`` would leave the body empty (``while c:
    break``), a ``pass`` is inserted instead: an ``if`` with no
    statements in its body is not a valid AST.

    Returns:
        A new ``ast.If`` reusing the loop's test and ``orelse``, or the
        original ``node`` when the rewrite does not apply.
    """
    last_node = node.body[-1]
    unconditional_jump = type(last_node) in (ast.Break, ast.Raise, ast.Return)
    if not (unconditional_jump and find_jumps(node.body) == 1):
        return node

    if type(last_node) is ast.Break:
        new_body = node.body[:-1] or [ast.Pass()]
    else:
        new_body = node.body
    return ast.If(test=node.test, body=new_body, orelse=node.orelse)