def struct_fields(value):
    """Return the simple and simple-sequence property objects of a struct.

    ``value`` may be a struct class or an instance of one.  The field list
    is taken from the class attribute ``__fields`` when present; otherwise
    it is built from the class dictionary (simple properties first, then
    simple-sequence properties).  Any further property found through
    ``inspect.getmembers(value)`` whose ``id_`` is not yet listed is
    appended.

    NOTE(review): when the class supplies ``__fields``, that same list
    object is extended in place -- confirm that this is intended.
    """
    if isinstance(value, types.ClassType) or hasattr(value, '__bases__'):
        struct_class = value
    else:
        struct_class = type(value)

    # Prefer the explicit ordering, if the class provides one.
    fields = getattr(struct_class, '__fields', None)
    if fields is None:
        class_attrs = struct_class.__dict__
        fields = [attr for attr in class_attrs.itervalues()
                  if isinstance(attr, simple_property)]
        fields += [attr for attr in class_attrs.itervalues()
                   if isinstance(attr, simpleseq_property)]

    for _name, candidate in inspect.getmembers(value):
        if not isinstance(candidate, (simple_property, simpleseq_property)):
            continue
        # Properties are matched by id_, not by object identity.
        if not any(candidate.id_ == known.id_ for known in fields):
            fields += [candidate]
    return fields
# Source code examples using Python's types.ClassType
def runTests(self):
    """Run ``self.test`` with the configured runner and store the result.

    ``self.testRunner`` may be ``None`` (``runner.TextTestRunner`` is then
    used), a runner class, or an already constructed runner instance.  A
    class is instantiated with the ``verbosity``, ``failfast`` and
    ``buffer`` options; if it rejects them with ``TypeError`` it is
    instantiated again without arguments.

    The outcome is stored in ``self.result``.  When ``self.exit`` is true,
    ``sys.exit`` is called with ``not self.result.wasSuccessful()``.
    """
    if self.catchbreak:
        installHandler()
    if self.testRunner is None:
        self.testRunner = runner.TextTestRunner

    # Classic classes exist only on Python 2.  Falling back to ``type``
    # avoids an AttributeError where types.ClassType is not available.
    class_types = (type, getattr(types, 'ClassType', type))
    if isinstance(self.testRunner, class_types):
        try:
            testRunner = self.testRunner(verbosity=self.verbosity,
                                         failfast=self.failfast,
                                         buffer=self.buffer)
        except TypeError:
            # didn't accept the verbosity, buffer or failfast arguments
            testRunner = self.testRunner()
    else:
        # it is assumed to be a TestRunner instance
        testRunner = self.testRunner
    self.result = testRunner.run(self.test)
    if self.exit:
        sys.exit(not self.result.wasSuccessful())
def runTests(self):
    """Execute ``self.test`` and record the outcome in ``self.result``.

    The runner comes from ``self.testRunner``: ``None`` selects
    ``runner.TextTestRunner``, a class is instantiated (first with the
    ``verbosity``/``failfast``/``buffer`` keyword arguments, then with no
    arguments if that raises ``TypeError``), and anything else is used as
    a ready-made runner instance.

    If ``self.exit`` is true the function ends by calling ``sys.exit``
    with ``not self.result.wasSuccessful()``.
    """
    if self.catchbreak:
        installHandler()
    if self.testRunner is None:
        self.testRunner = runner.TextTestRunner

    # types.ClassType is missing on interpreters without classic classes;
    # resolve it defensively instead of raising AttributeError.
    classic_class = getattr(types, 'ClassType', type)
    if isinstance(self.testRunner, (type, classic_class)):
        try:
            testRunner = self.testRunner(verbosity=self.verbosity,
                                         failfast=self.failfast,
                                         buffer=self.buffer)
        except TypeError:
            # didn't accept the verbosity, buffer or failfast arguments
            testRunner = self.testRunner()
    else:
        # it is assumed to be a TestRunner instance
        testRunner = self.testRunner
    self.result = testRunner.run(self.test)
    if self.exit:
        sys.exit(not self.result.wasSuccessful())
def latestVersionOf(self, anObject):
    """Return the most recent version of ``anObject``.

    Dispatches on the exact type of the object: functions go through
    ``latestFunction``, classic classes through ``latestClass``, methods
    are looked up again by name on their class (unbound) or instance
    (bound), and classic instances are returned after a dummy attribute
    access.  Any other object is returned unchanged after logging a
    warning.
    """
    kind = type(anObject)

    if kind == types.FunctionType:
        return latestFunction(anObject)

    if kind == types.MethodType:
        # Unbound methods live on the class, bound ones on the instance.
        if anObject.im_self is None:
            owner = anObject.im_class
        else:
            owner = anObject.im_self
        return getattr(owner, anObject.__name__)

    if kind == types.InstanceType:
        # Kick it, if it's out of date.
        getattr(anObject, 'nothing', None)
        return anObject

    if kind == types.ClassType:
        return latestClass(anObject)

    log.msg('warning returning anObject!')
    return anObject
def loadAnything(self, thing, recurse=False):
    """
    Return the tests contained in an arbitrary Python object.

    @param thing: A Python object. A module, method, class or package.
    @param recurse: Whether or not to look in subpackages of packages.
    Defaults to False.
    @return: A C{TestCase} or C{TestSuite}.
    @raise TypeError: If C{thing} is none of the supported kinds.
    """
    if isinstance(thing, types.ModuleType):
        if not isPackage(thing):
            return self.loadModule(thing)
        return self.loadPackage(thing, recurse)
    # Classic and new-style classes are loaded the same way.
    if isinstance(thing, (types.ClassType, type)):
        return self.loadClass(thing)
    if isinstance(thing, types.MethodType):
        return self.loadMethod(thing)
    raise TypeError("No loader for %r. Unrecognized type" % (thing,))
def minimalBases(classes):
    """Reduce a list of base classes to its ordered minimum equivalent.

    ``ClassType`` itself is dropped, as is every class that has a strict
    subclass elsewhere in the list.  A class that occurs more than once
    keeps only its last position.
    """
    relevant = [cls for cls in classes if cls is not ClassType]
    ordered = []
    for base in relevant:
        superseded = any(
            issubclass(other, base) and base is not other
            for other in relevant
        )
        if superseded:
            continue
        # No subclass of ``base`` is present; move it to the end if it
        # was already recorded so that the last occurrence wins.
        if base in ordered:
            ordered.remove(base)
        ordered.append(base)
    return ordered
# Format ``pyobj`` by delegating to a serializer object.
# The serializer is ``pyobj.typecode`` when that attribute exists; otherwise it
# is looked up in ``Any.serialmap``, keyed by the object's class and, failing
# that, by ``(types.ClassType, <class name>)``.  ``time.struct_time`` values
# can fall back to ``gDateTime``.  Raises EvaluateException when no serializer
# is found.
# NOTE(review): the indentation of this block has been lost, so the nesting of
# the branches below (in particular which ``if`` the ``else`` and the
# ``struct_time`` fallback belong to) cannot be confirmed from this text --
# TODO restore it from the upstream source.
def get_formatted_content(self, pyobj):
tc = type(pyobj)
if isinstance(pyobj, object):
tc = pyobj.__class__
if hasattr(pyobj, 'typecode'):
#serializer = pyobj.typecode.serialmap.get(tc)
serializer = pyobj.typecode
else:
serializer = Any.serialmap.get(tc)
if not serializer:
# Second attempt: key made of ClassType and the class name.
tc = (types.ClassType, pyobj.__class__.__name__)
serializer = Any.serialmap.get(tc)
else:
serializer = Any.serialmap.get(tc)
if not serializer and isinstance(pyobj, time.struct_time):
# Imported here rather than at module level.
from pysphere.ZSI.TCtimes import gDateTime
serializer = gDateTime()
if serializer:
return serializer.get_formatted_content(pyobj)
raise EvaluateException('Failed to find serializer for pyobj %s' %pyobj)
def load_component(self, class_name):
    """
    Loads and adds new component to editor.

    The module ``scc.gui.ae.<class_name>`` is imported and the first name
    in its ``__all__`` that is a strict subclass of AEComponent is
    instantiated with ``(self.app, self)``, cached and returned.  A
    component that was loaded before is returned from the cache.
    Returns None implicitly when the module exports no such class.
    """
    if class_name in self.loaded_components:
        return self.loaded_components[class_name]

    module = importlib.import_module("scc.gui.ae.%s" % (class_name,))
    for exported_name in module.__all__:
        candidate = getattr(module, exported_name)
        if not isinstance(candidate, (type, types.ClassType)):
            continue
        if not issubclass(candidate, AEComponent):
            continue
        if candidate is AEComponent:
            # The base class itself is not a usable component.
            continue
        component = candidate(self.app, self)
        self.loaded_components[class_name] = component
        self.components.append(component)
        return component
def register(cls, action):
    """Register ``action`` on ``cls`` under its name.

    Functions and staticmethods are stored as they are.  Callable classes
    are instantiated first and the instance is stored under the class
    name.  Other callable objects are stored under the name of their
    class.  The entry is added to ``cls._actions`` (upper-cased key) and
    set as an attribute of ``cls``.

    Raises UnknownAction when ``action`` matches none of these cases.
    """
    action_logger.info("Registering action :%s" % action)

    if isinstance(action, (types.FunctionType, staticmethod)):
        name = action.__name__
    elif isinstance(action, types.ClassType) and hasattr(action, "__call__"):
        name = action.__name__
        action = action()
    elif (isinstance(action, (types.InstanceType, types.ObjectType))
            and hasattr(action, "__call__")):
        if isinstance(action, type):
            name = action.__name__
            action = action()
        else:
            name = action.__class__.__name__
    else:
        name = str(action)
        action_logger.error("Error registering action :%s" % action)
        raise UnknownAction("unable to register action %s!!" %name)

    # Common tail of every accepted case.
    cls._actions[name.upper()] = action
    setattr(cls, name, action)
def register(action_klass):
    """
    Registers a chaos action class with Action class

    Returns a decorator.  When the decorated object has a true ``enabled``
    attribute it is instantiated (if it is a class), set as an attribute
    of ``action_klass`` under its ``__name__`` and added to
    ``action_klass._actions`` under the upper-cased name.

    Args:
        action_klass: class that receives the registered actions.

    Returns:
        The decorator; it returns the registered instance, or the
        decorated object unchanged when it is not enabled.
    """
    # types.ClassType only exists on Python 2; fall back to ``type`` so the
    # decorator does not raise AttributeError elsewhere.
    class_types = (type, getattr(types, 'ClassType', type))

    def _register(chaos_klass):
        if chaos_klass.enabled:
            name = chaos_klass.__name__
            if isinstance(chaos_klass, class_types):
                chaos_klass = chaos_klass()
            setattr(action_klass, name, chaos_klass)
            action_klass._actions.update([(name.upper(), chaos_klass)])
        return chaos_klass
    return _register
def register(cls, event):
    """Register ``event`` on ``cls`` under its name.

    Functions and staticmethods are stored as they are.  Callable classes
    are instantiated first and the instance is stored under the class
    name.  Other callable objects are stored under the name of their
    class.  The entry is added to ``cls._chaos_events`` (upper-cased key)
    and set as an attribute of ``cls``.

    Raises UnknownChaosEvent when ``event`` matches none of these cases.
    """
    exe_logger.info("Registering event :%s" % event)

    if isinstance(event, (types.FunctionType, staticmethod)):
        name = event.__name__
    elif isinstance(event, types.ClassType) and hasattr(event, "__call__"):
        name = event.__name__
        event = event()
    elif (isinstance(event, (types.InstanceType, types.ObjectType))
            and hasattr(event, "__call__")):
        if isinstance(event, type):
            name = event.__name__
            event = event()
        else:
            name = event.__class__.__name__
    else:
        name = str(event)
        exe_logger.error("Error registering event :%s" % event)
        raise UnknownChaosEvent("unable to register event %s!!" % name)

    # Common tail of every accepted case.
    cls._chaos_events[name.upper()] = event
    setattr(cls, name, event)
def register(executor_klass):
    """
    Registers a chaos event class with ChaosExecutor class

    Returns a decorator.  When the decorated object has a true ``enabled``
    attribute it is instantiated (if it is a class), set as an attribute
    of ``executor_klass`` under its ``__name__`` and passed to
    ``executor_klass.update_events`` as an ``(UPPER_NAME, object)`` pair.

    Args:
        executor_klass: class that receives the registered events.

    Returns:
        The decorator; it returns the registered instance, or the
        decorated object unchanged when it is not enabled.
    """
    # types.ClassType only exists on Python 2; fall back to ``type`` so the
    # decorator does not raise AttributeError elsewhere.
    class_types = (type, getattr(types, 'ClassType', type))

    def _register(chaos_klass):
        if chaos_klass.enabled:
            name = chaos_klass.__name__
            if isinstance(chaos_klass, class_types):
                chaos_klass = chaos_klass()
            setattr(executor_klass, name, chaos_klass)
            executor_klass.update_events([(name.upper(), chaos_klass)])
        return chaos_klass
    return _register
def simplefilter(self, action, category=Warning, lineno=0, append=0, caller_id = 0, severity=0):
    """Add a filter that matches on category and line number only.

    The filter is the tuple
    ``(action, None, category, None, lineno, caller_id, severity)``.
    An equal filter that is already present is removed first, so the
    filter ends up exactly once: at the end of ``self.filters`` when
    ``append`` is true, at the front otherwise.  Everything runs while
    holding ``self.warn_lock``.

    Arguments are validated with ``assert`` (AssertionError on failure).
    """
    # Classic classes exist only on Python 2; fall back to ``type`` so the
    # check does not raise AttributeError where types.ClassType is missing.
    class_types = (type, getattr(types, 'ClassType', type))
    with self.warn_lock:
        assert action in ("error", "ignore", "always", "default", "module",
                          "once"), "invalid action: %r" % (action,)
        assert isinstance(category, class_types), \
               "category must be a class"
        assert issubclass(category, Warning), "category must be a Warning subclass"
        assert isinstance(lineno, int) and lineno >= 0, \
               "lineno must be an int >= 0"
        item = (action, None, category, None, lineno, caller_id, severity)
        if item in self.filters:
            self.filters.remove(item)
        if append:
            self.filters.append(item)
        else:
            self.filters.insert(0, item)
def filterwarnings(self, action, message="", category=Warning, module="", lineno=0, append=0, caller_id = 0, severity=0):
    """Add a filter matching on message, category, module and line number.

    ``message`` and ``module`` are regular expressions; ``message`` is
    compiled case-insensitively.  The filter is the tuple
    ``(action, message_re, category, module_re, lineno, caller_id,
    severity)``.  An equal filter that is already present is removed
    first; the new one goes to the end of ``self.filters`` when
    ``append`` is true and to the front otherwise.  Everything runs while
    holding ``self.warn_lock``.

    Arguments are validated with ``assert`` (AssertionError on failure).
    """
    # Neither types.ClassType nor basestring exist on Python 3; resolve
    # both defensively so the validation itself cannot raise.
    class_types = (type, getattr(types, 'ClassType', type))
    try:
        string_types = basestring
    except NameError:
        string_types = str
    with self.warn_lock:
        assert action in ("error", "ignore", "always", "default", "module",
                          "once"), "invalid action: %r" % (action,)
        assert isinstance(message, string_types), "message must be a string"
        assert isinstance(category, class_types), \
               "category must be a class"
        assert issubclass(category, Warning), "category must be a Warning subclass"
        assert isinstance(module, string_types), "module must be a string"
        assert isinstance(lineno, int) and lineno >= 0, \
               "lineno must be an int >= 0"
        item = (action, re.compile(message, re.I), category,
                re.compile(module), lineno, caller_id, severity)
        if item in self.filters:
            self.filters.remove(item)
        if append:
            self.filters.append(item)
        else:
            self.filters.insert(0, item)
# end _Warnings()
def call_async(self, fn, *args, **kwargs):
    """Send a request to call ``fn(*args, **kwargs)`` and return the receiver.

    The function is identified by ``(module, class name, function name)``;
    the class name is only set when ``fn`` is a method bound to a class,
    otherwise it is None.  The pickled message is sent with
    ``self.send_async`` using the ``CALL_FUNCTION`` handle, and the
    returned receiver has ``raise_channelerror`` set to False.
    """
    LOG.debug('%r.call_async(%r, *%r, **%r)',
              self, fn, args, kwargs)

    klass = None
    if isinstance(fn, types.MethodType) and \
       isinstance(fn.im_self, (type, types.ClassType)):
        klass = fn.im_self.__name__

    call_spec = (fn.__module__, klass, fn.__name__, args, kwargs)
    message = mitogen.core.Message.pickled(
        call_spec,
        handle=mitogen.core.CALL_FUNCTION,
    )
    recv = self.send_async(message)
    recv.raise_channelerror = False
    return recv
def addDataSource(self, source=SampleDataSource, name='default', sourceModule=None, sourceArgs=(), sourceKwargs=None):
    """Register a data source under ``name``.

    ``source`` may be:
      * a string -- looked up in ``sourceModule`` (itself imported first
        when given as a string) if a module is supplied;
      * a class -- instantiated with ``sourceArgs`` / ``sourceKwargs``;
      * anything else -- used as it is.

    The source is stored in ``self._sources`` together with a fresh
    ``WeakSet``, its ``initialSamples()`` are recorded in
    ``self._lastSamples``, and every entry of ``self._lostCurveDatas``
    that ``self._tryAddToDataSource`` accepts is removed from that
    collection.

    Raises Exception when ``name`` is already registered.

    NOTE(review): a class obtained through the string lookup is not
    instantiated (the class branch is an ``elif``) -- confirm intent.
    """
    if name in self._sources:
        raise Exception('Data source "%s" already exists!' % name)
    if sourceKwargs is None:
        sourceKwargs = {}

    if utils.isstr(source):
        if utils.isstr(sourceModule):
            sourceModule = __import__(sourceModule)
        if sourceModule is not None:
            source = sourceModule.__dict__[source]
    elif type(source) in [types.ClassType, types.TypeType]:
        source = source(*sourceArgs, **sourceKwargs)

    cds = weakref.WeakSet()
    self._sources[name] = (source, cds)
    self._lastSamples[name] = source.initialSamples()

    # Iterate over a copy: entries are removed while looping.
    for cd in list(self._lostCurveDatas):
        if self._tryAddToDataSource(cd, name):
            # Was ``.remote(cds)``: a misspelt method name applied to the
            # wrong object (the new WeakSet instead of the adopted entry).
            self._lostCurveDatas.remove(cd)
def skip(reason):
    """Unconditionally skip a test.

    Returns a decorator that replaces the decorated item with a wrapper
    accepting any arguments, doing nothing and returning None.  The
    wrapper carries the metadata of the original item (via
    ``functools.wraps``).  ``reason`` is accepted but not used.
    """
    def decorator(test_item):
        # In older versions of Python, tracking skipped tests is
        # problematic since the test loader and runner handle this
        # internally.  For this reason, this compatibility wrapper does
        # not raise SkipTest or set the __unittest_skip__ attributes; it
        # simply wraps skipped tests in a function that passes silently.
        return functools.wraps(test_item)(lambda *args, **kwargs: None)
    return decorator
def validate_type(arg_name, arg, valid_types):
    """Check that ``arg`` is an instance of ``valid_types``.

    ``valid_types`` may be a single type, a classic class, or a list or
    tuple of types.  When ``long_exists`` is true, ``int`` also admits
    ``long``.

    Returns None when the check passes.
    Raises STLTypeError when ``arg`` has the wrong type and STLError when
    ``valid_types`` itself is not a type or a list/tuple of types.
    """
    if long_exists:
        if valid_types is int:
            valid_types = (int, long)
        elif type(valid_types) is list and int in valid_types and long not in valid_types:
            # Build a new list: appending would modify the caller's list.
            valid_types = valid_types + [long]
    if type(valid_types) is list:
        valid_types = tuple(valid_types)

    # types.ClassType (old style class) is missing on Python 3; fall back
    # to ``type`` so an invalid spec raises STLError, not AttributeError.
    classic_class = getattr(types, 'ClassType', type)
    spec_kind = type(valid_types)
    if (spec_kind is type or             # single type, not array of types
            spec_kind is tuple or        # several valid types as tuple
            spec_kind is classic_class):  # old style class
        if isinstance(arg, valid_types):
            return
        raise STLTypeError(arg_name, type(arg), valid_types)
    raise STLError('validate_type: valid_types should be type or list or tuple of types')
# throws STLError if not exactly one argument is present
def configure_custom(self, config):
    """Configure an object with a user-supplied factory.

    ``config['()']`` is the factory; when it is not callable it is passed
    to ``self.resolve`` to obtain one.  Every remaining key accepted by
    ``valid_ident`` becomes a keyword argument of the factory call.  The
    optional ``config['.']`` mapping lists attributes to set on the
    result.

    Both special keys are popped from ``config``.  Returns the object
    created by the factory.
    """
    c = config.pop('()')
    # ``callable`` also covers classic classes.  The previous test was
    # always false where types.ClassType is missing, so a factory given
    # as a string was never resolved there.
    if not callable(c):
        c = self.resolve(c)
    props = config.pop('.', None)
    # Check for valid identifiers
    kwargs = dict((k, config[k]) for k in config if valid_ident(k))
    result = c(**kwargs)
    if props:
        for name, value in props.items():
            setattr(result, name, value)
    return result
def configure_custom(self, config):
    """Configure an object with a user-supplied factory.

    The factory is popped from ``config['()']`` and resolved through
    ``self.resolve`` when it is not callable.  The keys left in
    ``config`` that pass ``valid_ident`` are passed to the factory as
    keyword arguments.  If ``config['.']`` is present it is popped too
    and its items are set as attributes on the created object, which is
    returned.
    """
    factory = config.pop('()')
    # The original guard depended on types.ClassType existing and so never
    # resolved anything on Python 3; ``callable`` behaves the same for
    # classic classes and works on both major versions.
    if not callable(factory):
        factory = self.resolve(factory)
    props = config.pop('.', None)
    # Check for valid identifiers
    kwargs = dict((k, config[k]) for k in config if valid_ident(k))
    result = factory(**kwargs)
    if props:
        for name, value in props.items():
            setattr(result, name, value)
    return result
def loadTestsFromName(self, name, module=None):
"""Return a suite of all tests cases given a string specifier.
The name may resolve either to a module, a test case class, a
test method within a test case class, or a callable object which
returns a TestCase or TestSuite instance.
The method optionally resolves the names relative to a given module.
"""
parent, obj = self._resolveName(name, module)
if type(obj) == types.ModuleType:
return self.loadTestsFromModule(obj)
elif (isinstance(obj, (type, types.ClassType)) and
issubclass(obj, unittest.TestCase)):
return self.loadTestsFromTestCase(obj)
elif (type(obj) == types.UnboundMethodType and
isinstance(parent, (type, types.ClassType)) and
issubclass(parent, unittest.TestCase)):
return self.loadSpecificTestFromTestCase(parent, obj.__name__)
elif isinstance(obj, unittest.TestSuite):
return obj
elif hasattr(obj, '__call__'):
test = obj()
if isinstance(test, unittest.TestSuite):
return test
elif isinstance(test, unittest.TestCase):
return self.suiteClass([test])
else:
raise TypeError("calling %s returned %s, not a test" %
(obj, test))
else:
raise TypeError("don't know how to make test from: %s" % obj)
def __init__(self,
             id_,
             structdef,
             name=None,
             mode="readwrite",
             configurationkind=("configure","property"),
             description=None,
             fget=None,
             fset=None,
             fval=None):
    """Construct a property of type struct.

    @param structdef    the structure definition, which is any class with simple_property attributes

    Raises ValueError when ``structdef`` is a classic (old-style) class.
    ``self.fields`` maps each field id to ``(attribute name, property)``.
    """
    # struct properties have been extended to support multiple kinds,
    # similar to simple and simplesequence. For backwards compatibility,
    # convert string values into tuples.
    if isinstance(configurationkind, str):
        configurationkind = (configurationkind,)
    _property.__init__(self, id_, None, name, None, mode, "external", configurationkind, description, fget, fset, fval)

    if type(structdef) is types.ClassType:
        raise ValueError("structdef must be a new-style python class (i.e. inherits from object)")
    self.structdef = structdef

    # Map field id's to attribute names.  Only exact simple_property and
    # simpleseq_property instances count; subclasses are not matched.
    self.fields = {}
    for attr_name, attr in self.structdef.__dict__.items():
        attr_type = type(attr)
        if attr_type is simple_property or attr_type is simpleseq_property:
            self.fields[attr.id_] = (attr_name, attr)
def __init__(self,
             id_,
             structdef,
             name=None,
             defvalue=None,
             mode="readwrite",
             configurationkind=("configure","property"),
             description=None,
             fget=None,
             fset=None,
             fval=None):
    """Construct a sequence-of-struct property.

    ``structdef`` is the structure definition class; a classic (old-style)
    class is rejected with ValueError.  ``self.fields`` maps each field id
    to ``(attribute name, property)`` and ``defvalue`` is stored as
    ``self.defvalue``.
    """
    # structsequence properties have been extended to support multiple
    # kinds, similar to simple and simplesequence. For backwards
    # compatibility, convert string values into tuples.
    if isinstance(configurationkind, str):
        configurationkind = (configurationkind,)
    _sequence_property.__init__(self, id_, None, name, None, mode, "external", configurationkind, description, fget, fset, fval)

    if type(structdef) is types.ClassType:
        raise ValueError("structdef must be a new-style python class (i.e. inherits from object)")
    self.structdef = structdef

    # Map field id's to attribute names.  Only exact simple_property and
    # simpleseq_property instances count; subclasses are not matched.
    self.fields = {}
    for attr_name, attr in self.structdef.__dict__.items():
        attr_type = type(attr)
        if attr_type is simple_property or attr_type is simpleseq_property:
            self.fields[attr.id_] = (attr_name, attr)
    self.defvalue = defvalue
def str_to_class(s):
    """Return the class bound to the global name ``s``, or None.

    Both new-style and classic classes are accepted.  None is returned
    when the name is not defined in this module's globals or is bound to
    something that is not a class.
    """
    # The former check used types.ClassType only, which rejects every
    # new-style class and does not exist on Python 3.
    class_types = (type, getattr(types, 'ClassType', type))
    candidate = globals().get(s)
    if isinstance(candidate, class_types):
        return candidate
    return None
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
                   append=0):
    """Insert an entry into the list of warnings filters (at the front).

    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'message' -- a regex that the warning message must match
    'category' -- a class that the warning must be a subclass of
    'module' -- a regex that the module name must match
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters
    """
    import re
    valid_actions = ("error", "ignore", "always", "default", "module",
                     "once")
    assert action in valid_actions, "invalid action: %r" % (action,)
    assert isinstance(message, basestring), "message must be a string"
    assert isinstance(category, (type, types.ClassType)), \
           "category must be a class"
    assert issubclass(category, Warning), "category must be a Warning subclass"
    assert isinstance(module, basestring), "module must be a string"
    assert isinstance(lineno, int) and lineno >= 0, \
           "lineno must be an int >= 0"

    # The message pattern is case-insensitive, the module pattern is not.
    message_re = re.compile(message, re.I)
    module_re = re.compile(module)
    item = (action, message_re, category, module_re, lineno)
    if not append:
        filters.insert(0, item)
    else:
        filters.append(item)
def build_opener(*handlers):
    """Create an opener object from a list of handlers.

    The opener will use several default handlers, including support
    for HTTP, FTP and when applicable, HTTPS.

    If any of the handlers passed as arguments are subclasses of the
    default handlers, the default handlers will not be used.
    """
    import types

    def isclass(obj):
        return isinstance(obj, (types.ClassType, type))

    def replaces(candidate, default_class):
        # A handler given as a class overrides via subclassing, a handler
        # given as an instance via isinstance.
        if isclass(candidate):
            return issubclass(candidate, default_class)
        return isinstance(candidate, default_class)

    opener = OpenerDirector()
    default_classes = [ProxyHandler, UnknownHandler, HTTPHandler,
                       HTTPDefaultErrorHandler, HTTPRedirectHandler,
                       FTPHandler, FileHandler, HTTPErrorProcessor]
    if hasattr(httplib, 'HTTPS'):
        default_classes.append(HTTPSHandler)

    # Drop every default that one of the supplied handlers replaces.
    default_classes = [
        default_class for default_class in default_classes
        if not any(replaces(candidate, default_class) for candidate in handlers)
    ]

    for default_class in default_classes:
        opener.add_handler(default_class())

    for handler in handlers:
        if isclass(handler):
            handler = handler()
        opener.add_handler(handler)
    return opener
def configure_custom(self, config):
    """Configure an object with a user-supplied factory.

    ``config['()']`` holds the factory, which is resolved with
    ``self.resolve`` when it is not callable.  Keys of ``config`` that
    pass ``valid_ident`` are forwarded to the factory as keyword
    arguments; ``config['.']``, if present, lists attributes to set on
    the created object.  Both special keys are popped from ``config``.

    Returns the created object.
    """
    c = config.pop('()')
    # Replaces a test that required types.ClassType to exist: without it
    # the test was always false and string factories were never resolved.
    # ``callable`` is also true for classic classes.
    if not callable(c):
        c = self.resolve(c)
    props = config.pop('.', None)
    # Check for valid identifiers
    kwargs = dict((k, config[k]) for k in config if valid_ident(k))
    result = c(**kwargs)
    if props:
        for name, value in props.items():
            setattr(result, name, value)
    return result
def __str__(self):
    """Describe the problem as ``problem in <filename> - <exc>: <value>``.

    When ``self.exc`` is a class (new-style or classic) its ``__name__``
    is shown; any other value is formatted as it is.
    """
    exc = self.exc
    # ``type(exc) is types.ClassType`` matched classic classes only, so
    # new-style exception classes were shown by their repr instead of
    # their name, and the attribute is missing altogether on Python 3.
    if isinstance(exc, (type, getattr(types, 'ClassType', type))):
        exc = exc.__name__
    return 'problem in %s - %s: %s' % (self.filename, exc, self.value)
def register(cls, subclass):
    """Register a virtual subclass of an ABC.

    Raises TypeError when ``subclass`` is not a class and RuntimeError
    when the registration would create an inheritance cycle.  Registering
    something that already is a subclass does nothing.
    """
    is_class = isinstance(subclass, (type, types.ClassType))
    if not is_class:
        raise TypeError("Can only register classes")

    already_subclass = issubclass(subclass, cls)
    if already_subclass:
        return

    # Subtle: the cycle test comes *after* the "already a subclass" test,
    # so X.register(X) is allowed and treated as a no-op.
    would_cycle = issubclass(cls, subclass)
    if would_cycle:
        # This would create a cycle, which is bad for the algorithm below
        raise RuntimeError("Refusing to create an inheritance cycle")

    cls._abc_registry.add(subclass)
    ABCMeta._abc_invalidation_counter += 1  # Invalidate negative cache
def configure_custom(self, config):
    """Configure an object with a user-supplied factory.

    Pops the factory from ``config['()']`` (resolving it via
    ``self.resolve`` when it is not callable) and the optional attribute
    mapping from ``config['.']``.  The factory is called with every
    remaining ``config`` entry whose key passes ``valid_ident``; the
    attribute mapping is then applied to the result with ``setattr``.

    Returns the configured object.
    """
    c = config.pop('()')
    # ``callable`` is true for classic classes as well, and unlike the
    # former types.ClassType-based test it still resolves string
    # factories on interpreters where that attribute does not exist.
    if not callable(c):
        c = self.resolve(c)
    props = config.pop('.', None)
    # Check for valid identifiers
    kwargs = dict((k, config[k]) for k in config if valid_ident(k))
    result = c(**kwargs)
    if props:
        for name, value in props.items():
            setattr(result, name, value)
    return result