def check_indentation(self, **kwargs):
"""Inspect the code for indentation size errors."""
try:
indentation_size = kwargs['indentation_size']
except KeyError:
# Use the default value instead:
indentation_size = self.DEFAULT_RULES['indentation_size']
# Traverse the nodes and find those that are nested
# (have 'body' attribute).
nodes = [node for node in ast.walk(self.parsed_code.body[0])
if hasattr(node, 'body')]
# Use the previous line offset
# as a guide for the next line indentation.
last_offset = 0
for node in nodes:
line_number = node.body[0].lineno
col_offset = node.body[0].col_offset
if col_offset > last_offset + indentation_size:
offset = col_offset - last_offset
self.issues[line_number].add(
self.code_errors.indentation(offset, indentation_size)
)
last_offset = col_offset
python类walk()的实例源码
def check_methods_per_class(self, **kwargs):
"""
Inspect the code for too many methods per
class.
"""
try:
methods_per_class = kwargs['methods_per_class']
except KeyError:
return
klass = self.parsed_code.body[0]
if not isinstance(klass, ast.ClassDef):
return
methods = [(node, node.lineno) for node in ast.walk(klass)
if isinstance(node, ast.FunctionDef)]
try:
# Get the last method of the class
# and its line number:
line_number = methods[-1][1]
self.issues[line_number].add(
self.code_errors.too_many_methods_per_class(
len(methods), methods_per_class
)
)
except IndexError:
return
def walk(self, prog_ast):
result = list(ast.walk(prog_ast))
import_nodes = [node for node in result if isinstance(node, ast.Import)]
import_from_nodes = [node for node in result if isinstance(node, ast.ImportFrom)]
for node in import_nodes:
for name in node.names:
if ImportHandler.is_builtin(name.name):
new_ast = ImportHandler.get_builtin_ast(name.name)
else:
new_ast = ImportHandler.get_module_ast(name.name, self.base_folder)
result += self.walk(new_ast)
for node in import_from_nodes:
if node.module == "typing":
# FIXME ignore typing for now, not to break type vars
continue
if ImportHandler.is_builtin(node.module):
new_ast = ImportHandler.get_builtin_ast(node.module)
else:
new_ast = ImportHandler.get_module_ast(node.module, self.base_folder)
result += self.walk(new_ast)
return result
def ingest(self, rootdir):
"""
Collect all the .py files to perform analysis upon
"""
if not os.path.isdir(rootdir):
raise Exception("directory %s passed in is not a dir" % rootdir)
self.__target_dir = rootdir
# walk the dirs/files
for root, subdir, files in os.walk(self.__target_dir):
for f in files:
if f.endswith(".py"):
fullpath = root + os.sep + f
contents = file(fullpath).read()
tree = ast.parse(contents)
self.__fn_to_ast[fullpath] = tree
# potentially analyze .html files for jinja templates
if self.perform_jinja_analysis:
self.__template_dir = self.get_template_dir()
def find_template_dir(self):
# web-p2 is web-p2/partners/templates
# login is login/templates
# TODO: look for invocations of `jinja2.Environment` and see if
# we can pull the template directory / package from there? Should work
# for most.
template_dirs = set()
for root, subdir, files in os.walk(self.__target_dir):
for fname in files:
fpath = os.path.join(root, fname)
if fname.endswith(".html"):
with open(fpath, "rb") as f:
# Hmm, smells like a jinja template!
if b"{%" in f.read():
template_dirs.add(root)
# If there are multiple template directories in a repo we might need
# repo-specific overrides.
return None if not template_dirs else os.path.commonprefix(template_dirs)
def get_template_dir(self):
""" return the directory containing jinja2 templates
ex: web-p2 is web-p2/partners/templates
"""
template_dirs = set()
for root, subdir, files in os.walk(self.__target_dir):
for fname in files:
fpath = os.path.join(root, fname)
if fname.endswith(".html"):
with open(fpath, "rb") as f:
# Hmm, smells like a jinja template!
if b"{%" in f.read():
template_dirs.add(root)
# If there are multiple template directories in a repo we might need
# repo-specific overrides.
return None if not template_dirs else os.path.commonprefix(template_dirs)
def file_contains_pluggable(file_path, pluggable):
plugin_class = None
try:
with open(file_path, "r") as f:
syntax_tree = ast.parse(f.read())
except FileNotFoundError:
return [False, None]
for statement in ast.walk(syntax_tree):
if isinstance(statement, ast.ClassDef):
class_name = statement.name
bases = list(map(lambda b: b.id if isinstance(b, ast.Name) else b.attr, statement.bases))
if pluggable in bases:
plugin_class = class_name
return [plugin_class is not None, plugin_class]
def _walk_files(self, path):
"""Walk paths and yield Python paths
Directories and files are yielded in alphabetical order. Directories
starting with a "." are skipped. As are those that match any provided
ignore patterns.
"""
if os.path.isfile(path):
yield path
elif not os.path.isdir(path):
LOG.error("The path '%s' can't be found.", path)
raise StopIteration
for root, dirs, filenames in os.walk(path):
# Remove dot-directories from the dirs list.
dirs[:] = sorted(d for d in dirs if not d.startswith('.') and
not self._is_ignored(d))
for filename in sorted(filenames):
if self._is_python(filename):
yield os.path.join(root, filename)
def _walk_ast(self, node, top=False):
if not hasattr(node, 'parent'):
node.parent = None
node.parents = []
for field, value in ast.iter_fields(node):
if isinstance(value, list):
for index, item in enumerate(value):
if isinstance(item, ast.AST):
self._walk_ast(item)
self._set_parnt_fields(item, node, field, index)
elif isinstance(value, ast.AST):
self._walk_ast(value)
self._set_parnt_fields(value, node, field)
if top:
return ast.walk(node)
def runTest(self):
"""Makes a simple test of the output"""
body = ast.parse(self.candidate_code, self.file_name, 'exec')
code = compile(self.candidate_code, self.file_name, 'exec')
format_nodes = [
node
for node in ast.walk(body)
if isinstance(node, ast.Attribute) and
node.attr == 'format' and
isinstance(node.value, ast.Str) and
'{}' in node.value.s
]
self.assertGreater(
len(format_nodes),
0,
"It should have at one format call with curly braces {}"
)
exec(code)
self.assertMultiLineEqual('Talk is cheap. Show me the code.\n',
self.__mockstdout.getvalue(),
'Output is not correct')
def runTest(self):
"""Makes a simple test of the output"""
body = ast.parse(self.candidate_code, self.file_name, 'exec')
code = compile(self.candidate_code, self.file_name, 'exec')
mult_instructions = [
node for node in ast.walk(body)
if isinstance(node, ast.Mult)
]
self.assertGreater(len(mult_instructions),
0,
"It should have at least one duplication"
)
exec(code)
self.assertMultiLineEqual('ka'*10+'\n',
self.__mockstdout.getvalue(),
"Should have printed ka 10 times")
def runTest(self):
"""Makes a simple test of the output"""
body = ast.parse(self.candidate_code, self.file_name, 'exec')
code = compile(self.candidate_code, self.file_name, 'exec', optimize=0)
exec(code)
if_statements = [
node
for node in ast.walk(body)
if isinstance(node, ast.If)
]
self.assertGreater(len(if_statements),
0,
"Should have at least on if statement")
self.assertMultiLineEqual(self.correct_output,
self.__mockstdout.getvalue(),
"Output should be correct")
def get_statement_startend2(lineno, node):
import ast
# flatten all statements and except handlers into one lineno-list
# AST's line numbers start indexing at 1
l = []
for x in ast.walk(node):
if isinstance(x, _ast.stmt) or isinstance(x, _ast.ExceptHandler):
l.append(x.lineno - 1)
for name in "finalbody", "orelse":
val = getattr(x, name, None)
if val:
# treat the finally/orelse part as its own statement
l.append(val[0].lineno - 1 - 1)
l.sort()
insert_index = bisect_right(l, lineno)
start = l[insert_index - 1]
if insert_index >= len(l):
end = None
else:
end = l[insert_index]
return start, end
def get_statement_startend2(lineno, node):
import ast
# flatten all statements and except handlers into one lineno-list
# AST's line numbers start indexing at 1
l = []
for x in ast.walk(node):
if isinstance(x, _ast.stmt) or isinstance(x, _ast.ExceptHandler):
l.append(x.lineno - 1)
for name in "finalbody", "orelse":
val = getattr(x, name, None)
if val:
# treat the finally/orelse part as its own statement
l.append(val[0].lineno - 1 - 1)
l.sort()
insert_index = bisect_right(l, lineno)
start = l[insert_index - 1]
if insert_index >= len(l):
end = None
else:
end = l[insert_index]
return start, end
def get_statement_startend2(lineno, node):
import ast
# flatten all statements and except handlers into one lineno-list
# AST's line numbers start indexing at 1
l = []
for x in ast.walk(node):
if isinstance(x, _ast.stmt) or isinstance(x, _ast.ExceptHandler):
l.append(x.lineno - 1)
for name in "finalbody", "orelse":
val = getattr(x, name, None)
if val:
# treat the finally/orelse part as its own statement
l.append(val[0].lineno - 1 - 1)
l.sort()
insert_index = bisect_right(l, lineno)
start = l[insert_index - 1]
if insert_index >= len(l):
end = None
else:
end = l[insert_index]
return start, end
def get_init(self, filename="__init__.py"):
""" Get various info from the package without importing them
"""
import ast
with open(filename) as init_file:
module = ast.parse(init_file.read())
itr = lambda x: (ast.literal_eval(node.value) for node in ast.walk(module) \
if isinstance(node, ast.Assign) and node.targets[0].id == x)
try:
return next(itr("__author__")), \
next(itr("__email__")), \
next(itr("__license__")), \
next(itr("__version__"))
except StopIteration:
raise ValueError("One of author, email, license, or version"
" cannot be found in {}".format(filename))
def get_instance_variables(node, bound_name_classifier=BOUND_METHOD_ARGUMENT_NAME):
"""
Return instance variables used in an AST node
"""
node_attributes = [
child
for child in ast.walk(node)
if isinstance(child, ast.Attribute) and
get_attribute_name_id(child) == bound_name_classifier
]
node_function_call_names = [
get_object_name(child)
for child in ast.walk(node)
if isinstance(child, ast.Call)
]
node_instance_variables = [
attribute
for attribute in node_attributes
if get_object_name(attribute) not in node_function_call_names
]
return node_instance_variables
def __init__(self, expr):
self.expr = expr
self.ns = {}
try:
tree = ast.parse(expr)
except SyntaxError as exc:
raise argparse.ArgumentTypeError('Invalid service spec %r. Parse error:\n'
' %s %s^\n'
'%s' % (expr, exc.text, ' '*exc.offset, exc))
for node in ast.walk(tree):
if isinstance(node, ast.Name):
if not hasattr(builtins, node.id):
try:
__import__(node.id)
except ImportError as exc:
raise argparse.ArgumentTypeError('Invalid service spec %r. Import error: %s' % (expr, exc))
self.ns[node.id] = sys.modules[node.id]
def childHasTag(a, tag):
""" Includes the AST itself"""
if hasattr(a, tag):
return True
if type(a) == list:
for child in a:
if childHasTag(child, tag):
return True
return False
elif not isinstance(a, ast.AST):
return False
for node in ast.walk(a):
if hasattr(node, tag):
return True
return False
def hasMultiComp(a):
if not isinstance(a, ast.AST):
return False
for node in ast.walk(a):
if hasattr(node, "multiComp") and node.multiComp:
return True
return False