def mess_control(f, *, debug=0):
assert inspect.isfunction(f)
members = dict(inspect.getmembers(f))
global_dict = members["__globals__"]
source_filename = inspect.getsourcefile(f)
f_ast = ast.parse(inspect.getsource(f))
_, starting_line = inspect.getsourcelines(f)
# ast_demo.increment_lineno(f_ast, starting_line - 1)
if debug:
print("AST:", ast.dump(f_ast))
visitor = ControlMess()
new_ast = visitor.visit(f_ast)
if debug:
print('NEW AST:', ast.dump(new_ast))
ast.fix_missing_locations(new_ast)
co = compile(new_ast, '<ast_demo>', 'exec')
fake_locals = {}
# exec will define the new function into fake_locals scope
# this is to avoid conflict with vars in the real locals()
# https://stackoverflow.com/questions/24733831/using-a-function-defined-in-an-execed-string-in-python-3
exec(co, None, fake_locals)
# new_f = locals()[visitor._new_func_name(f.__name__)]
return fake_locals[f.__name__]
python类getsource()的实例源码
def __init__(self, *parts, **kwargs):
self.lines = lines = []
de = kwargs.get('deindent', True)
rstrip = kwargs.get('rstrip', True)
for part in parts:
if not part:
partlines = []
if isinstance(part, Source):
partlines = part.lines
elif isinstance(part, (tuple, list)):
partlines = [x.rstrip("\n") for x in part]
elif isinstance(part, py.builtin._basestring):
partlines = part.split('\n')
if rstrip:
while partlines:
if partlines[-1].strip():
break
partlines.pop()
else:
partlines = getsource(part, deindent=de).lines
if de:
partlines = deindent(partlines)
lines.extend(partlines)
def __init__(self, *parts, **kwargs):
self.lines = lines = []
de = kwargs.get('deindent', True)
rstrip = kwargs.get('rstrip', True)
for part in parts:
if not part:
partlines = []
if isinstance(part, Source):
partlines = part.lines
elif isinstance(part, (tuple, list)):
partlines = [x.rstrip("\n") for x in part]
elif isinstance(part, py.builtin._basestring):
partlines = part.split('\n')
if rstrip:
while partlines:
if partlines[-1].strip():
break
partlines.pop()
else:
partlines = getsource(part, deindent=de).lines
if de:
partlines = deindent(partlines)
lines.extend(partlines)
def get_class_traits(klass):
""" Yield all of the documentation for trait definitions on a class object.
"""
# FIXME: gracefully handle errors here or in the caller?
source = inspect.getsource(klass)
cb = CommentBlocker()
cb.process_file(StringIO(source))
mod_ast = compiler.parse(source)
class_ast = mod_ast.node.nodes[0]
for node in class_ast.code.nodes:
# FIXME: handle other kinds of assignments?
if isinstance(node, compiler.ast.Assign):
name = node.nodes[0].name
rhs = unparse(node.expr).strip()
doc = strip_comment_marker(cb.search_for_comment(node.lineno, default=''))
yield name, rhs, doc
def local_settings_update(self, changes):
"""Update local_settings.py with new content created according to the changes parameter.
The changes parameter should be a list generated by check_modules()"""
if not local_settings:
raise SystemError('Missing local_settings.py!')
logger.info('Creating new local_settings.py with following changes: %s', self._show_changes(changes))
target = inspect.getsourcefile(local_settings)
data = self._local_settings_new(changes)
backup = inspect.getsource(local_settings)
logger.warn('Updating %s', target)
self._save_file(target, data)
try:
reload_module(local_settings)
except ImportError as e:
logger.exception(e)
logger.warn('Restoring %s from backup', target)
self._save_file(target, backup)
else:
# Force reloading of django settings
settings._wrapped = empty
def write_version_py(filename):
# Adding the git rev number needs to be done inside write_version_py(),
# otherwise the import of numpy.version messes up the build under Python 3.
git_version_code = inspect.getsource(get_git_version)
if os.path.exists('.git'):
GIT_REVISION = get_git_version()
else:
GIT_REVISION = 'Unknown'
content = VERSION_PY_CONTENT % {
'version': PACKAGE_VERSION,
'git_revision': GIT_REVISION,
'is_release': str(IS_RELEASE),
'git_version_code': str(git_version_code)
}
with open(filename, 'w') as version_file:
version_file.write(content)
################################################################################
# Installation
################################################################################
def query(name):
frame = inspect.stack()[1]
module = inspect.getmodule(frame[0])
filename = module.__file__
dataset = os.path.abspath(filename).split('/')[-2]
def wrapper(f):
lines = inspect.getsource(f).split('\n')
lines = lines[:-1] # Seems to include a trailing newline
# Hacky way to get just the function body
i = 0
while True:
if "():" in lines[i]:
break
i = i + 1
fn = lines[i:]
fn += ['FN = ' + f.__name__]
queries[dataset].append([name, '\n'.join(fn)])
return f
return wrapper
def test_proceed_with_fake_filename(self):
'''doctest monkeypatches linecache to enable inspection'''
fn, source = '<test>', 'def x(): pass\n'
getlines = linecache.getlines
def monkey(filename, module_globals=None):
if filename == fn:
return source.splitlines(keepends=True)
else:
return getlines(filename, module_globals)
linecache.getlines = monkey
try:
ns = {}
exec(compile(source, fn, 'single'), ns)
inspect.getsource(ns["x"])
finally:
linecache.getlines = getlines
def _run_request(self, request, where, cpu, gen, *args, **kwargs):
"""Internal use only.
"""
if isinstance(gen, str):
name = gen
else:
name = gen.func_name
if name in self._xfer_funcs:
code = None
else:
# if not inspect.isgeneratorfunction(gen):
# logger.warning('"%s" is not a valid generator function', name)
# raise StopIteration([])
code = inspect.getsource(gen).lstrip()
def _run_req(task=None):
msg = {'req': 'job', 'auth': self._auth,
'job': _DispycosJob_(request, task, name, where, cpu, code, args, kwargs)}
if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1:
reply = yield task.receive()
if isinstance(reply, Task):
if self.status_task:
msg = DispycosTaskInfo(reply, args, kwargs, time.time())
self.status_task.send(DispycosStatus(Scheduler.TaskCreated, msg))
if not request.endswith('async'):
reply = yield task.receive()
else:
reply = None
raise StopIteration(reply)
yield Task(_run_req).finish()
def _run_request(self, request, where, cpu, gen, *args, **kwargs):
"""Internal use only.
"""
if isinstance(gen, str):
name = gen
else:
name = gen.__name__
if name in self._xfer_funcs:
code = None
else:
# if not inspect.isgeneratorfunction(gen):
# logger.warning('"%s" is not a valid generator function', name)
# raise StopIteration([])
code = inspect.getsource(gen).lstrip()
def _run_req(task=None):
msg = {'req': 'job', 'auth': self._auth,
'job': _DispycosJob_(request, task, name, where, cpu, code, args, kwargs)}
if (yield self.scheduler.deliver(msg, timeout=MsgTimeout)) == 1:
reply = yield task.receive()
if isinstance(reply, Task):
if self.status_task:
msg = DispycosTaskInfo(reply, args, kwargs, time.time())
self.status_task.send(DispycosStatus(Scheduler.TaskCreated, msg))
if not request.endswith('async'):
reply = yield task.receive()
else:
reply = None
raise StopIteration(reply)
yield Task(_run_req).finish()
def __new__(cls, name, bases, dct):
slots = dct.get('__slots__', [])
orig_slots = []
for base in bases:
if hasattr(base, "__slots__"):
orig_slots += base.__slots__
if '__init__' in dct:
init = dct['__init__']
initproc = type.__new__(cls, name, bases, dct)
initproc_source = inspect.getsource(initproc)
ast = compile(initproc_source, "dont_care", 'exec', _ast.PyCF_ONLY_AST)
classdef = ast.body[0]
stmts = classdef.body
for declaration in stmts:
if isinstance(declaration, _ast.FunctionDef):
name1 = declaration.name
if name1 == '__init__': # delete this line if you do not initialize all instance variables in __init__
initbody = declaration.body
for statement in initbody:
if isinstance(statement, _ast.Assign):
for target in statement.targets:
name1 = target.attr
if name1 not in orig_slots:
slots.append(name1)
if slots:
dct['__slots__'] = slots
return type.__new__(cls, name, bases, dct)
def runtime(f):
"""Evaluates the given function each time it is called."""
# get the function's name
name = f.__name__
# and its source code, sans decorator
source = remove_decorators(getsource(f))
@wraps(f)
def wrapped(*args, **kwargs):
# execute the function's declaration
exec(source)
# since the above overwrites its name in the local
# scope we can call it here using eval
return eval("%s(*%s, **%s)" % (name, args, kwargs))
return wrapped
def get_info(obj):
try:
lines = inspect.getsource(obj).split('\n')
lines = [i for i in lines if '@' not in i] # filter out decorators
return lines[0].replace('def ', '').replace('):', ')').replace(' ', '').replace(
obj.__name__, '').replace('self, ', '').replace('self', '')
except:
return None
def get_source(func):
raw_source = inspect.getsource(func)
print(raw_source)
corrected_source = correct_indentation(raw_source)
print(corrected_source)
return corrected_source
def getsource(obj, **kwargs):
obj = py.code.getrawcode(obj)
try:
strsrc = inspect.getsource(obj)
except IndentationError:
strsrc = "\"Buggy python version consider upgrading, cannot get source\""
assert isinstance(strsrc, str)
return Source(strsrc, **kwargs)
def getstatementrange_ast(lineno, source, assertion=False, astnode=None):
if astnode is None:
content = str(source)
if sys.version_info < (2,7):
content += "\n"
try:
astnode = compile(content, "source", "exec", 1024) # 1024 for AST
except ValueError:
start, end = getstatementrange_old(lineno, source, assertion)
return None, start, end
start, end = get_statement_startend2(lineno, astnode)
# we need to correct the end:
# - ast-parsing strips comments
# - there might be empty lines
# - we might have lesser indented code blocks at the end
if end is None:
end = len(source.lines)
if end > start + 1:
# make sure we don't span differently indented code blocks
# by using the BlockFinder helper used which inspect.getsource() uses itself
block_finder = inspect.BlockFinder()
# if we start with an indented line, put blockfinder to "started" mode
block_finder.started = source.lines[start][0].isspace()
it = ((x + "\n") for x in source.lines[start:end])
try:
for tok in tokenize.generate_tokens(lambda: next(it)):
block_finder.tokeneater(*tok)
except (inspect.EndOfBlock, IndentationError):
end = block_finder.last + start
except Exception:
pass
# the end might still point to a comment or empty line, correct it
while end:
line = source.lines[end - 1].lstrip()
if line.startswith("#") or not line:
end -= 1
else:
break
return astnode, start, end
def getsource(obj, **kwargs):
import _pytest._code
obj = _pytest._code.getrawcode(obj)
try:
strsrc = inspect.getsource(obj)
except IndentationError:
strsrc = "\"Buggy python version consider upgrading, cannot get source\""
assert isinstance(strsrc, str)
return Source(strsrc, **kwargs)
def get_helper_functions(self):
def to_str(fn):
return inspect.getsource(fn) if callable(fn) else fn
for fn in self.helper_functions:
yield to_str(fn)
for fn in self.get_extra_helper_functions():
yield to_str(fn)
def get_code():
"""
returns the code for the current class
"""
return inspect.getsource(Dijkstra)
def get_code():
"""
returns the code for the current class
"""
return inspect.getsource(OneDirectionalAStar)