def _handle_ast_list(self, ast_list):
    """
    Report the code that follows the first effective jump in ``ast_list``.

    The sequence is scanned for a ``break``, ``continue``, ``raise`` or
    ``return`` node. For the first such node that has a successor,
    ``self._define`` is called once with ``self.unreachable_code``, the
    lowercased class name of the jump node, the node right after it and
    the last node of the sequence; scanning then stops. A jump node in
    the final position has nothing after it and is skipped.
    """
    jump_types = (ast.Break, ast.Continue, ast.Raise, ast.Return)
    for position, stmt in enumerate(ast_list):
        if not isinstance(stmt, jump_types):
            continue
        try:
            first_unreachable_node = ast_list[position + 1]
        except IndexError:
            # The jump is the last statement: nothing is unreachable.
            continue
        kind = type(stmt).__name__.lower()
        message = "unreachable code after '{class_name}'".format(
            class_name=kind)
        self._define(
            self.unreachable_code,
            kind,
            first_unreachable_node,
            last_node=ast_list[-1],
            message=message,
            confidence=100)
        # Only the first jump matters; everything after it is covered.
        return
python类Continue()的实例源码
def CONTINUE(self, node):
    """Report a ``continue``/``break`` node that is not validly placed.

    Climbs the ``parent`` links starting at ``node``:

    * reaching a ``LOOP_TYPES`` ancestor from anywhere but its ``orelse``
      list means the statement is inside the loop, so nothing is reported;
    * reaching a function or class definition stops the climb;
    * for ``continue`` only, arriving from an ancestor's ``finalbody``
      reports ``messages.ContinueInFinally``.

    When the climb ends without finding an enclosing loop,
    ``messages.ContinueOutsideLoop`` is reported for a ``continue`` node
    and ``messages.BreakOutsideLoop`` for anything else.
    """
    is_continue = isinstance(node, ast.Continue)
    ancestor = node
    while hasattr(ancestor, 'parent'):
        came_from = ancestor
        ancestor = ancestor.parent
        # Being in the loop's ``else`` clause does not count as being
        # inside the loop itself.
        if (isinstance(ancestor, LOOP_TYPES)
                and came_from not in ancestor.orelse):
            return
        if isinstance(ancestor, (ast.FunctionDef, ast.ClassDef)):
            break
        # ``finalbody`` exists on both Try (Python >= 3.3) and
        # TryFinally (older), so test for the attribute, not the class.
        if (hasattr(ancestor, 'finalbody') and is_continue
                and came_from in ancestor.finalbody):
            self.report(messages.ContinueInFinally, node)
            return
    if is_continue:
        problem = messages.ContinueOutsideLoop
    else:  # ast.Break
        problem = messages.BreakOutsideLoop
    self.report(problem, node)
def CONTINUE(self, node):
    """Check the placement of a ``continue`` (or ``break``) statement.

    The ``parent`` chain of ``node`` is followed upwards. Nothing is
    reported once a ``LOOP_TYPES`` ancestor is reached through something
    other than its ``orelse`` list. A ``continue`` reached through an
    ancestor's ``finalbody`` is reported as ``messages.ContinueInFinally``.
    The walk stops at a function or class definition, or when no further
    ``parent`` attribute exists; in that case the statement is reported
    as ``messages.ContinueOutsideLoop`` or ``messages.BreakOutsideLoop``.
    """
    node_is_continue = isinstance(node, ast.Continue)
    scope = node
    while hasattr(scope, 'parent'):
        child = scope
        scope = scope.parent
        if isinstance(scope, LOOP_TYPES) and child not in scope.orelse:
            # Found an enclosing loop body: the statement is legal.
            return
        if isinstance(scope, (ast.FunctionDef, ast.ClassDef)):
            # A definition boundary hides any outer loop.
            break
        # Covers both Try (Python >= 3.3) and TryFinally (earlier).
        in_finally = (hasattr(scope, 'finalbody') and node_is_continue
                      and child in scope.finalbody)
        if in_finally:
            self.report(messages.ContinueInFinally, node)
            return
    if node_is_continue:
        self.report(messages.ContinueOutsideLoop, node)
        return
    # Anything that is not ``continue`` is treated as ast.Break.
    self.report(messages.BreakOutsideLoop, node)
def get_coverable_nodes(cls):
    """Return the set of ``ast`` node classes considered coverable.

    ``ast.TryExcept`` and ``ast.TryFinally`` were merged into ``ast.Try``
    in Python 3.3. Referencing them unconditionally raises
    ``AttributeError`` on every later interpreter, so the try-related
    classes are chosen according to what the running ``ast`` module
    provides. On interpreters that still have the legacy classes the
    returned set is unchanged.

    :param cls: unused by the body; kept for the caller-facing signature.
    :return: a new ``set`` of ``ast`` node classes.
    """
    if hasattr(ast, "TryExcept"):
        try_nodes = {ast.TryExcept, ast.TryFinally}
    else:
        try_nodes = {ast.Try}
    return {
        ast.Assert,
        ast.Assign,
        ast.AugAssign,
        ast.Break,
        ast.Continue,
        ast.Delete,
        ast.Expr,
        ast.Global,
        ast.Import,
        ast.ImportFrom,
        ast.Nonlocal,
        ast.Pass,
        ast.Raise,
        ast.Return,
        ast.FunctionDef,
        ast.ClassDef,
        ast.ExceptHandler,
        ast.If,
        ast.For,
        ast.While,
    } | try_nodes
def get_coverable_nodes(cls):
    """Return the set of ``ast`` node classes considered coverable.

    The set is assembled from three groups: simple statements,
    definitions, and compound statements (including ``ast.Try`` and
    ``ast.ExceptHandler``). ``cls`` is not used by the body.
    """
    simple_statements = (
        ast.Assert,
        ast.Assign,
        ast.AugAssign,
        ast.Break,
        ast.Continue,
        ast.Delete,
        ast.Expr,
        ast.Global,
        ast.Import,
        ast.ImportFrom,
        ast.Nonlocal,
        ast.Pass,
        ast.Raise,
        ast.Return,
    )
    definitions = (ast.ClassDef, ast.FunctionDef)
    compound_statements = (
        ast.Try,
        ast.ExceptHandler,
        ast.If,
        ast.For,
        ast.While,
    )
    return set(simple_statements + definitions + compound_statements)
def _build_node_cfg(node):
    """Build the CFG for a single statement node.

    The builder is selected by the exact type of ``node``; any type
    without a dedicated entry is handled by ``_build_statement_cfg``.
    Returns whatever the selected builder returns.
    """
    builders = {
        ast.If: _build_if_cfg,
        ast.For: _build_loop_cfg,
        ast.While: _build_loop_cfg,
        ast.With: _build_with_cfg,
        ast.Break: _build_break_cfg,
        ast.Continue: _build_continue_cfg,
        ast.Return: _build_return_cfg,
        ast.Try: _build_try_cfg,
    }
    build = builders.get(type(node), _build_statement_cfg)
    return build(node)
def filter_block(node_list):
    """
    Drop ``pass`` statements and everything that follows the first
    unconditional jump (``return``, ``break``, ``continue``, ``raise``).

    A single-element list is returned untouched, as is any list from
    which nothing was removed (the very same object is handed back).
    Otherwise a new list with the surviving nodes is returned.
    """
    if len(node_list) == 1:
        return node_list

    jump_types = (ast.Return, ast.Break, ast.Continue, ast.Raise)
    kept = []
    for stmt in node_list:
        stmt_type = type(stmt)
        if stmt_type != ast.Pass:
            kept.append(stmt)
            if stmt_type in jump_types:
                # Nothing after an unconditional jump can execute.
                break

    # Preserve identity when the block did not change.
    return node_list if len(kept) == len(node_list) else kept
def _translate_body(self, body, types, allow_loose_in_edges=False, allow_loose_out_edges=False):
    """Translate a list of statements into a single CFG.

    Each statement in ``body`` is visited with ``self.visit(child, types)``:

    * ``AnnAssign``/``Expr`` results are added to the current basic block;
    * ``If``/``While``/``Break``/``Continue`` first close the current
      basic block, then have their own CFG appended;
    * ``Pass`` appends a dummy CFG unless a basic block is still open;
    * any other statement raises ``NotImplementedError``.

    After the loop the open block is completed and, unless the matching
    ``allow_loose_*`` flag is set, a dummy CFG is prepended/appended when
    the resulting CFG has loose in/out edges.

    :return: ``cfg_factory.cfg`` after all statements were processed.
    """
    factory = CFGFactory(self._id_gen)
    control_flow_types = (ast.If, ast.While, ast.Break, ast.Continue)

    for stmt in body:
        if isinstance(stmt, (ast.AnnAssign, ast.Expr)):
            factory.add_stmts(self.visit(stmt, types))
        elif isinstance(stmt, control_flow_types):
            # The current block must be closed before the sub-CFG is built.
            factory.complete_basic_block()
            sub_cfg = self.visit(stmt, types)
            factory.append_cfg(sub_cfg)
        elif isinstance(stmt, ast.Pass):
            if not factory.incomplete_block():
                factory.append_cfg(_dummy_cfg(self._id_gen))
        else:
            raise NotImplementedError(f"The statement {str(type(stmt))} is not yet translatable to CFG!")

    factory.complete_basic_block()

    needs_entry = not allow_loose_in_edges and factory.cfg and factory.cfg.loose_in_edges
    if needs_entry:
        factory.prepend_cfg(_dummy_cfg(self._id_gen))
    needs_exit = not allow_loose_out_edges and factory.cfg and factory.cfg.loose_out_edges
    if needs_exit:
        factory.append_cfg(_dummy_cfg(self._id_gen))

    return factory.cfg
def isStatement(a):
    """Determine whether the given node is a statement (vs an expression)

    The check is an exact type match (subclasses do not count) against a
    fixed list that also contains the top-level container node classes
    ``Module``, ``Interactive``, ``Expression`` and ``Suite``.
    """
    containers = (ast.Module, ast.Interactive, ast.Expression, ast.Suite)
    compound = (ast.FunctionDef, ast.ClassDef, ast.For, ast.While,
                ast.If, ast.With, ast.Try)
    simple = (ast.Return, ast.Delete, ast.Assign, ast.AugAssign,
              ast.Raise, ast.Assert, ast.Import, ast.ImportFrom,
              ast.Global, ast.Expr, ast.Pass, ast.Break, ast.Continue)
    node_type = type(a)
    return node_type in containers + compound + simple
def try_except_continue(context, config):
    """Flag an exception handler whose whole body is a single ``continue``.

    ``context.node`` is expected to expose ``body`` and ``type``. Nothing
    is reported (``None`` is returned) when:

    * the body does not consist of exactly one statement;
    * ``config['check_typed_exception']`` is falsy and the handler names
      an exception type whose ``id`` is not ``'Exception'``;
    * the single statement is not ``continue``.

    Otherwise a ``bandit.Issue`` with LOW severity and HIGH confidence is
    returned.
    """
    handler = context.node
    if len(handler.body) != 1:
        return None

    if not config['check_typed_exception']:
        caught = handler.type
        if caught is not None and getattr(caught, 'id', None) != 'Exception':
            # Typed handlers are ignored unless explicitly requested.
            return None

    if not isinstance(handler.body[0], ast.Continue):
        return None

    return bandit.Issue(
        severity=bandit.LOW,
        confidence=bandit.HIGH,
        text=("Try, Except, Continue detected."))
def _translate_body(self, body, allow_loose_in_edges=False, allow_loose_out_edges=False):
    """Translate a list of statements into a single CFG.

    Each statement in ``body`` is visited with ``self.visit(child)``:

    * ``Assign``/``AugAssign``/``Expr`` results are added to the current
      basic block;
    * ``If``/``While``/``Break``/``Continue`` first close the current
      basic block, then have their own CFG appended;
    * ``Pass`` appends a dummy CFG unless a basic block is still open;
    * any other statement raises ``NotImplementedError``.

    After the loop the open block is completed and, unless the matching
    ``allow_loose_*`` flag is set, a dummy CFG is prepended/appended when
    the resulting CFG has loose in/out edges.

    :return: ``cfg_factory.cfg`` after all statements were processed.
    """
    factory = CfgFactory(self._id_gen)
    control_flow_types = (ast.If, ast.While, ast.Break, ast.Continue)

    for stmt in body:
        if isinstance(stmt, (ast.Assign, ast.AugAssign, ast.Expr)):
            factory.add_stmts(self.visit(stmt))
        elif isinstance(stmt, control_flow_types):
            # The current block must be closed before the sub-CFG is built.
            factory.complete_basic_block()
            sub_cfg = self.visit(stmt)
            factory.append_cfg(sub_cfg)
        elif isinstance(stmt, ast.Pass):
            if not factory.incomplete_block():
                factory.append_cfg(_dummy_cfg(self._id_gen))
        else:
            raise NotImplementedError(f"The statement {str(type(stmt))} is not yet translatable to CFG!")

    factory.complete_basic_block()

    needs_entry = not allow_loose_in_edges and factory.cfg and factory.cfg.loose_in_edges
    if needs_entry:
        factory.prepend_cfg(_dummy_cfg(self._id_gen))
    needs_exit = not allow_loose_out_edges and factory.cfg and factory.cfg.loose_out_edges
    if needs_exit:
        factory.append_cfg(_dummy_cfg(self._id_gen))

    return factory.cfg
def mutate_Break(self, node):
    """Return a newly created ``ast.Continue`` node.

    ``node`` is not inspected and nothing is copied from it; the result
    is a bare ``ast.Continue()`` instance.
    """
    replacement = ast.Continue()
    return replacement
def get_nontrivial_nodes(self):
    """Return the ids of nodes that can possibly raise an exception.

    A node is skipped when the exact type of its ``ast_node`` is
    ``Break``, ``Continue``, ``Pass`` or ``Try``. The ids are returned as
    a list, in the iteration order of ``self.nodes``.
    """
    trivial_types = (ast.Break, ast.Continue, ast.Pass, ast.Try)
    return [
        node_id
        for node_id, node_obj in self.nodes.items()
        if type(node_obj.ast_node) not in trivial_types
    ]
def _build_cfg(statements):
    """Chain the CFGs of consecutive statements into one subgraph.

    The entry point is ``id(statements[0])``. Each statement's CFG, as
    built by ``_build_node_cfg``, is merged into a shared graph; from the
    second statement on, every exit of the previous statement gets an
    edge to the entry of the current one. Jumps are accumulated with
    ``Jumps.join``. Processing stops after the first ``break``,
    ``continue`` or ``return`` statement, so later statements are not
    added to the graph.

    Returns a ``ControlFlowSubgraph`` built from the merged graph, the
    entry id, the exits of the last processed statement and the
    accumulated jumps.
    """
    entry = id(statements[0])
    pending_exits = [entry]
    graph = Graph()
    jumps = Jumps()
    terminators = (ast.Break, ast.Continue, ast.Return)

    for position, stmt in enumerate(statements):
        sub_cfg = _build_node_cfg(stmt)
        graph.update(sub_cfg.graph)

        # The first statement has no predecessor to link from.
        if position > 0:
            for source in pending_exits:
                graph.add_edge(source, sub_cfg.enter)

        pending_exits = sub_cfg.exits
        jumps = jumps.join(sub_cfg.jumps)

        if type(stmt) in terminators:
            # Issue a warning about unreachable code?
            break

    return ControlFlowSubgraph(graph, entry, exits=pending_exits, jumps=jumps)