def get_action(driver, keyword):
    """Return the action class of *driver* that defines *keyword*.

    The module ``ProductDrivers.<driver>`` is imported and the source code of
    its ``main`` routine is searched for a ``package_list = [...]``
    assignment.  Every module of every listed package is imported and the
    classes of those modules are scanned for a routine named *keyword*.

    :param driver: name of a module inside the ``ProductDrivers`` package.
    :param keyword: name of the routine to look for.
    :return: the first class that owns a routine called *keyword*, or
        ``None`` when the driver has no ``package_list`` assignment or no
        class defines the keyword.
    :raises IndexError: if the driver module has no ``main`` routine.
    """
    driver_module = importlib.import_module('ProductDrivers.' + driver)
    routines = inspect.getmembers(driver_module, inspect.isroutine)
    main_method = [obj for name, obj in routines if name == 'main'][0]
    main_src = inspect.getsource(main_method)
    match = re.search(r'package_list.*=.*\[(.*)\]', main_src,
                      re.MULTILINE | re.DOTALL)
    if match is None:
        # The driver declares no package list, so there is nothing to search.
        return None

    # The captured text is split on commas only, so each entry still carries
    # the blanks/newlines that surrounded it in the source; strip them and
    # drop empty entries (e.g. after a trailing comma) before importing.
    package_names = [entry.strip() for entry in match.group(1).split(',')]
    for pkg in package_names:
        if not pkg:
            continue
        pkgobj = importlib.import_module(pkg)
        pkgdir = os.path.dirname(pkgobj.__file__)
        # Import every module of the package up front, then search them.
        action_module_objs = [
            importlib.import_module(pkg + '.' + name)
            for _, name, _ in pkgutil.iter_modules([pkgdir])
        ]
        for action_module_obj in action_module_objs:
            for _, action_class in inspect.getmembers(action_module_obj,
                                                      inspect.isclass):
                for routine_name, _ in inspect.getmembers(action_class,
                                                          inspect.isroutine):
                    if keyword == routine_name:
                        return action_class
    return None
# Example source code for Python's getmembers()
def __init__(self, type_list):
    """Build and pack a streaming event request for the given record types.

    :param type_list: iterable of record type names.  A name is honoured
        only when this module defines an upper-case class of that name;
        unknown names, duplicates and ``'TERMINATE'`` are dropped.  The
        caller's list is left untouched.

    The resulting request lists the ``version``/``code`` of each requested
    class in the order given, followed by a final ``{'code': 0,
    'version': 0}`` entry for TERMINATE.  The packed message is stored in
    ``self.record``.
    """
    module = sys.modules[__name__]
    known_types = {
        name
        for name, obj in inspect.getmembers(module)
        if name.isupper() and inspect.isclass(obj) and obj.__module__ == __name__
    }

    # Filter into a new list instead of editing ``type_list`` in place, so
    # the caller's argument is not modified and the order stays stable.
    requested = []
    seen = set()
    for rtype in type_list:
        # TERMINATE must come last, so it is skipped here and added below.
        if rtype == 'TERMINATE' or rtype in seen or rtype not in known_types:
            continue
        seen.add(rtype)
        requested.append(rtype)

    array_args = []
    for rtype in requested:
        record_cls = getattr(module, rtype)
        array_args.append({'version': record_cls.version,
                           'code': record_cls.code})
    array_args.append({'code': 0, 'version': 0})  # add TERMINATE as last req

    self.streaming_event_request = StreamingEventRequest(
        service_array=array_args,
        timestamp=Struct.get_ts(),
        flags=Struct.get_flags())
    self.message_header = MessageHeader(type=2049,
                                        data=self.streaming_event_request)
    self.record = self.message_header.pack()
def struct_fields(value):
    """Return the simple / simple-sequence property objects of a struct.

    *value* may be a class (anything that is an old-style class or has a
    ``__bases__`` attribute) or an instance, in which case its type is used.
    The class attribute ``__fields`` supplies the fields when present;
    otherwise the class dictionary is scanned, simple properties first and
    simple-sequence properties second.  Any property reachable through
    ``inspect.getmembers(value)`` whose ``id_`` is not yet listed is
    appended.
    """
    looks_like_class = (isinstance(value, types.ClassType)
                        or hasattr(value, '__bases__'))
    clazz = value if looks_like_class else type(value)

    # Try to get the correct field ordering, if it is available, otherwise
    # just look at the class dictionary to find the fields.
    fields = getattr(clazz, '__fields', None)
    if fields is None:
        class_attrs = clazz.__dict__
        fields = [attr for attr in class_attrs.itervalues()
                  if isinstance(attr, simple_property)]
        fields += [attr for attr in class_attrs.itervalues()
                   if isinstance(attr, simpleseq_property)]

    for _, candidate in inspect.getmembers(value):
        if not isinstance(candidate, (simple_property, simpleseq_property)):
            continue
        already_listed = any(candidate.id_ == known.id_ for known in fields)
        if not already_listed:
            # ``+=`` extends in place, so a ``__fields`` list taken from the
            # class is the very object that grows here.
            fields += [candidate]
    return fields
def uses_tags(self):
    """Return the set of tag names this object handles or declares it uses.

    A tag is contributed by:

    * every bound method named ``handle_<tag>``;
    * every ``Uses tags: a, b, ...`` line in the docstring of a
      ``handle_*`` method or of ``preprocess``;
    * every key of ``self.enumerateable_envs``.
    """
    prefix = "handle_"
    found = set()
    for name, method in inspect.getmembers(self, predicate=inspect.ismethod):
        is_handler = name.startswith(prefix)
        if not (is_handler or name == 'preprocess'):
            continue
        if is_handler:
            found.add(name[len(prefix):])
        # A missing or empty docstring simply yields no lines to scan.
        for line in (method.__doc__ or "").splitlines():
            declared = re.search(r"Uses tags:(.+)", line)
            if declared:
                found.update(tag.strip()
                             for tag in declared.group(1).split(","))
    found.update(self.enumerateable_envs.keys())
    return found
def uses_tags(self) -> set:
    """Return the set of tag names this object handles or declares it uses.

    Tags come from the suffix of every ``handle_<tag>`` bound method, from
    ``Uses tags: a, b, ...`` lines in the docstrings of those methods and of
    ``make_numbers``, from the keys of ``self.enumerateable_envs`` and from
    ``self.metatags``.
    """
    def is_relevant(method_name):
        return (method_name.startswith("handle_")
                or method_name == 'make_numbers')

    relevant = [
        (method_name, method)
        for method_name, method
        in inspect.getmembers(self, predicate=inspect.ismethod)
        if is_relevant(method_name)
    ]

    collected = {
        method_name[len("handle_"):]
        for method_name, _ in relevant
        if method_name.startswith("handle_")
    }
    for _, method in relevant:
        docstring = method.__doc__
        if not docstring:
            continue
        for line in docstring.splitlines():
            match = re.search(r"Uses tags:(.+)", line)
            if match is None:
                continue
            collected.update(tag.strip() for tag in match.group(1).split(","))

    collected.update(self.enumerateable_envs.keys())
    collected.update(self.metatags)
    return collected
def add_click_commands(module, cli, command_dict, namespaced):
    """Register every ``BaseCommand`` instance found in *module* on *cli*.

    ``command_dict['config']`` may map a member name to per-command options:
    ``name`` overrides the command name and ``help_text`` overrides its
    ``short_help``.  The final name is prefixed with
    ``command_dict['namespace']`` when that is set, otherwise with the last
    component of the module name when *namespaced* is true.
    """
    commands = [
        (attr_name, obj)
        for attr_name, obj in getmembers(module)
        if isinstance(obj, BaseCommand)
    ]
    per_command_options = command_dict.get('config', {})
    namespace = command_dict.get('namespace')

    for attr_name, command in commands:
        overrides = per_command_options.get(attr_name, {})
        default_name = getattr(command, 'name', attr_name)
        command_name = overrides.get('name', default_name)

        # An explicit namespace wins over the module-derived one.
        prefix = None
        if namespace:
            prefix = namespace
        elif namespaced:
            prefix = module.__name__.split('.')[-1]
        if prefix is not None:
            command_name = '{}_{}'.format(prefix, command_name)

        command.short_help = overrides.get('help_text', command.short_help)
        cli.add_command(command, name=command_name)
def test_doc_strings(self):
    """Check that the mixins reuse the docstrings of FuncBaseMixin.

    For every member of FuncSeleniumMixin and FuncWebTestMixin whose name
    does not start with ``__`` and that also exists on FuncBaseMixin, the
    docstring must equal the base docstring whenever the base has one.
    """
    def underlying(attr):
        # Python 2 methods keep the real function in ``im_func``.
        if hasattr(attr, 'im_func'):
            return attr.im_func
        return attr

    mismatches = []
    for mixin in (FuncSeleniumMixin, FuncWebTestMixin):
        for name, member in inspect.getmembers(mixin):
            if name.startswith('__'):
                continue
            base_member = getattr(FuncBaseMixin, name, None)
            if base_member is None:
                continue
            member_doc = getattr(underlying(member), '__doc__', None)
            base_doc = getattr(underlying(base_member), '__doc__', None)
            if base_doc is not None and member_doc != base_doc:
                mismatches.append((mixin, name))

    if not mismatches:
        return
    listing = "\n".join("{0}.{1}".format(mixin.__name__, name)
                        for mixin, name in mismatches)
    self.fail("The following methods have incorrect or missing docstrings "
              "compared to FuncBaseMixin: \n" + listing)
def __init__(self, bot):
    """Wire this object's commands into *bot* and load the jokes file.

    Every parentless ``commands.Command`` member other than ``random`` is
    added to the bot and to the ``random`` command.  Fact and random
    sub-commands are then attached through ``utilities.add_as_subcommand``.
    Jokes are read from the first column of ``data/jokes.csv``; a missing
    file leaves ``self.jokes`` empty.
    """
    self.bot = bot

    # Add commands as random subcommands
    for member_name, member in inspect.getmembers(self):
        if not isinstance(member, commands.Command):
            continue
        if member.parent is not None or member_name == "random":
            continue
        self.bot.add_command(member)
        self.random.add_command(member)

    # Add fact subcommands as subcommands of corresponding commands
    fact_pairs = (
        (self.fact_cat, self.cat),
        (self.fact_date, self.date),
        (self.fact_number, self.number),
    )
    for fact_command, parent_command in fact_pairs:
        utilities.add_as_subcommand(self, fact_command, parent_command, "fact")

    # Add random subcommands as subcommands of corresponding commands
    self.random_subcommands = (
        (self.color, "Resources.color"),
        (self.giphy, "Resources.giphy"),
        (self.map, "Resources.map"),
        (self.streetview, "Resources.streetview"),
        (self.uesp, "Search.uesp"),
        (self.wikipedia, "Search.wikipedia"),
        (self.xkcd, "Resources.xkcd"),
    )
    for random_command, parent_name in self.random_subcommands:
        utilities.add_as_subcommand(self, random_command, parent_name, "random")

    # Import jokes
    self.jokes = []
    try:
        with open("data/jokes.csv", newline = "") as jokes_file:
            self.jokes.extend(row[0] for row in csv.reader(jokes_file))
    except FileNotFoundError:
        pass
def init_db():
    """
    Import all modules here that might define models so that
    they will be registered properly on the metadata, and then
    create a database.

    The classes of ``models`` that subclass ``BASE`` are logged at debug
    level before ``BASE.metadata.create_all`` is bound to ``ENGINE``.
    """
    def is_model(member):
        """Tell whether a (name, class) pair holds a subclass of BASE."""
        try:
            return issubclass(member[1], BASE)
        except TypeError:
            return False

    model_classes = [
        member[1]
        for member in inspect.getmembers(models, inspect.isclass)
        if is_model(member)
    ]
    LOGGER.debug('Import models: %s', model_classes)
    BASE.metadata.create_all(bind=ENGINE)
def non_removable_attrs(cls):
    """Return the set of attribute paths that may not be removed.

    The set is computed once and cached on ``cls._non_removable_attrs``.
    It starts as a copy of ``cls._extra_non_removable_attrs`` (override that
    field in subclasses with a set such as ``{'/foo', '/bar'}``) and gains
    ``'/<name>'`` for every non-routine member of ``cls._api_base`` whose
    ``mandatory`` attribute is true.
    """
    if cls._non_removable_attrs is not None:
        return cls._non_removable_attrs

    cls._non_removable_attrs = cls._extra_non_removable_attrs.copy()
    if cls._api_base:
        def is_data_member(member):
            return not inspect.isroutine(member)

        for name, field in inspect.getmembers(cls._api_base, is_data_member):
            if getattr(field, 'mandatory', False):
                cls._non_removable_attrs.add('/%s' % name)
    return cls._non_removable_attrs
def params(cls):
    """Return the ``(name, Param)`` members of *cls* in sorted order.

    Each parameter name is split on ``"_"`` and every leading run of its
    parts forms a prefix.  Across *cls* and all classes in its MRO, each
    prefix is given the smallest ``order`` of any parameter sharing it.  A
    parameter's sort key is the tuple of those minima for its prefixes,
    shortest prefix first.
    """
    own_params = [
        (name, value)
        for name, value in inspect.getmembers(cls)
        if isinstance(value, Param)
    ]

    prefixes_by_name = {}
    lowest_order = {}
    for base in inspect.getmro(cls):
        for name, value in inspect.getmembers(base):
            if not isinstance(value, Param):
                continue
            parts = name.split("_")
            prefixes = [tuple(parts[:end]) for end in range(1, len(parts) + 1)]
            prefixes_by_name[name] = prefixes
            for prefix in prefixes:
                current = lowest_order.get(prefix, value.order)
                lowest_order[prefix] = min(current, value.order)

    def sort_key(item):
        return tuple(lowest_order.get(prefix)
                     for prefix in prefixes_by_name[item[0]])

    return sorted(own_params, key=sort_key)
def _signal_numbers_to_names():
    """Map each signal number to a sorted tuple of its ``SIG*`` names.

    Only integral members of the ``signal`` module whose name starts with
    ``SIG`` but not ``SIG_`` are considered.
    """
    names_by_number = {}
    for name, value in inspect.getmembers(signal):
        is_signal_name = name.startswith("SIG") and not name.startswith("SIG_")
        if is_signal_name and isinstance(value, numbers.Integral):
            names_by_number.setdefault(value, []).append(name)
    return {
        number: tuple(sorted(names))
        for number, names in names_by_number.items()
    }
def test_resources(self):
    """Create all the resources to test model definition.

    Every class in ``models`` deriving from ``core.ModelBase`` is created
    once, in the order given by ``_sort_model_by_foreign_key``.  Any
    exception raised on the way fails the test.
    """
    def is_model(candidate):
        return inspect.isclass(candidate) and issubclass(candidate,
                                                         core.ModelBase)

    try:
        model_list = [
            candidate
            for _, candidate in inspect.getmembers(models)
            if is_model(candidate)
        ]
        for model_class in _sort_model_by_foreign_key(model_list):
            create_dict = _construct_resource_dict(model_class)
            with self.context.session.begin():
                core.create_resource(self.context, model_class, create_dict)
    except Exception as e:
        self.fail('test_resources raised Exception unexpectedly %s' % str(e))
def add_network_methods(target):
    '''
    Attach extension methods to an object that represents a Veneer network.
    Note: The 'network_' prefix will be removed from method names.

    Every function of ``veneer.extensions`` except this one is bound to
    *target* as a method.

    target: Veneer network object to attach extension methods to.
    '''
    import veneer.extensions as extensions  # Import self to inspect available functions

    prefix = 'network_'
    # Name of this very function, so that it is not attached to the target.
    this_func_name = sys._getframe().f_code.co_name

    for f_name, f in inspect.getmembers(extensions, inspect.isfunction):
        if f_name == this_func_name:
            continue
        if f_name.startswith(prefix):
            # Drop the leading prefix only; str.replace() would also delete
            # any later 'network_' inside the name.
            f_name = f_name[len(prefix):]
        setattr(target, f_name, MethodType(f, target))
def _get_queryset_methods(cls, queryset_class):
    """Build proxy methods that forward to a queryset.

    Returns a dict mapping method names of *queryset_class* to functions
    that call the same-named method on ``self.get_queryset()``.  A method is
    proxied only when *cls* lacks an attribute of that name and the method
    is either public or explicitly marked ``queryset_only = False``.
    """
    def make_proxy(method_name, original):
        def manager_method(self, *args, **kwargs):
            target = getattr(self.get_queryset(), method_name)
            return target(*args, **kwargs)
        manager_method.__name__ = original.__name__
        manager_method.__doc__ = original.__doc__
        return manager_method

    # Refs http://bugs.python.org/issue1785.
    if six.PY3:
        is_method = inspect.isfunction
    else:
        is_method = inspect.ismethod

    proxies = {}
    for name, method in inspect.getmembers(queryset_class, predicate=is_method):
        # Only copy missing methods.
        if hasattr(cls, name):
            continue
        # Only copy public methods or methods with the attribute
        # `queryset_only=False`.
        queryset_only = getattr(method, 'queryset_only', None)
        unmarked_private = queryset_only is None and name.startswith('_')
        if queryset_only or unmarked_private:
            continue
        # Copy the method onto the manager.
        proxies[name] = make_proxy(name, method)
    return proxies
def iterAttributes(self):
    """
    List all the attributes defined in this module.

    Note: Future work is planned here to make it possible to list python
    attributes on a module without loading the module by inspecting ASTs or
    bytecode, but currently any iteration of PythonModule objects insists
    they must be loaded, and will use inspect.getmembers.

    @raise NotImplementedError: if this module is not loaded.

    @return: a generator yielding PythonAttribute instances describing the
        attributes of this module.
    """
    if not self.isLoaded():
        raise NotImplementedError(
            "You can't load attributes from non-loaded modules yet.")
    loadedModule = self.load()
    for attrName, attrValue in inspect.getmembers(loadedModule):
        qualifiedName = self.name + '.' + attrName
        yield PythonAttribute(qualifiedName, self, True, attrValue)
def _get_all(self):
    """Populate ``self._values`` with every property of the wrapped object.

    For a ``ManagedObjectReference`` the properties are fetched through the
    server and keyed by each property's ``Name``.  For any other type, every
    bound method named ``get_element_<x>`` is called and its result stored
    under ``<x>``; getters raising ``AttributeError`` are skipped.
    ``self._values_set`` is set to ``True`` afterwards.
    """
    getter_prefix = "get_element_"
    if self._type != 'ManagedObjectReference':
        # Just inspect the attributes
        values = self._values = {}
        for name, method in getmembers(self._obj, predicate=inspect.ismethod):
            if not name.startswith(getter_prefix):
                continue
            try:
                values[name[len(getter_prefix):]] = method()
            except AttributeError:
                continue
    else:
        # If this is a MOR we need to recurse
        content = self._server._get_object_properties(self._obj, get_all=True)
        prop_set = content.get_element_propSet()
        self._values = {prop.Name: prop.Val for prop in prop_set}
    self._values_set = True
def load_file(self, filename):
    """Load a ``.py`` or ``.pyc`` file and register its matching classes.

    The module is named after the file (without extension).  Every class of
    the loaded module whose name starts with that module name is passed to
    ``self.load_class``.

    Raises EvaluateException when the extension is neither ``.py`` nor
    ``.pyc``.
    """
    base_name = os.path.split(filename)[-1]
    mod_name, file_ext = os.path.splitext(base_name)
    extension = file_ext.lower()

    if extension not in ('.py', '.pyc'):
        raise EvaluateException('Unable to find the file ' + filename + '!')
    if extension == '.py':
        # Source file.
        py_mod = imp.load_source(mod_name, filename)
    else:
        # Compiled file.
        py_mod = imp.load_compiled(mod_name, filename)

    # It is assumed that the classes of interest carry the module's name.
    for class_name, class_obj in getmembers(py_mod, isclass):
        if class_name.startswith(mod_name):
            self.load_class(class_obj)
def _list_xml_members(cls):
    """Return the (name, value) members which are XML elements or attributes.

    Members whose name starts with an underscore, and the member named
    'text', are ignored.  The following members are considered XML members:

    foo = 'abc' - indicates an XML attribute with the qname abc
    foo = SomeElement - indicates an XML child element
    foo = [AnElement] - indicates a repeating XML child element, each instance
        will be stored in a list in this member
    foo = ('att1', '{http://example.com/namespace}att2') - indicates an XML
        attribute which has different parsing rules in different versions of
        the protocol. Version 1 of the XML parsing rules will look for an
        attribute with the qname 'att1' but version 2 of the parsing rules will
        look for a namespaced attribute with the local name of 'att2' and an
        XML namespace of 'http://example.com/namespace'.
    """
    def is_xml_member(value):
        if isinstance(value, (tuple, list)):
            return True
        if isinstance(value, (str, unicode)):
            return True
        return inspect.isclass(value) and issubclass(value, XmlElement)

    xml_members = []
    for name, value in inspect.getmembers(cls):
        if name.startswith('_') or name == 'text':
            continue
        if is_xml_member(value):
            xml_members.append((name, value))
    return xml_members
def get_defining_item(self, code):
    """Find the class or module in which the code object *code* is defined.

    The first loaded module whose ``__file__`` (with ``.pyc`` mapped to
    ``.py``) equals ``code.co_filename`` is searched: for each of its
    classes, methods and then plain functions are compared with *code* by
    file name and first line number.  On a match ``(modname, class name)``
    is recorded in ``self.imports_`` and ``(class, member)`` is returned.
    If no class member matches, ``(modname,)`` is recorded and
    ``(module, module)`` is returned.  Returns ``None`` implicitly when no
    loaded module has that file name.
    """
    def same_location(func_code):
        return (func_code.co_filename == code.co_filename
                and func_code.co_firstlineno == code.co_firstlineno)

    for modname, mod in sys.modules.iteritems():
        source_path = getattr(mod, '__file__', '').replace('.pyc', '.py')
        if source_path != code.co_filename:
            continue

        for _, clazz in inspect.getmembers(mod, predicate=inspect.isclass):
            for _, method in inspect.getmembers(clazz,
                                                predicate=inspect.ismethod):
                if same_location(method.im_func.func_code):
                    self.imports_.add((modname, clazz.__name__))
                    return clazz, method
            for _, function in inspect.getmembers(clazz,
                                                  predicate=inspect.isfunction):
                if same_location(function.func_code):
                    self.imports_.add((modname, clazz.__name__))
                    return clazz, function

        # The file matches but no class member does: attribute the code to
        # the module itself.
        self.imports_.add((modname,))
        return mod, mod
def __init__(self, docstring, class_name, class_object):
    """Parse *docstring* and record whether the constructor takes arguments.

    :param docstring: class-level docstring handed to the parent parser.
    :param class_name: name of the documented class, kept on the instance.
    :param class_object: the class whose ``__init__`` is examined.

    ``self.has_parameters`` becomes ``True`` when ``__init__`` is
    implemented in Python and accepts anything besides ``self``.
    """
    super(NumpyClassDocString, self).__init__(docstring)
    self.class_name = class_name
    self.has_parameters = False

    init = dict(inspect.getmembers(class_object)).get('__init__')
    # Only an ``__init__`` written in Python can be introspected (one
    # implemented in C cannot).  Looked up on a class it is an unbound
    # method on Python 2 but a plain function on Python 3, so accept both.
    if not (inspect.ismethod(init) or inspect.isfunction(init)):
        return

    # ``getargspec`` is gone from recent Python 3 releases; prefer
    # ``getfullargspec`` when it exists.  Both report ``self`` in ``args``.
    get_spec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    spec = get_spec(init)
    args, varargs, keywords, defaults = spec[:4]
    kwonlyargs = getattr(spec, 'kwonlyargs', None)
    if ((args and args != ['self']) or varargs or keywords or defaults
            or kwonlyargs):
        self.has_parameters = True
def handle_class(val, class_name):
    """Collect docstring errors for the class *val* and its public methods.

    A class without a docstring yields a single "missing" entry.  Otherwise
    the errors reported by ``NumpyClassDocString`` are returned as 1-tuples,
    followed by whatever ``handle_method`` reports for each public callable
    member that is not itself a class and for which ``inspect.getmodule``
    finds no module.
    """
    docstring = inspect.getdoc(val)
    if docstring is None:
        return [(class_name, '**missing** class-level docstring')]

    parsed = NumpyClassDocString(docstring, class_name, val)
    cls_errors = [(error,) for error in parsed.get_errors()]

    # Get public methods and parse their docstrings
    public_callables = {}
    for name, member in inspect.getmembers(val):
        if name.startswith('_') or not callable(member) or type(member) is type:
            continue
        public_callables[name] = member

    for m_name, method in six.iteritems(public_callables):
        # skip error check if the method was inherited
        # from a parent class (which means it wasn't
        # defined in this source file)
        if inspect.getmodule(method) is not None:
            continue
        cls_errors.extend(handle_method(method, m_name, class_name))
    return cls_errors
def _getmembers(item):
    """Return the ``(name, value)`` members of *item*.

    ``inspect.getmembers`` is tried first; if it raises, the pairs are built
    from ``dir(item)``, keeping only the names ``hasattr`` confirms.
    """
    import inspect
    try:
        return inspect.getmembers(item)
    except Exception:
        fallback = []
        for name in dir(item):
            if hasattr(item, name):
                fallback.append((name, getattr(item, name)))
        return fallback
#-----------------------------------------------------------------------------
# The following SafeEval class and company are adapted from Michael Spencer's
# ASPN Python Cookbook recipe:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/364469
# Accordingly it is mostly Copyright 2006 by Michael Spencer.
# The recipe, like most of the other ASPN Python Cookbook recipes was made
# available under the Python license.
# http://www.python.org/license
# It has been modified to:
# * handle unary -/+
# * support True/False/None
# * raise SyntaxError instead of a custom exception.
def modulemap():
    """Map every ``type_name`` to the class of this module that declares it.

    All classes visible in the current module are included, except those
    named ``Attribute`` and ``DataMessage``.

    :return: A dictionary with keys of `type_name`
             and values of the corresponding classes.
    :rtype: dict
    """
    skipped = ("Attribute", "DataMessage")
    this_module = sys.modules[__name__]
    return {
        klass.type_name: klass
        for name, klass in inspect.getmembers(this_module, inspect.isclass)
        if name not in skipped
    }
def load():
    """Load the active experiment.

    ``dallinger_experiment.py`` is loaded from the current working
    directory (which is appended to ``sys.path`` if absent).  The class
    returned is the first one, in the name order produced by
    ``inspect.getmembers``, whose first base class is named exactly
    ``Experiment``.

    :raises ImportError: logged and re-raised when the import fails.
    :raises IndexError: when the module defines no such class.
    """
    cwd = os.getcwd()
    if cwd not in sys.path:
        sys.path.append(cwd)

    try:
        exp = imp.load_source('dallinger_experiment', "dallinger_experiment.py")
        classes = inspect.getmembers(exp, inspect.isclass)
        # Compare the base name for equality: ``name in "Experiment"`` is a
        # substring test and would also accept bases called e.g. "Exp".
        # Classes without bases (``object``) are skipped rather than indexed.
        exps = [
            name for name, klass in classes
            if klass.__bases__ and klass.__bases__[0].__name__ == "Experiment"
        ]
        this_experiment = exps[0]
        mod = __import__('dallinger_experiment', fromlist=[this_experiment])
        return getattr(mod, this_experiment)
    except ImportError:
        logger.error('Could not import experiment.')
        raise
def export_rule_data(variables, actions):
    """
    Export all information about the variables, actions, and operators to
    the client.  The returned dictionary has three keys:

    - variables: whatever ``variables.get_all_variables()`` returns
    - actions: whatever ``actions.get_all_actions()`` returns
    - variable_type_operators: for every member of the ``operators`` module
      with a true ``export_in_rule_data`` attribute, its ``name`` mapped to
      its ``get_all_operators()`` result
    """
    from . import operators

    def is_exported(member):
        return getattr(member, 'export_in_rule_data', False)

    actions_data = actions.get_all_actions()
    variables_data = variables.get_all_variables()
    # getmembers yields (name, value) pairs; only the value is needed.
    operators_by_type = {
        field_type.name: field_type.get_all_operators()
        for _, field_type in inspect.getmembers(operators, is_exported)
    }
    return {
        "variables": variables_data,
        "actions": actions_data,
        "variable_type_operators": operators_by_type,
    }
def get_instance_public_methods(instance):
    """Retrieves an object's public methods

    :param instance: The instance of the class to inspect
    :rtype: dict
    :returns: A dictionary mapping the name of every bound method that does
        not start with an underscore to the method itself.
    """
    return {
        name: member
        for name, member in inspect.getmembers(instance)
        if not name.startswith('_') and inspect.ismethod(member)
    }
def source(self, cls):
    """
    A decorator to mark package source modules.

    The class is stored in ``self.backends`` under its name, and every
    member that is an instance of ``self.handler._handler`` is registered
    through ``register_package_handler``.  ``BaseSource`` itself is returned
    untouched, as is any class decorated while ``BaseSource`` cannot be
    imported.
    """
    try:
        from pomu.source.base import BaseSource
    except ImportError:  # circular import
        return cls
    else:
        if cls == BaseSource:
            return cls

    self.backends[cls.__name__] = cls
    members = inspect.getmembers(cls)
    handler_type = self.handler._handler
    for _, member in members:
        if isinstance(member, handler_type):
            self.register_package_handler(cls, member.handler, member.priority)
    return cls
def get_all_uris():
    """
    Templating utility to retrieve all available URIs, mapping modules to URI classes.
    Used ultimately to store all URI paths in the template for easy retrieval by Javascript.

    {{ all_uris() }}

    Returns a dict with the single key ``all_uris``, a callable.  Calling it
    maps every name in ``dir(uri)`` that does not start with ``__`` to the
    list of member names of ``sys.modules['uri.<name>']`` that end in
    ``URI`` and are longer than three characters.
    """
    def uri_class_names(uri_module):
        members = inspect.getmembers(sys.modules['uri.' + uri_module])
        # A real list is built: on Python 3 ``filter`` yields a one-shot
        # iterator that is empty on a second pass and cannot be serialised.
        return [
            name for name, _ in members
            if name.endswith('URI') and len(name) > 3
        ]

    def all_uris():
        return {
            uri_module: uri_class_names(uri_module)
            for uri_module in dir(uri)
            if not uri_module.startswith('__')
        }

    return dict(all_uris=all_uris)
def __new__(cls, name, bases, clsdict):
    """
    Equip all base-class methods with a needs_values decorator, and all non-const methods
    with a set_dirty_and_flush_changes decorator in addition to that.

    Wrapping only takes place when ``clsdict`` defines ``_mutating_methods_``.
    Public routines of each base that ``clsdict`` does not define itself are
    wrapped and stored in ``clsdict`` before the type is created.
    """
    kmm = '_mutating_methods_'
    if kmm in clsdict:
        mutating_methods = clsdict[kmm]
        for base in bases:
            # The loop variable must not be called ``name``: that would
            # overwrite the class-name argument passed to super().__new__.
            for method_name, method in inspect.getmembers(base, inspect.isroutine):
                if method_name.startswith("_") or method_name in clsdict:
                    continue
                method_with_values = needs_values(method)
                if method_name in mutating_methods:
                    method_with_values = set_dirty_and_flush_changes(method_with_values)
                # END mutating methods handling
                clsdict[method_name] = method_with_values
            # END for each name/method pair
        # END for each base
    # END if mutating methods configuration is set

    new_type = super(MetaParserBuilder, cls).__new__(cls, name, bases, clsdict)
    return new_type