def test_abstractmethod_integration(self):
for abstractthing in [abc.abstractmethod, abc.abstractproperty]:
class C:
__metaclass__ = abc.ABCMeta
@abstractthing
def foo(self): pass # abstract
def bar(self): pass # concrete
self.assertEqual(C.__abstractmethods__, set(["foo"]))
self.assertRaises(TypeError, C) # because foo is abstract
self.assertTrue(isabstract(C))
class D(C):
def bar(self): pass # concrete override of concrete
self.assertEqual(D.__abstractmethods__, set(["foo"]))
self.assertRaises(TypeError, D) # because foo is still abstract
self.assertTrue(isabstract(D))
class E(D):
def foo(self): pass
self.assertEqual(E.__abstractmethods__, set())
E() # now foo is concrete, too
self.assertFalse(isabstract(E))
class F(E):
@abstractthing
def bar(self): pass # abstract override of concrete
self.assertEqual(F.__abstractmethods__, set(["bar"]))
self.assertRaises(TypeError, F) # because bar is abstract now
self.assertTrue(isabstract(F))
python类isabstract()的实例源码
def is_abstract(cls):
ABCFLAG = '__abstractmethods__'
isabc = hasattr(cls, ABCFLAG) or inspect.isabstract(cls)
return isabc
def get_subclasses(class_type, directory=None):
"""
Creates a dictionary of classes which inherit from ``class_type`` found in
all python files within a given directory, keyed by the class name in snake
case as a string.
:param class_type: The base class that all returned classes should inherit
from.
:type class_type: cls
:param directory: The directory to look for classes in.
:type directory: str
:returns: A dict of classes found.
:rtype: dict
"""
try:
glob_expression = os.path.join(directory, "*.py")
except (AttributeError, TypeError):
raise TypeError("'directory' object should be a string")
module_paths = glob.glob(glob_expression)
sys.path.append(directory)
modules = [
imp.load_source(
os.path.basename(module_path).split(".")[0], module_path
)
for module_path in module_paths
if "__init__" not in module_path
]
classes = {}
for module in modules:
for attr in module.__dict__.values():
if inspect.isclass(attr) \
and issubclass(attr, class_type) \
and not inspect.isabstract(attr):
classes[camel_to_snake_case(attr.__name__)] = attr
return classes
def is_class(klass):
return inspect.isclass(klass) and (not inspect.isabstract(klass))
def register_module(self, module, overwrite=False):
"""
Register all component classes within a module.
Parameters
----------
module : Union[str, `ModuleType`]
overwrite : bool
"""
if isinstance(module, basestring):
module = pydoc.locate(module)
if not inspect.ismodule(module):
raise ValueError('module must be either a module or the name of a '
'module')
self.logger.info('Registering components in module: {}'.format(
module.__name__))
registered = 0
for obj_name, class_obj in inspect.getmembers(module):
if (inspect.isclass(class_obj) and
class_obj is not Component and
not inspect.isabstract(class_obj) and
not issubclass(class_obj, SubGraph) and
issubclass(class_obj, Component)):
self.register_component(class_obj, overwrite)
registered += 1
if registered == 0:
self.logger.warn('No components were found in module: {}'.format(
module.__name__))
def test_abstractmethod_integration(self):
for abstractthing in [abc.abstractmethod, abc.abstractproperty,
abc.abstractclassmethod,
abc.abstractstaticmethod]:
class C(metaclass=abc.ABCMeta):
@abstractthing
def foo(self): pass # abstract
def bar(self): pass # concrete
self.assertEqual(C.__abstractmethods__, {"foo"})
self.assertRaises(TypeError, C) # because foo is abstract
self.assertTrue(isabstract(C))
class D(C):
def bar(self): pass # concrete override of concrete
self.assertEqual(D.__abstractmethods__, {"foo"})
self.assertRaises(TypeError, D) # because foo is still abstract
self.assertTrue(isabstract(D))
class E(D):
def foo(self): pass
self.assertEqual(E.__abstractmethods__, set())
E() # now foo is concrete, too
self.assertFalse(isabstract(E))
class F(E):
@abstractthing
def bar(self): pass # abstract override of concrete
self.assertEqual(F.__abstractmethods__, {"bar"})
self.assertRaises(TypeError, F) # because bar is abstract now
self.assertTrue(isabstract(F))
def _execute_backend_on_spec(self):
"""Renders a source file into its final form."""
api_no_aliases_cache = None
for attr_key in dir(self.backend_module):
attr_value = getattr(self.backend_module, attr_key)
if (inspect.isclass(attr_value) and
issubclass(attr_value, Backend) and
not inspect.isabstract(attr_value)):
self._logger.info('Running backend: %s', attr_value.__name__)
backend = attr_value(self.build_path, self.backend_args)
if backend.preserve_aliases:
api = self.api
else:
if not api_no_aliases_cache:
api_no_aliases_cache = remove_aliases_from_api(self.api)
api = api_no_aliases_cache
try:
backend.generate(api)
except Exception:
# Wrap this exception so that it isn't thought of as a bug
# in the stone parser, but rather a bug in the backend.
# Remove the last char of the traceback b/c it's a newline.
raise BackendException(
attr_value.__name__, traceback.format_exc()[:-1])
def _loader(self, cls: type):
# Forces unittest.TestLoader to return tests from abstract base classes as None.
if isabstract(cls):
return None
else:
return self.unpatched_loadTestsFromTestCase(cls)
def prefetch(session, discussion_id):
from assembl.lib.sqla import class_registry
from assembl.models import DiscussionBoundBase
for name, cls in class_registry.items():
if issubclass(cls, DiscussionBoundBase) and not isabstract(cls):
mapper = class_mapper(cls)
undefers = [undefer(attr.key) for attr in mapper.iterate_properties
if getattr(attr, 'deferred', False)]
conditions = cls.get_discussion_conditions(discussion_id)
session.query(with_polymorphic(cls, "*")).filter(
and_(*conditions)).options(*undefers).all()
def get_concrete_subclasses_recursive(c):
"""Recursively returns only the concrete classes is a class hierarchy"""
concreteSubclasses = []
subclasses = get_subclasses_recursive(c)
for d in subclasses:
if not inspect.isabstract(d):
concreteSubclasses.append(d)
return concreteSubclasses
def get_base_conditions(self, alias_maker, cls, for_graph):
from ..models import DiscussionBoundBase
conditions = super(
AssemblClassPatternExtractor, self).get_base_conditions(
alias_maker, cls, for_graph)
base_conds = cls.base_conditions(alias_maker=alias_maker)
if base_conds:
conditions.extend(base_conds)
if (for_graph.discussion_id and issubclass(cls, DiscussionBoundBase)
and not isabstract(cls)):
# TODO: update with conditionS.
conditions.extend(cls.get_discussion_conditions(
for_graph.discussion_id, alias_maker))
return [c for c in conditions if c is not None]
def delete_discussion(session, discussion_id):
from assembl.models import (
Base, Discussion, DiscussionBoundBase, Preferences, LangStringEntry)
# delete anything related first
classes = DiscussionBoundBase._decl_class_registry.values()
classes_by_table = defaultdict(list)
for cls in classes:
if isinstance(cls, type):
classes_by_table[getattr(cls, '__table__', None)].append(cls)
# Only direct subclass of abstract
def is_concrete_class(cls):
if isabstract(cls):
return False
for (i, cls) in enumerate(cls.mro()):
if not i:
continue
if not issubclass(cls, Base):
continue
return isabstract(cls)
concrete_classes = set([cls for cls in itertools.chain(
*list(classes_by_table.values()))
if issubclass(cls, DiscussionBoundBase) and
is_concrete_class(cls)])
concrete_classes.add(Preferences)
concrete_classes.add(LangStringEntry)
tables = DiscussionBoundBase.metadata.sorted_tables
# Special case for preferences
discussion = session.query(Discussion).get(discussion_id)
session.delete(discussion.preferences)
# tables.append(Preferences.__table__)
tables.reverse()
for table in tables:
if table not in classes_by_table:
continue
for cls in classes_by_table[table]:
if cls not in concrete_classes:
continue
print('deleting', cls.__name__)
query = session.query(cls.id)
if hasattr(cls, "get_discussion_conditions"):
conds = cls.get_discussion_conditions(discussion_id)
else:
continue
assert conds
cond = and_(*conds)
v = JoinColumnsVisitor(cls, query, classes_by_table)
v.traverse(cond)
query = v.final_query().filter(cond)
if query.count():
print("*" * 20, "Not all deleted!")
ids = query.all()
for subcls in cls.mro():
if getattr(subcls, '__tablename__', None):
session.query(subcls).filter(
subcls.id.in_(ids)).delete(False)
session.flush()