def is_class_sealed(klass):
    """
    Returns a boolean indicating whether or not the supplied class can accept
    dynamic properties.

    A class is reported as sealed only when every class in its MRO (a
    trailing C{object} excluded) has a C{__slots__} attribute and, when the
    MRO ends in C{object}, none of those classes has a C{'__dict__'} entry in
    its own namespace.

    @param klass: The class to inspect.
    @rtype: C{bool}
    @since: 0.5
    """
    mro = inspect.getmro(klass)
    new = False

    if mro[-1] is object:
        # ``object`` has no __slots__ attribute, so it must be left out of
        # the per-class checks below.
        mro = mro[:-1]
        new = True

    for kls in mro:
        # A '__dict__' entry in the class namespace means instances carry a
        # per-instance dict and can therefore take arbitrary attributes.
        if new and '__dict__' in kls.__dict__:
            return False

        if not hasattr(kls, '__slots__'):
            return False

    return True
# Example source code for Python's inspect.getmro()
def params(cls):
    """Return the ``(name, value)`` pairs of all ``Param`` members of *cls*, sorted.

    Every ``Param`` member name found on the classes in the MRO of *cls* is
    split on ``"_"`` and turned into its list of prefixes (``a_b_c`` gives
    ``("a",)``, ``("a", "b")``, ``("a", "b", "c")``).  Each prefix remembers
    the smallest ``order`` of any ``Param`` sharing it.  The members of *cls*
    are sorted by the tuple of those minima along their own prefix list.
    """
    members = [
        (name, value)
        for name, value in inspect.getmembers(cls)
        if isinstance(value, Param)
    ]

    keys = {}     # member name -> list of its name prefixes
    orders = {}   # name prefix -> smallest ``order`` seen for that prefix
    for base in inspect.getmro(cls):
        for name, value in inspect.getmembers(base):
            if not isinstance(value, Param):
                continue
            bites = name.split("_")
            keys[name] = [tuple(bites[:i + 1]) for i in range(len(bites))]
            for key in keys[name]:
                orders[key] = min(orders.get(key, value.order), value.order)
    return sorted(members, key=lambda x: tuple(map(orders.get, keys[x[0]])))
def consume_power_up(self, power_up):
    """Register a power-up on this object.

    ``power_up`` may be a ``CryptoPowerUp`` instance or a class that has
    ``CryptoPowerUp`` in its MRO; a class is instantiated with no arguments.
    The instance is stored in ``self._power_ups`` keyed by its class.  When
    ``power_up.confers_public_key`` is truthy, the result of the instance's
    ``public_key()`` is stored in ``self.public_keys`` under the same key.

    :raises TypeError: if ``power_up`` is neither of the accepted kinds.
    """
    if isinstance(power_up, CryptoPowerUp):
        power_up_class = power_up.__class__
        power_up_instance = power_up
    # inspect.getmro() raises AttributeError for non-class objects, so check
    # that we really have a class first; anything else gets the TypeError.
    elif inspect.isclass(power_up) and CryptoPowerUp in inspect.getmro(power_up):
        power_up_class = power_up
        power_up_instance = power_up()
    else:
        raise TypeError(
            ("power_up must be a subclass of CryptoPowerUp or an instance "
             "of a subclass of CryptoPowerUp."))
    self._power_ups[power_up_class] = power_up_instance

    if power_up.confers_public_key:
        # TODO: Make this an ID for later lookup on a KeyStore.
        self.public_keys[
            power_up_class] = power_up_instance.public_key()
def collect(self):
if not getattr(self.obj, "__test__", True):
return []
# NB. we avoid random getattrs and peek in the __dict__ instead
# (XXX originally introduced from a PyPy need, still true?)
dicts = [getattr(self.obj, '__dict__', {})]
for basecls in inspect.getmro(self.obj.__class__):
dicts.append(basecls.__dict__)
seen = {}
l = []
for dic in dicts:
for name, obj in list(dic.items()):
if name in seen:
continue
seen[name] = True
res = self.makeitem(name, obj)
if res is None:
continue
if not isinstance(res, list):
res = [res]
l.extend(res)
l.sort(key=lambda item: item.reportinfo()[:2])
return l
def get_unpatched_class(cls):
    """Protect against re-patching the distutils if reloaded

    Also ensures that no other distutils extension monkeypatched the distutils
    first.

    Returns the first class in the MRO of ``cls`` whose module name does not
    start with ``setuptools``.  Raises ``AssertionError`` when that class's
    module name does not start with ``distutils``.
    """
    external_bases = (
        klass
        for klass in inspect.getmro(cls)
        if not klass.__module__.startswith('setuptools')
    )
    base = next(external_bases)
    if not base.__module__.startswith('distutils'):
        msg = "distutils has already been patched by %r" % cls
        raise AssertionError(msg)
    return base
def trace_toolchain(toolchain):
    """
    Trace the versions of the involved packages for the provided
    toolchain instance.

    Returns a list with one single-entry dict per ``Toolchain`` subclass in
    the MRO of ``type(toolchain)``.  The key is ``'<module>:<class name>'``
    and the value holds ``project_name`` and ``version`` of the distribution
    returned by ``_cls_lookup_dist``, or is empty when that result is falsy.
    """
    pkgs = []
    for cls in getmro(type(toolchain)):
        if not issubclass(cls, Toolchain):
            continue
        dist = _cls_lookup_dist(cls)
        value = {
            'project_name': dist.project_name,
            'version': dist.version,
        } if dist else {}
        key = '%s:%s' % (cls.__module__, cls.__name__)
        pkgs.append({key: value})
    return pkgs
def do_custom(self):
    """Run the custom action ``self.action`` and return its result.

    The third element of ``cli.custom_actions[self.cls_name][self.action]``
    decides the target: when truthy, an object of ``self.cls`` is built and
    the action (with ``-`` replaced by ``_``) is called on it; otherwise the
    action is called on ``self.mgr`` under its unmodified name.  In both
    cases ``self.args`` is passed as keyword arguments.
    """
    in_obj = cli.custom_actions[self.cls_name][self.action][2]

    # Get the object (lazy), then act
    if in_obj:
        data = {}
        if hasattr(self.mgr, '_from_parent_attrs'):
            for k in self.mgr._from_parent_attrs:
                data[k] = self.args[k]
        if gitlab.mixins.GetWithoutIdMixin not in inspect.getmro(self.cls):
            # The id attribute is moved out of the call arguments and into
            # the data used to build the object.
            data[self.cls._id_attr] = self.args.pop(self.cls._id_attr)
        o = self.cls(self.mgr, data)
        method_name = self.action.replace('-', '_')
        return getattr(o, method_name)(**self.args)
    else:
        return getattr(self.mgr, self.action)(**self.args)
def extend_parser(parser):
    """Add one ``what`` sub-command per Gitlab object class and return *parser*.

    Every value in ``gitlab.v3.objects`` that has ``gitlab.base.GitlabObject``
    in its MRO gets a sub-parser (named by ``cli.cls_to_what``), which in
    turn receives required ``action`` sub-parsers populated by
    ``_populate_sub_parser_by_class``.  Classes are processed sorted by name.
    """
    subparsers = parser.add_subparsers(title='object', dest='what',
                                       help="Object to manipulate.")
    subparsers.required = True

    # populate argparse for all Gitlab Object
    classes = []
    for cls in gitlab.v3.objects.__dict__.values():
        try:
            is_gitlab_object = gitlab.base.GitlabObject in inspect.getmro(cls)
        except AttributeError:
            # Raised by getmro() for module members that are not classes.
            continue
        if is_gitlab_object:
            classes.append(cls)
    classes.sort(key=operator.attrgetter("__name__"))

    for cls in classes:
        arg_name = cli.cls_to_what(cls)
        object_group = subparsers.add_parser(arg_name)

        object_subparsers = object_group.add_subparsers(
            dest='action', help="Action to execute.")
        _populate_sub_parser_by_class(cls, object_subparsers)
        object_subparsers.required = True

    return parser
def get_unpatched_class(cls):
    """Protect against re-patching the distutils if reloaded

    Also ensures that no other distutils extension monkeypatched the distutils
    first.

    Returns the first class in the MRO of ``cls`` whose module name does not
    start with ``setuptools``.  Raises ``AssertionError`` when that class's
    module name does not start with ``distutils``.
    """
    external_bases = (
        klass
        for klass in inspect.getmro(cls)
        if not klass.__module__.startswith('setuptools')
    )
    base = next(external_bases)
    if not base.__module__.startswith('distutils'):
        msg = "distutils has already been patched by %r" % cls
        raise AssertionError(msg)
    return base
def rewrite_Union(self, union):
    """Shrink a union that has more than ``self.max_union_len`` members.

    Unions at or below the limit are returned unchanged.  Otherwise the
    result of ``self._rewrite_to_tuple(union)`` is used when it is not
    ``None``.  Failing that, the first class in the MRO of the first member
    (``object`` excluded) that every member is a subclass of is returned.
    ``Any`` is returned when no such class exists or the search raises
    ``TypeError``.
    """
    if len(union.__args__) <= self.max_union_len:
        return union

    rw_union = self._rewrite_to_tuple(union)
    if rw_union is not None:
        return rw_union

    try:
        for ancestor in inspect.getmro(union.__args__[0]):
            if (
                ancestor is not object and
                all(issubclass(t, ancestor) for t in union.__args__)
            ):
                return ancestor
    except TypeError:
        # Best-effort search: fall through to ``Any`` below.
        pass
    return Any
def get_class_that_defined_method(meth):
    """Returns the class that created the given method.

    A helper function for finding class methods amongst a list of many
    methods.

    For a bound method, the MRO of the owning class is searched for the class
    whose namespace holds the method's underlying function.  If that fails,
    or for a plain function, the class is looked up by name in the function's
    module using ``__qualname__``.  Returns ``None`` when no class is found.

    Source:
        http://stackoverflow.com/questions/3589311/get-defining-class-of-unbound-method-object-in-python-3/25959545#25959545
    """
    if inspect.ismethod(meth):
        func = meth.__func__
        owner = meth.__self__
        search = inspect.getmro(owner.__class__)
        if inspect.isclass(owner):
            # Bound classmethod: the function lives in the class's own MRO.
            search = inspect.getmro(owner) + search
        for cls in search:
            # A class namespace stores the plain function (possibly wrapped
            # in classmethod), never the bound method object, so compare the
            # underlying functions instead of ``is meth``.
            candidate = cls.__dict__.get(meth.__name__)
            if getattr(candidate, '__func__', candidate) is func:
                return cls
        meth = func  # fallback to __qualname__ parsing
    if inspect.isfunction(meth):
        # Check to make sure the method has a "qualname"
        if not getattr(meth, '__qualname__', None):
            return None
        # The default of None keeps nested or missing names from raising.
        cls = getattr(inspect.getmodule(meth),
                      meth.__qualname__.split('.<locals>', 1)[0].rsplit('.', 1)[0],
                      None)
        if isinstance(cls, type):
            return cls
    return None
def _recursive_call(expr_to_call, on_args):
    """Call ``expr_to_call`` with ``on_args``, descending into its args if needed.

    If the expression is callable and the first class in its type's MRO that
    defines ``__call__`` is not ``Basic``, it is called with ``*on_args``
    (a ``C.Symbol`` is returned unchanged instead).  Otherwise, when it has
    ``args``, each of them is processed recursively and an object of the same
    type is rebuilt from the results.  Anything else is returned as is.
    """
    def the_call_method_is_overridden(expr):
        # Falls through (returning None) when no class defines __call__.
        for cls in getmro(type(expr)):
            if '__call__' in cls.__dict__:
                return cls != Basic

    if callable(expr_to_call) and the_call_method_is_overridden(expr_to_call):
        if isinstance(expr_to_call, C.Symbol):  # XXX When you call a Symbol it is
            return expr_to_call               # transformed into an UndefFunction
        else:
            return expr_to_call(*on_args)
    elif expr_to_call.args:
        args = [Basic._recursive_call(
            sub, on_args) for sub in expr_to_call.args]
        return type(expr_to_call)(*args)
    else:
        return expr_to_call
def pysourcefiles(self):
    """All source files of the actual models Python classes and their
    respective base classes."""
    sourcefiles = set()
    for child in vars(self).values():
        try:
            parents = inspect.getmro(child)
        except AttributeError:
            # Attribute values without an MRO are skipped.
            continue
        for parent in parents:
            try:
                sourcefile = inspect.getfile(parent)
            except TypeError:
                # No file for this class: stop walking this MRO.
                break
            sourcefiles.add(sourcefile)
    return Lines(*sourcefiles)
def get_unpatched_class(cls):
    """Protect against re-patching the distutils if reloaded

    Also ensures that no other distutils extension monkeypatched the distutils
    first.

    Returns the first class in the MRO of ``cls`` whose module name does not
    start with ``setuptools``.  Raises ``AssertionError`` when that class's
    module name does not start with ``distutils``.
    """
    external_bases = (
        klass
        for klass in inspect.getmro(cls)
        if not klass.__module__.startswith('setuptools')
    )
    base = next(external_bases)
    if not base.__module__.startswith('distutils'):
        msg = "distutils has already been patched by %r" % cls
        raise AssertionError(msg)
    return base
def get_unpatched_class(cls):
    """Protect against re-patching the distutils if reloaded

    Also ensures that no other distutils extension monkeypatched the distutils
    first.

    Returns the first class in the MRO of ``cls`` whose module name does not
    start with ``setuptools``.  Raises ``AssertionError`` when that class's
    module name does not start with ``distutils``.
    """
    external_bases = (
        klass
        for klass in inspect.getmro(cls)
        if not klass.__module__.startswith('setuptools')
    )
    base = next(external_bases)
    if not base.__module__.startswith('distutils'):
        msg = "distutils has already been patched by %r" % cls
        raise AssertionError(msg)
    return base
def collect(self):
if not getattr(self.obj, "__test__", True):
return []
# NB. we avoid random getattrs and peek in the __dict__ instead
# (XXX originally introduced from a PyPy need, still true?)
dicts = [getattr(self.obj, '__dict__', {})]
for basecls in inspect.getmro(self.obj.__class__):
dicts.append(basecls.__dict__)
seen = {}
l = []
for dic in dicts:
for name, obj in list(dic.items()):
if name in seen:
continue
seen[name] = True
res = self.makeitem(name, obj)
if res is None:
continue
if not isinstance(res, list):
res = [res]
l.extend(res)
l.sort(key=lambda item: item.reportinfo()[:2])
return l
def get_unpatched_class(cls):
    """Protect against re-patching the distutils if reloaded

    Also ensures that no other distutils extension monkeypatched the distutils
    first.

    Returns the first class in the MRO of ``cls`` whose module name does not
    start with ``setuptools``.  Raises ``AssertionError`` when that class's
    module name does not start with ``distutils``.
    """
    external_bases = (
        klass
        for klass in inspect.getmro(cls)
        if not klass.__module__.startswith('setuptools')
    )
    base = next(external_bases)
    if not base.__module__.startswith('distutils'):
        msg = "distutils has already been patched by %r" % cls
        raise AssertionError(msg)
    return base
def get_class_that_defined_method(meth):
    """Return the class that defines ``meth``, or ``None`` if it cannot be found.

    For a bound method, the MRO of the owning class is searched for the class
    whose namespace holds the method's underlying function.  If that fails,
    or for a plain function, the class is looked up by name in the function's
    module using ``__qualname__``.
    """
    if inspect.ismethod(meth):
        func = meth.__func__
        owner = meth.__self__
        search = inspect.getmro(owner.__class__)
        if inspect.isclass(owner):
            # Bound classmethod: the function lives in the class's own MRO.
            search = inspect.getmro(owner) + search
        for cls in search:
            # A class namespace stores the plain function (possibly wrapped
            # in classmethod), never the bound method object, so compare the
            # underlying functions instead of ``is meth``.
            candidate = cls.__dict__.get(meth.__name__)
            if getattr(candidate, '__func__', candidate) is func:
                return cls
        meth = func  # fallback to __qualname__ parsing
    if inspect.isfunction(meth):
        if not hasattr(meth, '__qualname__'):
            pass  # python too old
        else:
            try:
                cls = getattr(
                    inspect.getmodule(meth),
                    meth.__qualname__.split('.<locals>', 1)[0].rsplit('.', 1)[0]
                )  # yapf: disable
            except AttributeError:  # defined in an exec() on new python?
                cls = 'exec'
            if isinstance(cls, type):
                return cls
    return None
def get_unpatched_class(cls):
    """Protect against re-patching the distutils if reloaded

    Also ensures that no other distutils extension monkeypatched the distutils
    first.

    Returns the first class in the MRO of ``cls`` whose module name does not
    start with ``setuptools``.  Raises ``AssertionError`` when that class's
    module name does not start with ``distutils``.
    """
    external_bases = (
        klass
        for klass in inspect.getmro(cls)
        if not klass.__module__.startswith('setuptools')
    )
    base = next(external_bases)
    if not base.__module__.startswith('distutils'):
        msg = "distutils has already been patched by %r" % cls
        raise AssertionError(msg)
    return base
def collect(self):
if not getattr(self.obj, "__test__", True):
return []
# NB. we avoid random getattrs and peek in the __dict__ instead
# (XXX originally introduced from a PyPy need, still true?)
dicts = [getattr(self.obj, '__dict__', {})]
for basecls in inspect.getmro(self.obj.__class__):
dicts.append(basecls.__dict__)
seen = {}
l = []
for dic in dicts:
for name, obj in list(dic.items()):
if name in seen:
continue
seen[name] = True
res = self.makeitem(name, obj)
if res is None:
continue
if not isinstance(res, list):
res = [res]
l.extend(res)
l.sort(key=lambda item: item.reportinfo()[:2])
return l
def get_unpatched_class(cls):
    """Protect against re-patching the distutils if reloaded

    Also ensures that no other distutils extension monkeypatched the distutils
    first.

    Returns the first class in the MRO of ``cls`` whose module name does not
    start with ``setuptools``.  Raises ``AssertionError`` when that class's
    module name does not start with ``distutils``.
    """
    external_bases = (
        klass
        for klass in inspect.getmro(cls)
        if not klass.__module__.startswith('setuptools')
    )
    base = next(external_bases)
    if not base.__module__.startswith('distutils'):
        msg = "distutils has already been patched by %r" % cls
        raise AssertionError(msg)
    return base
def _get_field_class_for_data_type(self, data_type):
    """Return the field class (or field) to use for ``data_type``.

    The MRO of ``type(data_type)`` is searched for the first type present in
    ``self.SQLA_TYPE_MAPPING``.  A mapped value that is callable and for
    which ``_is_field`` is false is called with ``(self, data_type)`` and its
    result is used instead.  When no type in the MRO is mapped,
    ``data_type.python_type`` is looked up in ``self.type_mapping``.

    :raises ModelConversionError: if neither mapping yields a result.
    """
    field_cls = None
    types = inspect.getmro(type(data_type))
    # First search for a field class from self.SQLA_TYPE_MAPPING
    for col_type in types:
        if col_type in self.SQLA_TYPE_MAPPING:
            field_cls = self.SQLA_TYPE_MAPPING[col_type]
            if callable(field_cls) and not _is_field(field_cls):
                field_cls = field_cls(self, data_type)
            break
    else:
        # Reached only when the loop above found no mapped type.
        # Try to find a field class based on the column's python_type
        try:
            python_type = data_type.python_type
        except NotImplementedError:
            python_type = None

        if python_type in self.type_mapping:
            field_cls = self.type_mapping[python_type]
        else:
            raise ModelConversionError(
                'Could not find field column of type {0}.'.format(types[0]))
    return field_cls
def __setattr__(self, name, value):
    """Sets the component data related to the Entity.

    ``_id`` and ``_world`` are stored as ordinary attributes.  Any other
    value is registered in ``self._world.components`` under every class in
    its MRO that comes before ``type`` (if present) or ``object``; component
    types unknown to the world are added first.
    """
    if name in ("_id", "_world"):
        object.__setattr__(self, name, value)
    else:
        # If the value is a compound component (e.g. a Button
        # inheriting from a Sprite), it needs to be added to all
        # supported component type instances.
        mro = inspect.getmro(value.__class__)
        if type in mro:
            stop = mro.index(type)
        else:
            stop = mro.index(object)

        mro = mro[0:stop]
        wctypes = self._world.componenttypes
        for clstype in mro:
            if clstype not in wctypes:
                self._world.add_componenttype(clstype)
            self._world.components[clstype][self] = value
def is_static_method(klass, attr, value=None):
    """Test if a value of a class is static method.

    example::

        class MyClass(object):
            @staticmethod
            def method():
                ...

    :param klass: the class
    :param attr: attribute name
    :param value: attribute value; looked up with ``getattr(klass, attr)``
        when ``None``
    :returns: ``True`` if the first class in the MRO of ``klass`` that
        defines ``attr`` stores it as a ``staticmethod``, else ``False``
    """
    if value is None:
        value = getattr(klass, attr)
    assert getattr(klass, attr) == value

    # Whether the value is a routine does not depend on the class being
    # visited, so check it once up front.
    if not inspect.isroutine(value):
        return False

    for cls in inspect.getmro(klass):
        if attr in cls.__dict__:
            # The first class in the MRO defining ``attr`` is the one that
            # attribute lookup resolves to; later definitions are shadowed
            # and must not be considered.
            return isinstance(cls.__dict__[attr], staticmethod)
    return False
def is_class_method(klass, attr, value=None):
    """Test if a value of a class is class method.

    example::

        class MyClass(object):
            @classmethod
            def method(cls):
                ...

    :param klass: the class
    :param attr: attribute name
    :param value: attribute value; looked up with ``getattr(klass, attr)``
        when ``None``
    :returns: ``True`` if the first class in the MRO of ``klass`` that
        defines ``attr`` stores it as a ``classmethod``, else ``False``
    """
    if value is None:
        value = getattr(klass, attr)
    assert getattr(klass, attr) == value

    # Whether the value is a routine does not depend on the class being
    # visited, so check it once up front.
    if not inspect.isroutine(value):
        return False

    for cls in inspect.getmro(klass):
        if attr in cls.__dict__:
            # The first class in the MRO defining ``attr`` is the one that
            # attribute lookup resolves to; later definitions are shadowed
            # and must not be considered.
            return isinstance(cls.__dict__[attr], classmethod)
    return False
def _filter_field(self, queryset, field_name, qualifier, value, invert, partial=''):
    """Return ``queryset`` filtered on the model field ``field_name``.

    The MRO of the field's class is walked and the first class for which
    ``self.get_field_filter`` returns something truthy supplies the filter.
    Its ``get_q(qualifier, value, invert, partial)`` result is passed to
    ``queryset.filter``.

    :raises BinderRequestError: if the field does not exist, if the filter
        raises ``ValidationError``, or if no class in the MRO has a filter.
    """
    try:
        field = self.model._meta.get_field(field_name)
    except models.fields.FieldDoesNotExist:
        raise BinderRequestError('Unknown field in filter: {{{}}}.{{{}}}.'.format(self.model.__name__, field_name))

    for field_class in inspect.getmro(field.__class__):
        filter_class = self.get_field_filter(field_class)
        if filter_class:
            field_filter = filter_class(field)
            try:
                return queryset.filter(field_filter.get_q(qualifier, value, invert, partial))
            except ValidationError as e:
                # TODO: Maybe convert to a BinderValidationError later?
                raise BinderRequestError(e.message)

    # If we get here, we didn't find a suitable filter class
    raise BinderRequestError('Filtering not supported for type {} ({{{}}}.{{{}}}).'
            .format(field.__class__.__name__, self.model.__name__, field_name))
def _filter_field(self, queryset, field_name, qualifier, value, invert, partial=''):
    """Return ``queryset`` filtered on the model field ``field_name``.

    The MRO of the field's class is walked and the first class for which
    ``self.get_field_filter`` returns something truthy supplies the filter.
    Its ``get_q(qualifier, value, invert, partial)`` result is passed to
    ``queryset.filter``.

    :raises BinderRequestError: if the field does not exist, if the filter
        raises ``ValidationError``, or if no class in the MRO has a filter.
    """
    try:
        field = self.model._meta.get_field(field_name)
    except models.fields.FieldDoesNotExist:
        raise BinderRequestError('Unknown field in filter: {{{}}}.{{{}}}.'.format(self.model.__name__, field_name))

    for field_class in inspect.getmro(field.__class__):
        filter_class = self.get_field_filter(field_class)
        if filter_class:
            field_filter = filter_class(field)
            try:
                return queryset.filter(field_filter.get_q(qualifier, value, invert, partial))
            except ValidationError as e:
                # TODO: Maybe convert to a BinderValidationError later?
                raise BinderRequestError(e.message)

    # If we get here, we didn't find a suitable filter class
    raise BinderRequestError('Filtering not supported for type {} ({{{}}}.{{{}}}).'
            .format(field.__class__.__name__, self.model.__name__, field_name))
def attach_models_to_base_from_module(mod:ModuleType, Base:type):
    """Attach all models in a Python module to SQLAlchemy base class.

    The attachable models must declare ``__tablename__`` property and must not have existing ``Base`` class in their inheritance.

    :param mod: module whose attributes are scanned with ``dir()``
    :param Base: base class passed on to ``attach_model_to_base``
    """
    for key in dir(mod):
        value = getattr(mod, key)
        if inspect.isclass(value):

            # TODO: We can't really check for SQLAlchemy declarative base class as it's usually run-time generated and may be out of our control
            if any(base.__name__ == "Base" for base in inspect.getmro(value)):
                # Already inherits from SQLAlchemy declarative Base
                # (detected by class name only)
                continue

            if hasattr(value, "__tablename__"):
                # This declares table but is not attached to any base yet
                attach_model_to_base(value, Base)
def get_unpatched_class(cls):
    """Protect against re-patching the distutils if reloaded

    Also ensures that no other distutils extension monkeypatched the distutils
    first.

    Returns the first class in the MRO of ``cls`` whose module name does not
    start with ``setuptools``.  Raises ``AssertionError`` when that class's
    module name does not start with ``distutils``.
    """
    external_bases = (
        klass
        for klass in inspect.getmro(cls)
        if not klass.__module__.startswith('setuptools')
    )
    base = next(external_bases)
    if not base.__module__.startswith('distutils'):
        msg = "distutils has already been patched by %r" % cls
        raise AssertionError(msg)
    return base
def get_unpatched_class(cls):
    """Protect against re-patching the distutils if reloaded

    Also ensures that no other distutils extension monkeypatched the distutils
    first.

    Returns the first class in the MRO of ``cls`` whose module name does not
    start with ``setuptools``.  Raises ``AssertionError`` when that class's
    module name does not start with ``distutils``.
    """
    external_bases = (
        klass
        for klass in inspect.getmro(cls)
        if not klass.__module__.startswith('setuptools')
    )
    base = next(external_bases)
    if not base.__module__.startswith('distutils'):
        msg = "distutils has already been patched by %r" % cls
        raise AssertionError(msg)
    return base