def register(cls, action):
action_logger.info("Registering action :%s" % action)
if isinstance(action, (types.FunctionType, staticmethod)):
name = action.__name__
cls._actions[name.upper()] = action
setattr(cls, name, action)
elif isinstance(action, types.ClassType) and hasattr(action, "__call__"):
name = action.__name__
action = action()
cls._actions[name.upper()] = action
setattr(cls, name, action)
elif (isinstance(action, (types.InstanceType, types.ObjectType))
and hasattr(action, "__call__")):
if isinstance(action, type):
name = action.__name__
action = action()
else:
name = action.__class__.__name__
cls._actions[name.upper()] = action
setattr(cls, name, action)
else:
name = str(action)
action_logger.error("Error registering action :%s" % action)
raise UnknownAction("unable to register action %s!!" %name)
python类ObjectType()的实例源码
def register(cls, event):
exe_logger.info("Registering event :%s" % event)
if isinstance(event, (types.FunctionType, staticmethod)):
name = event.__name__
cls._chaos_events[name.upper()] = event
setattr(cls, name, event)
elif isinstance(event, types.ClassType) and hasattr(event, "__call__"):
name = event.__name__
event = event()
cls._chaos_events[name.upper()] = event
setattr(cls, name, event)
elif (isinstance(event, (types.InstanceType, types.ObjectType))
and hasattr(event, "__call__")):
if isinstance(event, type):
name = event.__name__
event = event()
else:
name = event.__class__.__name__
cls._chaos_events[name.upper()] = event
setattr(cls, name, event)
else:
name = str(event)
exe_logger.error("Error registering event :%s" % event)
raise UnknownChaosEvent("unable to register event %s!!" % name)
def _get_func_repr(func):
if isinstance(func, types.MethodType):
return "{cls}.{func}".format(
cls=func.im_self.__class__,
func=func.im_func.__name__
)
elif isinstance(func, types.BuiltinMethodType):
if not func.__self__:
return "{func}".format(
func=func.__name__
)
else:
return "{type}.{func}".format(
type=func.__self__,
func=func.__name__
)
elif (isinstance(func, types.ObjectType) and hasattr(func, "__call__")) or\
isinstance(func, (types.FunctionType, types.BuiltinFunctionType,
types.ClassType, types.UnboundMethodType)):
return "{module}.{func}".format(
module=func.__module__,
func=func.__name__
)
else:
raise ValueError("func must be callable")
def _get_func_repr(func):
if isinstance(func, types.MethodType):
return "{cls}.{func}".format(
cls=func.im_self.__class__,
func=func.im_func.__name__
)
elif isinstance(func, types.BuiltinMethodType):
if not func.__self__:
return "{func}".format(
func=func.__name__
)
else:
return "{type}.{func}".format(
type=func.__self__,
func=func.__name__
)
elif (isinstance(func, types.ObjectType) and hasattr(func, "__call__")) or\
isinstance(func, (types.FunctionType, types.BuiltinFunctionType,
types.ClassType, types.UnboundMethodType)):
return "{module}.{func}".format(
module=func.__module__,
func=func.__name__
)
else:
raise ValueError("func must be callable")
def __call__(self, action, *args, **kwargs):
_action = self._actions.get(action)
if _action:
action_logger.debug("Calling action %s on %s" % (action, self.ce))
if isinstance(_action, (types.InstanceType,
types.ObjectType, staticmethod)):
return _action(self.ce, *args, **kwargs)
elif isinstance(_action, types.FunctionType):
return _action(self, *args, **kwargs)
else:
raise UnknownAction(action)
else:
action_logger.error("Unknown action requested :%s" % action)
raise UnknownAction(action)
def __call__(self, event, *args, **kwargs):
_chaos_event = self._chaos_events.get(event)
if _chaos_event:
exe_logger.debug("Calling event %s on %s" % (event, self.app))
if isinstance(_chaos_event, (types.InstanceType,
types.ObjectType, staticmethod)):
return _chaos_event(self.app, *args, **kwargs)
elif isinstance(_chaos_event, types.FunctionType):
return _chaos_event(self, *args, **kwargs)
else:
raise UnknownChaosEvent(event)
else:
exe_logger.error("Unknown event requested :%s" % event)
raise UnknownChaosEvent(event)
def source_read(app, docname, source):
'''Transform the contents of plugins-toolkit.rst to contain reference docs.
'''
# We're only interested in the 'plugins-toolkit' doc (plugins-toolkit.rst).
if docname != 'extensions/plugins-toolkit':
return
source_ = ''
for name, thing in inspect.getmembers(toolkit):
# The plugins toolkit can override the docstrings of some of its
# members (e.g. things that are imported from third-party libraries)
# by putting custom docstrings in this docstring_overrides dict.
custom_docstring = toolkit.docstring_overrides.get(name)
if inspect.isfunction(thing):
source_ += format_function(name, thing, docstring=custom_docstring)
elif inspect.ismethod(thing):
# We document plugins toolkit methods as if they're functions. This
# is correct because the class ckan.plugins.toolkit._Toolkit
# actually masquerades as a module ckan.plugins.toolkit, and you
# call its methods as if they were functions.
source_ += format_function(name, thing, docstring=custom_docstring)
elif inspect.isclass(thing):
source_ += format_class(name, thing, docstring=custom_docstring)
elif isinstance(thing, types.ObjectType):
source_ += format_object(name, thing, docstring=custom_docstring)
else:
assert False, ("Someone added {name}:{thing} to the plugins "
"toolkit and this Sphinx extension doesn't know "
"how to document that yet. If you're that someone, "
"you need to add a new format_*() function for it "
"here or the docs won't build.".format(
name=name, thing=thing))
source[0] += source_
# This is useful for debugging the generated RST.
#open('/tmp/source', 'w').write(source[0])
def __add_command(cls, key):
path = (cls.__module__, cls.__name__, cls.frontend.__class__.__name__)
t = cls.__type(cls.cache[key])
if t == types.FunctionType:
scr = functools.partial("command script add -f {:s}.{:s} {:s}".format, '.'.join(path), key)
elif t == types.ObjectType and key in cls.__synchronicity__:
scr = functools.partial("command script add -c {:s}.{:s} -s {:s} {:s}".format, '.'.join(path), key, cls.__synchronicity__[key])
elif t == types.ObjectType and key not in cls.__synchronicity__:
scr = functools.partial("command script add -c {:s}.{:s} {:s}".format, '.'.join(path), key)
else:
raise TypeError('{:s}.{:s} : Unable to generate command with unknown type. {!r}'.format(cls.__module__, cls.__name__, t))
return scr
converters.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def Instance2Str(o, d):
"""
Convert an Instance to a string representation. If the __str__()
method produces acceptable output, then you don't need to add the
class to conversions; it will be handled by the default
converter. If the exact class is not found in d, it will use the
first class it can find for which o is an instance.
"""
if d.has_key(o.__class__):
return d[o.__class__](o, d)
cl = filter(lambda x,o=o:
type(x) is types.ClassType
and isinstance(o, x), d.keys())
if not cl and hasattr(types, 'ObjectType'):
cl = filter(lambda x,o=o:
type(x) is types.TypeType
and isinstance(o, x)
and d[x] is not Instance2Str,
d.keys())
if not cl:
return d[types.StringType](o,d)
d[o.__class__] = d[cl[0]]
return d[cl[0]](o, d)
def __add_command(cls, key):
path = (cls.__module__, cls.__name__, cls.frontend.__class__.__name__)
t = cls.__type(cls.cache[key])
if t == types.FunctionType:
scr = functools.partial("command script add -f {:s}.{:s} {:s}".format, '.'.join(path), key)
elif t == types.ObjectType and key in cls.__synchronicity__:
scr = functools.partial("command script add -c {:s}.{:s} -s {:s} {:s}".format, '.'.join(path), key, cls.__synchronicity__[key])
elif t == types.ObjectType and key not in cls.__synchronicity__:
scr = functools.partial("command script add -c {:s}.{:s} {:s}".format, '.'.join(path), key)
else:
raise TypeError('{:s}.{:s} : Unable to generate command with unknown type. {!r}'.format(cls.__module__, cls.__name__, t))
return scr
def _get_func_repr(func):
"""
Gets a string representing a function object
"""
if isinstance(func, types.MethodType):
return "{cls}.{func}".format(
cls=func.im_self.__class__,
func=func.im_func.__name__
)
elif isinstance(func, types.BuiltinMethodType):
if not func.__self__:
return "{func}".format(
func=func.__name__
)
else:
return "{type}.{func}".format(
type=func.__self__,
func=func.__name__
)
elif (isinstance(func, types.ObjectType) and hasattr(func, "__call__")) or\
isinstance(func, (types.FunctionType, types.BuiltinFunctionType,
types.ClassType, types.UnboundMethodType)):
return "{module}.{func}".format(
module=func.__module__,
func=func.__name__
)
else:
raise ValueError("func must be callable")
def _curry_callable(obj, *args, **kwargs):
"""Takes a callable and arguments and returns a task queue tuple.
The returned tuple consists of (callable, args, kwargs), and can be pickled
and unpickled safely.
Args:
obj: The callable to curry. See the module docstring for restrictions.
args: Positional arguments to call the callable with.
kwargs: Keyword arguments to call the callable with.
Returns:
A tuple consisting of (callable, args, kwargs) that can be evaluated by
run() with equivalent effect of executing the function directly.
Raises:
ValueError: If the passed in object is not of a valid callable type.
"""
if isinstance(obj, types.MethodType):
return (invoke_member, (obj.im_self, obj.im_func.__name__) + args, kwargs)
elif isinstance(obj, types.BuiltinMethodType):
if not obj.__self__:
return (obj, args, kwargs)
else:
return (invoke_member, (obj.__self__, obj.__name__) + args, kwargs)
elif isinstance(obj, types.ObjectType) and hasattr(obj, "__call__"):
return (obj, args, kwargs)
elif isinstance(obj, (types.FunctionType, types.BuiltinFunctionType,
types.ClassType, types.UnboundMethodType)):
return (obj, args, kwargs)
else:
raise ValueError("obj must be callable")
def register_dao_class(database_type, cls):
if not issubclass(cls, BaseDAO):
raise ValueError('Class %r isn\'t a subclass of BaseDAO')
base_classes = inspect.getmro(cls)
for base_cls in inspect.getmro(cls):
if base_classes[0] != base_cls and base_cls not in (BaseDAO, types.ObjectType):
DAO_CLASS_BY_DATABASE_TYPE_BASE_CLASS[database_type][base_cls.__name__] = cls
def addService(self, service, name=None, description=None,
authenticator=None, expose_request=None, preprocessor=None):
"""
Adds a service to the gateway.
@param service: The service to add to the gateway.
@type service: C{callable}, class instance, or a module
@param name: The name of the service.
@type name: C{str}
@raise pyamf.remoting.RemotingError: Service already exists.
@raise TypeError: C{service} cannot be a scalar value.
@raise TypeError: C{service} must be C{callable} or a module.
"""
if isinstance(service, (int, long, float, basestring)):
raise TypeError("Service cannot be a scalar value")
allowed_types = (
types.ModuleType,
types.FunctionType,
types.DictType,
types.MethodType,
types.InstanceType,
types.ObjectType,
)
if not python.callable(service) and \
not isinstance(service, allowed_types):
raise TypeError("Service must be a callable, module, or an object")
if name is None:
# TODO: include the module in the name
if isinstance(service, (type, types.ClassType)):
name = service.__name__
elif isinstance(service, types.FunctionType):
name = service.func_name
elif isinstance(service, types.ModuleType):
name = service.__name__
else:
name = str(service)
if name in self.services:
raise remoting.RemotingError("Service %s already exists" % name)
self.services[name] = ServiceWrapper(
service,
description,
authenticator,
expose_request,
preprocessor
)