def createNameMap(a, d=None):
if d == None:
d = { }
if not isinstance(a, ast.AST):
return d
if type(a) == ast.Module: # Need to go through the functions backwards to make this right
for i in range(len(a.body) - 1, -1, -1):
createNameMap(a.body[i], d)
return d
if type(a) in [ast.FunctionDef, ast.ClassDef]:
if hasattr(a, "originalId") and a.name not in d:
d[a.name] = a.originalId
elif type(a) == ast.arg:
if hasattr(a, "originalId") and a.arg not in d:
d[a.arg] = a.originalId
return d
elif type(a) == ast.Name:
if hasattr(a, "originalId") and a.id not in d:
d[a.id] = a.originalId
return d
for child in ast.iter_child_nodes(a):
createNameMap(child, d)
return d
python类iter_child_nodes()的实例源码
def findId(a, id):
if hasattr(a, "global_id") and a.global_id == id:
return a
if type(a) == list:
for child in a:
tmp = findId(child, id)
if tmp != None:
return tmp
return None
if not isinstance(a, ast.AST):
return None
for child in ast.iter_child_nodes(a):
tmp = findId(child, id)
if tmp != None:
return tmp
return None
def findHelperFunction(a, helperId, helperCount):
"""Finds the first helper function used in the ast"""
if not isinstance(a, ast.AST):
return None
# Check all the children, so that we don't end up with a recursive problem
for child in ast.iter_child_nodes(a):
f = findHelperFunction(child, helperId, helperCount)
if f != None:
return f
# Then check if this is the right call
if type(a) == ast.Call:
if type(a.func) == ast.Name and a.func.id == helperId:
if helperCount[0] > 0:
helperCount[0] -= 1
else:
return a
return None
def occursIn(sub, super):
"""Does the first AST occur as a subtree of the second?"""
superStatementTypes = [ ast.Module, ast.Interactive, ast.Suite,
ast.FunctionDef, ast.ClassDef, ast.For,
ast.While, ast.If, ast.With, ast.Try,
ast.ExceptHandler ]
if (not isinstance(super, ast.AST)):
return False
if type(sub) == type(super) and compareASTs(sub, super, checkEquality=True) == 0:
return True
# we know that a statement can never occur in an expression
# (or in a non-statement-holding statement), so cut the search off now to save time.
if isStatement(sub) and type(super) not in superStatementTypes:
return False
for child in ast.iter_child_nodes(super):
if occursIn(sub, child):
return True
return False
def run(self):
tree = self.tree
if self.filename == 'stdin':
lines = stdin_utils.stdin_get_value()
tree = ast.parse(lines)
for statement in ast.walk(tree):
for child in ast.iter_child_nodes(statement):
child.__flake8_builtins_parent = statement
for statement in ast.walk(tree):
value = None
if isinstance(statement, ast.Assign):
value = self.check_assignment(statement)
elif isinstance(statement, ast.FunctionDef):
value = self.check_function_definition(statement)
if value:
for line, offset, msg, rtype in value:
yield line, offset, msg, rtype
def get_version():
module_path = os.path.join(os.path.dirname(__file__), 'logging_spinner.py')
module_file = open(module_path)
try:
module_code = module_file.read()
finally:
module_file.close()
tree = ast.parse(module_code, module_path)
for node in ast.iter_child_nodes(tree):
if not isinstance(node, ast.Assign) or len(node.targets) != 1:
continue
target, = node.targets
if isinstance(target, ast.Name) and target.id == '__version__':
value = node.value
if isinstance(value, ast.Str):
return value.s
raise ValueError('__version__ is not defined as a string literal')
raise ValueError('could not find __version__')
def get_version():
module_path = os.path.join(os.path.dirname(__file__),
'wikidata', '__init__.py')
module_file = open(module_path)
try:
module_code = module_file.read()
finally:
module_file.close()
tree = ast.parse(module_code, module_path)
for node in ast.iter_child_nodes(tree):
if not isinstance(node, ast.Assign) or len(node.targets) != 1:
continue
target, = node.targets
if isinstance(target, ast.Name) and target.id == '__version__':
value = node.value
if isinstance(value, ast.Str):
return value.s
raise ValueError('__version__ is not defined as a string literal')
raise ValueError('could not find __version__')
def _add_section(self, node):
"""
Register the current node as a new context block
"""
self._filldown(node.lineno)
# push a new context onto stack
self.context.append(node.name)
self._update_current_context()
for _ in map(self.visit, iter_child_nodes(node)):
pass
# restore current context
self.context.pop()
self._update_current_context()
def extract_tags_from_ast(node):
for child in ast.iter_child_nodes(node):
for t in extract_tags_from_ast(child):
yield t
if (
isinstance(node, ast.Call) and
isinstance(node.func, ast.Name) and
node.func.id == 'tag' and
len(node.args) >= 3 and
isinstance(node.args[2], ast.Str)
):
yield node.args[2].s
if (
isinstance(node, ast.Call) and
isinstance(node.func, ast.Attribute) and
isinstance(node.func.value, ast.Name) and
node.func.value.id in ('self', 'owner') and
node.func.attr == 'tag' and
len(node.args) >= 2 and
isinstance(node.args[1], ast.Str)
):
yield node.args[1].s
def check_for_b901(self, node):
xs = list(node.body)
has_yield = False
return_node = None
while xs:
x = xs.pop()
if isinstance(x, (ast.AsyncFunctionDef, ast.FunctionDef)):
continue
elif isinstance(x, (ast.Yield, ast.YieldFrom)):
has_yield = True
elif isinstance(x, ast.Return) and x.value is not None:
return_node = x
if has_yield and return_node is not None:
self.errors.append(
B901(return_node.lineno, return_node.col_offset)
)
break
xs.extend(ast.iter_child_nodes(x))
def get_version():
filename = os.path.join(os.path.dirname(__file__),
'publicdns', '__init__.py')
version = None
with open(filename, 'r') as f:
tree = ast.parse(f.read(), filename)
for node in ast.iter_child_nodes(tree):
if not isinstance(node, ast.Assign) or len(node.targets) != 1:
continue
target, = node.targets
if (isinstance(target, ast.Name) and
target.id == '__version_info__'):
version = ast.literal_eval(node.value)
return '.'.join([str(x) for x in version])
def set_location(node, lineno, col_offset):
"""Set node location information recursively."""
def _fix(node, lineno, col_offset):
if "lineno" in node._attributes:
node.lineno = lineno
if "col_offset" in node._attributes:
node.col_offset = col_offset
for child in ast.iter_child_nodes(node):
_fix(child, lineno, col_offset)
_fix(node, lineno, col_offset)
return node
def getLineNumberFromAst(a):
if not isinstance(a, ast.AST):
return None
if hasattr(a, "lineno"):
return a.lineno
line = None
for child in ast.iter_child_nodes(a):
line = getLineNumberFromAst(child)
if line != None:
return line
return line
def updateVariableNames(a, varMap, scopeName, randomCounter, imports):
if not isinstance(a, ast.AST):
return
if type(a) in [ast.FunctionDef, ast.ClassDef]:
if a.name in varMap:
if not hasattr(a, "originalId"):
a.originalId = a.name
a.name = varMap[a.name]
anonymizeStatementNames(a, varMap, "_" + a.name, imports)
elif type(a) == ast.arg:
if a.arg not in varMap and not (builtInName(a.arg) or importedName(a.arg, imports)):
log("Can't assign to arg?", "bug")
if a.arg in varMap:
if not hasattr(a, "originalId"):
a.originalId = a.arg
if varMap[a.arg][0] == "r":
a.randomVar = True # so we know it can crash
if a.arg == varMap[a.arg]:
# Check whether this is a given name
if not isAnonVariable(varMap[a.arg]):
a.dontChangeName = True
a.arg = varMap[a.arg]
elif type(a) == ast.Name:
if a.id not in varMap and not (builtInName(a.id) or importedName(a.id, imports)):
varMap[a.id] = "r" + str(randomCounter[0]) + scopeName
randomCounter[0] += 1
if a.id in varMap:
if not hasattr(a, "originalId"):
a.originalId = a.id
if varMap[a.id][0] == "r":
a.randomVar = True # so we know it can crash
if a.id == varMap[a.id]:
# Check whether this is a given name
if not isAnonVariable(varMap[a.id]):
a.dontChangeName = True
a.id = varMap[a.id]
else:
for child in ast.iter_child_nodes(a):
updateVariableNames(child, varMap, scopeName, randomCounter, imports)
def individualizeVariables(a, variablePairs, idNum, imports):
"""Replace variable names with new individualized ones (for inlining methods)"""
if not isinstance(a, ast.AST):
return
if type(a) == ast.Name:
if a.id not in variablePairs and not (builtInName(a.id) or importedName(a.id, imports)):
name = "_var_" + a.id + "_" + str(idNum[0])
variablePairs[a.id] = name
if a.id in variablePairs:
a.id = variablePairs[a.id]
# Override built-in names when they're assigned to.
elif type(a) == ast.Assign and type(a.targets[0]) == ast.Name:
if a.targets[0].id not in variablePairs:
name = "_var_" + a.targets[0].id + "_" + str(idNum[0])
variablePairs[a.targets[0].id] = name
elif type(a) == ast.arguments:
for arg in a.args:
if type(arg) == ast.arg:
name = "_arg_" + arg.arg + "_" + str(idNum[0])
variablePairs[arg.arg] = name
arg.arg = variablePairs[arg.arg]
return
elif type(a) == ast.Call:
if type(a.func) == ast.Name:
variablePairs[a.func.id] = a.func.id # save the function name!
for child in ast.iter_child_nodes(a):
individualizeVariables(child, variablePairs, idNum, imports)
def allVariablesUsed(a):
if not isinstance(a, ast.AST):
return []
elif type(a) == ast.Name:
return [a]
variables = []
for child in ast.iter_child_nodes(a):
variables += allVariablesUsed(child)
return variables
def allVariableNamesUsed(a):
"""Gathers all the variable names used in the ast"""
if not isinstance(a, ast.AST):
return []
elif type(a) == ast.Name:
return [a.id]
elif type(a) == ast.Assign:
"""In assignments, ignore all pure names used- they're being assigned to, not used"""
variables = allVariableNamesUsed(a.value)
for target in a.targets:
if type(target) == ast.Name:
pass
elif type(target) in [ast.Tuple, ast.List]:
for elt in target.elts:
if type(elt) != ast.Name:
variables += allVariableNamesUsed(elt)
else:
variables += allVariableNamesUsed(target)
return variables
elif type(a) == ast.AugAssign:
variables = allVariableNamesUsed(a.value)
variables += allVariableNamesUsed(a.target)
return variables
variables = []
for child in ast.iter_child_nodes(a):
variables += allVariableNamesUsed(child)
return variables
def allVariablesUsed(a):
"""Gathers all the variable names used in the ast"""
if type(a) == list:
variables = []
for x in a:
variables += allVariablesUsed(x)
return variables
if not isinstance(a, ast.AST):
return []
elif type(a) == ast.Name:
return [a]
elif type(a) == ast.Assign:
variables = allVariablesUsed(a.value)
for target in a.targets:
if type(target) == ast.Name:
pass
elif type(target) in [ast.Tuple, ast.List]:
for elt in target.elts:
if type(elt) == ast.Name:
pass
else:
variables += allVariablesUsed(elt)
else:
variables += allVariablesUsed(target)
return variables
else:
variables = []
for child in ast.iter_child_nodes(a):
variables += allVariablesUsed(child)
return variables
def depthOfAST(a):
"""Determine the depth of the AST"""
if not isinstance(a, ast.AST):
return 0
m = 0
for child in ast.iter_child_nodes(a):
tmp = depthOfAST(child)
if tmp > m:
m = tmp
return m + 1
def class_check(node, kw=False):
'''
Class specific check
Action - check private method, move to utils
check docstring
scan for class - recursive class check
scan for function - function check
'''
status = True
for child in ast.iter_child_nodes(node):
if isinstance(child, ast.FunctionDef):
if kw and child.name.startswith("_") and child.name != "__init__":
print node.name, child.name, "should move to utils"
status = False
tmp_status = func_check(child, kw)
status &= tmp_status
elif isinstance(child, ast.ClassDef):
tmp_status = class_check(child, kw)
status &= tmp_status
if ast.get_docstring(node) is None:
# check for docstring
print node.name, "doesn't contain any docstring"
status = False
return status
def iter_children_ast(node):
# Don't attempt to process children of JoinedStr nodes, which we can't fully handle yet.
if is_joined_str(node):
return
for child in ast.iter_child_nodes(node):
# Skip singleton children; they don't reflect particular positions in the code and break the
# assumptions about the tree consisting of distinct nodes. Note that collecting classes
# beforehand and checking them in a set is faster than using isinstance each time.
if child.__class__ not in SINGLETONS:
yield child
def test_iter_child_nodes(self):
node = ast.parse("spam(23, 42, eggs='leek')", mode='eval')
self.assertEqual(len(list(ast.iter_child_nodes(node.body))), 4)
iterator = ast.iter_child_nodes(node.body)
self.assertEqual(next(iterator).id, 'spam')
self.assertEqual(next(iterator).n, 23)
self.assertEqual(next(iterator).n, 42)
self.assertEqual(ast.dump(next(iterator)),
"keyword(arg='eggs', value=Str(s='leek'))"
)
def test_iter_child_nodes(self):
node = ast.parse("spam(23, 42, eggs='leek')", mode='eval')
self.assertEqual(len(list(ast.iter_child_nodes(node.body))), 4)
iterator = ast.iter_child_nodes(node.body)
self.assertEqual(next(iterator).id, 'spam')
self.assertEqual(next(iterator).n, 23)
self.assertEqual(next(iterator).n, 42)
self.assertEqual(ast.dump(next(iterator)),
"keyword(arg='eggs', value=Str(s='leek'))"
)
def test_iter_child_nodes(self):
node = ast.parse("spam(23, 42, eggs='leek')", mode='eval')
self.assertEqual(len(list(ast.iter_child_nodes(node.body))), 4)
iterator = ast.iter_child_nodes(node.body)
self.assertEqual(next(iterator).id, 'spam')
self.assertEqual(next(iterator).n, 23)
self.assertEqual(next(iterator).n, 42)
self.assertEqual(ast.dump(next(iterator)),
"keyword(arg='eggs', value=Str(s='leek'))"
)
def set_location(node, lineno, col_offset):
"""Set node location information recursively."""
def _fix(node, lineno, col_offset):
if "lineno" in node._attributes:
node.lineno = lineno
if "col_offset" in node._attributes:
node.col_offset = col_offset
for child in ast.iter_child_nodes(node):
_fix(child, lineno, col_offset)
_fix(node, lineno, col_offset)
return node
def find_topology_in_python(filename):
"""
Find the TOPOLOGY variable inside a Python file.
This helper functions build a AST tree a grabs the variable from it. Thus,
the Python code isn't executed.
:param str filename: Path to file to search for TOPOLOGY.
:rtype: str or None
:return: The value of the TOPOLOGY variable if found, None otherwise.
"""
import ast
try:
with open(filename) as fd:
tree = ast.parse(fd.read())
for node in ast.iter_child_nodes(tree):
if not isinstance(node, ast.Assign):
continue
if node.targets[0].id == 'TOPOLOGY':
return node.value.s
except Exception:
log.error(format_exc())
return None
def traverse_nodes(tree):
"""Return a generator that traverses all nodes of a tree."""
queue = [tree]
while queue:
current_node = queue.pop(0)
children = list(ast.iter_child_nodes(current_node))
queue.extend(children)
yield current_node
def _traverse_tree(root):
num_nodes = 1
queue = [root]
root_json = {
"node": _name(root),
"children": []
}
queue_json = [root_json]
while queue:
current_node = queue.pop(0)
num_nodes += 1
# print (_name(current_node))
current_node_json = queue_json.pop(0)
children = list(ast.iter_child_nodes(current_node))
queue.extend(children)
for child in children:
child_json = {
"node": _name(child),
"children": []
}
current_node_json['children'].append(child_json)
queue_json.append(child_json)
return root_json, num_nodes
def _create_samples(node):
"""Convert a node's children into a sample points."""
samples = []
for child in ast.iter_child_nodes(node):
sample = {
"node": _name(child),
"parent": _name(node),
"children": [_name(x) for x in ast.iter_child_nodes(child)]
}
samples.append(sample)
return samples
def _traverse_tree(tree, callback):
"""Traverse a tree and execute the callback on every node."""
queue = [tree]
while queue:
current_node = queue.pop(0)
children = list(ast.iter_child_nodes(current_node))
queue.extend(children)
callback(current_node)