def get_action(driver, keyword):
"""get action class corresponding to the keyword in the driver
"""
drvmod = 'ProductDrivers.' + driver
drvmodobj = importlib.import_module(drvmod)
drvfile_methods = inspect.getmembers(drvmodobj, inspect.isroutine)
main_method = [item[1] for item in drvfile_methods if item[0] == 'main'][0]
main_src = inspect.getsource(main_method)
pkglstmatch = re.search(r'package_list.*=.*\[(.*)\]', main_src, re.MULTILINE | re.DOTALL)
pkglst = pkglstmatch.group(1).split(',')
for pkg in pkglst:
pkgobj = importlib.import_module(pkg)
pkgdir = os.path.dirname(pkgobj.__file__)
action_modules = [pkg+'.'+name for _, name, _ in pkgutil.iter_modules([pkgdir])]
action_module_objs = [importlib.import_module(action_module) for action_module in action_modules]
for action_module_obj in action_module_objs:
for action_class in inspect.getmembers(action_module_obj, inspect.isclass):
for func_name in inspect.getmembers(action_class[1], inspect.isroutine):
if keyword == func_name[0]:
return action_class[1]
return None
python类getsource()的实例源码
def combineFunctions(self, functions):
""" generates a combined formula as used for the calculation
@param functions the list of individual functions (lambda code)
@return the combined formula
@note out of scope is the simplification (like: (x+1)+1 => x+2)
"""
expression = ""
for function in reversed(functions):
match = re.match(".*\((?P<expression>lambda.*)\).*", inspect.getsource(function))
if match:
functionCode = match.group("expression")
functionCode = functionCode[functionCode.find(":")+1:].strip()
if not len(expression):
expression = functionCode
else:
expression = expression.replace("x", "("+functionCode+")")
return expression
def h_fun(fun):
"""
Hash functions
:param fun: function to hash
:type fun: function
:return: hash of the function
"""
try:
return fun.code
except AttributeError:
try:
h = inspect.getsource(fun)
except IOError:
h = "nocode"
try:
fun.code = h
except AttributeError:
pass
return h
def h_fun(fun):
"""
Hash functions
:param fun: function to hash
:type fun: function
:return: hash of the function
"""
try:
return fun.code
except AttributeError:
try:
h = inspect.getsource(fun)
except IOError:
h = "nocode"
try:
fun.code = h
except AttributeError:
pass
return h
def h_fun(fun):
"""
Hash functions
:param fun: function to hash
:type fun: function
:return: hash of the function
"""
try:
return fun.code
except AttributeError:
try:
h = inspect.getsource(fun)
except IOError:
h = "nocode"
try:
fun.code = h
except AttributeError:
pass
return h
def get_decorators(cls):
decorators = {}
def visit_FunctionDef(node):
decorators[node.name] = []
for n in node.decorator_list:
name = ''
if isinstance(n, ast.Call):
name = n.func.attr if isinstance(n.func, ast.Attribute) else n.func.id
else:
name = n.attr if isinstance(n, ast.Attribute) else n.id
args = [a.s for a in n.args] if hasattr(n, 'args') else []
decorators[node.name].append((name, args))
node_iter = ast.NodeVisitor()
node_iter.visit_FunctionDef = visit_FunctionDef
_cls = cls if inspect.isclass(cls) else cls.__class__
node_iter.visit(ast.parse(inspect.getsource(_cls)))
return decorators
def __init__(self, *parts, **kwargs):
self.lines = lines = []
de = kwargs.get('deindent', True)
rstrip = kwargs.get('rstrip', True)
for part in parts:
if not part:
partlines = []
if isinstance(part, Source):
partlines = part.lines
elif isinstance(part, (tuple, list)):
partlines = [x.rstrip("\n") for x in part]
elif isinstance(part, py.builtin._basestring):
partlines = part.split('\n')
if rstrip:
while partlines:
if partlines[-1].strip():
break
partlines.pop()
else:
partlines = getsource(part, deindent=de).lines
if de:
partlines = deindent(partlines)
lines.extend(partlines)
def __init__(self, *parts, **kwargs):
self.lines = lines = []
de = kwargs.get('deindent', True)
rstrip = kwargs.get('rstrip', True)
for part in parts:
if not part:
partlines = []
if isinstance(part, Source):
partlines = part.lines
elif isinstance(part, (tuple, list)):
partlines = [x.rstrip("\n") for x in part]
elif isinstance(part, py.builtin._basestring):
partlines = part.split('\n')
if rstrip:
while partlines:
if partlines[-1].strip():
break
partlines.pop()
else:
partlines = getsource(part, deindent=de).lines
if de:
partlines = deindent(partlines)
lines.extend(partlines)
def code_to_ast(code: types.CodeType, file: str = None) -> ast.Module:
"""
Return node object for code object.
"""
if code and not isinstance(code, types.CodeType):
raise TypeError('Unexpected type: {}'.format(str(type(code))))
result = None
try:
src = inspect.getsource(code)
file = file or inspect.getfile(code)
result = source_to_ast(src, file)
except IOError:
pass
return result
def module_to_ast(module: types.ModuleType, file: str = None) -> ast.Module:
"""
Return node object for python module.
"""
if module and not isinstance(module, types.ModuleType):
raise TypeError('Unexpected type: {}'.format(str(type(module))))
result = None
try:
src = inspect.getsource(module)
file = file or inspect.getfile(module)
result = source_to_ast(src, file)
except IOError:
pass
return result
def class_to_ast(class_: type, file: str = None) -> ast.ClassDef:
"""
"""
if class_ and not isinstance(class_, type):
raise TypeError('Unexpected type: {}'.format(str(type(class_))))
result = None
try:
src = inspect.getsource(class_)
file = file or inspect.getfile(class_)
result = source_to_ast(src, file)
except IOError:
pass
return result
def get_class_traits(klass):
""" Yield all of the documentation for trait definitions on a class object.
"""
# FIXME: gracefully handle errors here or in the caller?
source = inspect.getsource(klass)
cb = CommentBlocker()
cb.process_file(StringIO(source))
mod_ast = compiler.parse(source)
class_ast = mod_ast.node.nodes[0]
for node in class_ast.code.nodes:
# FIXME: handle other kinds of assignments?
if isinstance(node, compiler.ast.Assign):
name = node.nodes[0].name
rhs = unparse(node.expr).strip()
doc = strip_comment_marker(cb.search_for_comment(node.lineno, default=''))
yield name, rhs, doc
def _get_module_via_sys_modules(self, fullname):
"""Attempt to fetch source code via sys.modules. This is specifically
to support __main__, but it may catch a few more cases."""
module = sys.modules.get(fullname)
if not isinstance(module, types.ModuleType):
LOG.debug('sys.modules[%r] absent or not a regular module',
fullname)
return
modpath = self._py_filename(getattr(module, '__file__', ''))
if not modpath:
return
is_pkg = hasattr(module, '__path__')
try:
source = inspect.getsource(module)
except IOError:
# Work around inspect.getsourcelines() bug.
if not is_pkg:
raise
source = '\n'
return (module.__file__.rstrip('co'),
source,
hasattr(module, '__path__'))
def get_boot_command(self):
source = inspect.getsource(self._first_stage)
source = textwrap.dedent('\n'.join(source.strip().split('\n')[2:]))
source = source.replace(' ', '\t')
source = source.replace('CONTEXT_NAME', self.remote_name)
encoded = source.encode('zlib').encode('base64').replace('\n', '')
# We can't use bytes.decode() in 3.x since it was restricted to always
# return unicode, so codecs.decode() is used instead. In 3.x
# codecs.decode() requires a bytes object. Since we must be compatible
# with 2.4 (no bytes literal), an extra .encode() either returns the
# same str (2.x) or an equivalent bytes (3.x).
return [
self.python_path, '-c',
'from codecs import decode as _;'
'exec(_(_("%s".encode(),"base64"),"zlib"))' % (encoded,)
]
def merge_sources(*sources):
def sort_imports(import_string):
order = 0
if import_string.startswith('import .'):
order = 1
elif import_string.startswith('from .'):
order = 3
elif import_string.startswith('from '):
order = 2
return order, import_string
sources = [getsource(source).splitlines() for source in sources]
imports = []
for source in sources:
for line in source:
if line.startswith(('from', 'import')):
imports.append(line)
without_imports = (line for source in chain(sources) for line in source
if not line.startswith(('from', 'import')))
imports.sort(key=sort_imports)
return '\n'.join(chain(imports, without_imports))
def build_lazy_ie(ie, name):
valid_url = getattr(ie, '_VALID_URL', None)
s = ie_template.format(
name=name,
bases=', '.join(map(get_base_name, ie.__bases__)),
valid_url=valid_url,
module=ie.__module__)
if ie.suitable.__func__ is not InfoExtractor.suitable.__func__:
s += '\n' + getsource(ie.suitable)
if hasattr(ie, '_make_valid_url'):
# search extractors
s += make_valid_template.format(valid_url=ie._make_valid_url())
return s
# find the correct sorting and add the required base classes so that sublcasses
# can be correctly created
def get_class_traits(klass):
""" Yield all of the documentation for trait definitions on a class object.
"""
# FIXME: gracefully handle errors here or in the caller?
source = inspect.getsource(klass)
cb = CommentBlocker()
cb.process_file(StringIO(source))
mod_ast = compiler.parse(source)
class_ast = mod_ast.node.nodes[0]
for node in class_ast.code.nodes:
# FIXME: handle other kinds of assignments?
if isinstance(node, compiler.ast.Assign):
name = node.nodes[0].name
rhs = unparse(node.expr).strip()
doc = strip_comment_marker(cb.search_for_comment(node.lineno, default=''))
yield name, rhs, doc
def retrieve_data_string(data, verbose=True):
data_string = inspect.getsource(data)
first_line = data_string.split("\n")[0]
indent_length = len(determine_indent(data_string))
data_string = data_string.replace(first_line, "")
r = re.compile(r'^\s*return.*')
last_line = [s for s in reversed(data_string.split("\n")) if r.match(s)][0]
data_string = data_string.replace(last_line, "")
split_data = data_string.split("\n")
for i, line in enumerate(split_data):
split_data[i] = line[indent_length:] + "\n"
data_string = ''.join(split_data)
if verbose:
print(">>> Data")
print(with_line_numbers(data_string))
return data_string
def source(self):
"""
Returns
-------
str
The source code of this action's callable formatted as Markdown
text.
"""
try:
source = inspect.getsource(self._callable)
except OSError:
raise TypeError(
"Cannot retrieve source code for callable %r" %
self._callable.__name__)
return markdown_source_template % {'source': source}
def parse_func(n, func, prefix, f, s, typ='function'):
'''inspect function signature
# TODO: py3 inspect changed.
'''
prefix += (n,)
s.doc[prefix] = {'t': typ, 'd': getsource(func)}
fn = '.'.join(prefix[1:])
if not matches(fn, f.fmatch):
return
s.funcs.append(fn)
args = getargspec(func)
m = { 'pos_args': [], 'doc': doc_tag_line(f, func)
, 'arg_keys': args.args, 'args': {}}
defs = list(args.defaults or ())
for a in reversed(args.args):
d = m['args'][a] = defs.pop() if defs else 'n.d.'
if d == 'n.d.':
m['pos_args'].insert(0, a)
#m['sig'] = getsig(func, args, m)
s.reg[fn] = m
l('added', 'function' , prefix)
def bootstrap_exec(io, spec):
try:
sendexec(
io,
inspect.getsource(gateway_base),
"execmodel = get_execmodel(%r)" % spec.execmodel,
'io = init_popen_io(execmodel)',
"io.write('1'.encode('ascii'))",
"serve(io, id='%s-slave')" % spec.id,
)
s = io.read(1)
assert s == "1".encode('ascii')
except EOFError:
ret = io.wait()
if ret == 255:
raise HostNotFound(io.remoteaddress)
def bootstrap_socket(io, id):
# XXX: switch to spec
from execnet.gateway_socket import SocketIO
sendexec(
io,
inspect.getsource(gateway_base),
'import socket',
inspect.getsource(SocketIO),
"try: execmodel",
"except NameError:",
" execmodel = get_execmodel('thread')",
"io = SocketIO(clientsock, execmodel)",
"io.write('1'.encode('ascii'))",
"serve(io, id='%s-slave')" % id,
)
s = io.read(1)
assert s == "1".encode('ascii')
def listofmodeluserfunctions(self):
"""User functions of the model class."""
lines = []
for (name, member) in vars(self.model.__class__).items():
if (inspect.isfunction(member) and
(name not in ('run', 'new2old')) and
('fastaccess' in inspect.getsource(member))):
lines.append((name, member))
run = vars(self.model.__class__).get('run')
if run is not None:
lines.append(('run', run))
for (name, member) in vars(self.model).items():
if (inspect.ismethod(member) and
('fastaccess' in inspect.getsource(member))):
lines.append((name, member))
return lines
def get_class_traits(klass):
""" Yield all of the documentation for trait definitions on a class object.
"""
# FIXME: gracefully handle errors here or in the caller?
source = inspect.getsource(klass)
cb = CommentBlocker()
cb.process_file(StringIO(source))
mod_ast = compiler.parse(source)
class_ast = mod_ast.node.nodes[0]
for node in class_ast.code.nodes:
# FIXME: handle other kinds of assignments?
if isinstance(node, compiler.ast.Assign):
name = node.nodes[0].name
rhs = unparse(node.expr).strip()
doc = strip_comment_marker(cb.search_for_comment(node.lineno, default=''))
yield name, rhs, doc
test_power_control.py 文件源码
项目:intel-manager-for-lustre
作者: intel-hpdd
项目源码
文件源码
阅读 35
收藏 0
点赞 0
评论 0
def wait_for_assert(self, lambda_expression, timeout=TEST_TIMEOUT):
"""
Evaluates lambda_expression once/1s until no AssertionError or hits
timeout.
"""
import time
import inspect
running_time = 0
while running_time < timeout:
try:
lambda_expression()
except AssertionError:
pass
else:
break
time.sleep(1)
running_time += 1
self.assertLess(running_time, timeout, "Timed out waiting for %s." % inspect.getsource(lambda_expression))
def wait_until_true(self, lambda_expression, error_message='', timeout=TEST_TIMEOUT):
"""Evaluates lambda_expression once/1s until True or hits timeout."""
assert hasattr(lambda_expression, '__call__'), 'lambda_expression is not callable: %s' % type(lambda_expression)
assert hasattr(error_message, '__call__') or type(error_message) is str, 'error_message is not callable and not a str: %s' % type(error_message)
assert type(timeout) == int, 'timeout is not an int: %s' % type(timeout)
running_time = 0
lambda_result = None
wait_time = 0.01
while not lambda_result and running_time < timeout:
lambda_result = lambda_expression()
logger.debug("%s evaluated to %s" % (inspect.getsource(lambda_expression), lambda_result))
if not lambda_result:
time.sleep(wait_time)
wait_time = min(1, wait_time * 10)
running_time += wait_time
if hasattr(error_message, '__call__'):
error_message = error_message()
self.assertLess(running_time,
timeout,
'Timed out waiting for %s\nError Message %s' % (inspect.getsource(lambda_expression), error_message))
def wait_for_assert(self, lambda_expression, timeout=TEST_TIMEOUT):
"""
Evaluates lambda_expression once/1s until no AssertionError or hits
timeout.
"""
running_time = 0
assertion = None
while running_time < timeout:
try:
lambda_expression()
except AssertionError, e:
assertion = e
logger.debug("%s tripped assertion: %s" % (inspect.getsource(lambda_expression), e))
else:
break
time.sleep(1)
running_time += 1
self.assertLess(running_time,
timeout,
"Timed out waiting for %s\nAssertion %s" % (inspect.getsource(lambda_expression), assertion))
def test_proceed_with_fake_filename(self):
'''doctest monkeypatches linecache to enable inspection'''
fn, source = '<test>', 'def x(): pass\n'
getlines = linecache.getlines
def monkey(filename, module_globals=None):
if filename == fn:
return source.splitlines(True)
else:
return getlines(filename, module_globals)
linecache.getlines = monkey
try:
ns = {}
exec compile(source, fn, 'single') in ns
inspect.getsource(ns["x"])
finally:
linecache.getlines = getlines
def test_proceed_with_fake_filename(self):
'''doctest monkeypatches linecache to enable inspection'''
fn, source = '<test>', 'def x(): pass\n'
getlines = linecache.getlines
def monkey(filename, module_globals=None):
if filename == fn:
return source.splitlines(True)
else:
return getlines(filename, module_globals)
linecache.getlines = monkey
try:
ns = {}
exec compile(source, fn, 'single') in ns
inspect.getsource(ns["x"])
finally:
linecache.getlines = getlines
def interpret_fraction(f):
assert inspect.isfunction(f)
members = dict(inspect.getmembers(f))
global_dict = members["__globals__"]
source_filename = inspect.getsourcefile(f)
f_ast = ast.parse(inspect.getsource(f))
_, starting_line = inspect.getsourcelines(f)
# ast_demo.increment_lineno(f_ast, starting_line - 1)
# print("AST:", ast_demo.dump(f_ast))
visitor = FractionInterpreter()
new_ast = visitor.visit(f_ast)
# print(ast_demo.dump(new_ast))
ast.fix_missing_locations(new_ast)
co = compile(new_ast, '<ast_demo>', 'exec')
fake_locals = {}
# exec will define the new function into fake_locals scope
# this is to avoid conflict with vars in the real locals()
# https://stackoverflow.com/questions/24733831/using-a-function-defined-in-an-execed-string-in-python-3
exec(co, None, fake_locals)
# new_f = locals()[visitor._new_func_name(f.__name__)]
return fake_locals[f.__name__]