def strip_defaults(self):
    """
    Return ``self.parameters`` as a list with trailing defaults removed.

    Working from the right-hand side, every parameter that equals the
    default declared for the matching ``__init__`` argument is dropped.
    Stripping stops at the first parameter that differs from its default,
    or as soon as only the required (default-less) parameters are left.
    """
    # inspect.getargspec() was removed in Python 3.11; fall back to it only
    # on interpreters that lack getfullargspec().
    getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    argspec = getspec(self.__class__.__init__)
    # ``defaults`` is None (not an empty tuple) when no argument has one.
    defaults = argspec.defaults or ()
    # The extra -1 discounts the leading ``self`` argument of ``__init__``.
    required_count = len(argspec.args) - len(defaults) - 1
    params = list(self.parameters)
    keep = len(params)
    remaining = len(defaults)
    while (remaining and keep > required_count
            and defaults[remaining - 1] == params[keep - 1]):
        remaining -= 1
        keep -= 1
    return params[:keep]
python类getargspec()的实例源码
def get_args(self, func):
    """
    Map the argument names of ``func`` to their default values.

    Arguments declared without a default are mapped to ``None``; a
    default-less argument named ``self`` is left out of the result.
    """
    # inspect.getargspec() was removed in Python 3.11; fall back to it only
    # on interpreters that lack getfullargspec().
    getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    spec = getspec(func)
    names = list(spec[0])
    defaults = spec[3] or ()
    # Defaults always belong to the right-most positional arguments.
    first_default = len(names) - len(defaults)

    result = {}
    for name, default in zip(reversed(names[first_default:]),
                             reversed(defaults)):
        result[name] = default
    for name in reversed(names[:first_default]):
        if name != 'self':
            result[name] = None
    return result
def analyse_action(func):
    """Analyse a function.

    Every positional argument of ``func`` must declare a default, which is
    either the default value itself or a ``(shortcut, default)`` tuple.

    :return: ``(func, description, arguments)``; ``description`` is the
        function's docstring (or ``'undocumented action'``) and
        ``arguments`` is a list of
        ``(name, shortcut, default, argument_type)`` tuples.
    :raises TypeError: if ``func`` takes variable-length or keyword-only
        arguments, has an argument without a default, or has an argument
        whose name starts with an underscore.
    """
    _deprecated()
    description = inspect.getdoc(func) or 'undocumented action'
    # inspect.getargspec() was removed in Python 3.11; fall back to it only
    # on interpreters that lack getfullargspec().
    getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    spec = getspec(func)
    args, varargs, kwargs, defaults = spec[:4]
    defaults = defaults or ()
    if varargs or kwargs:
        raise TypeError('variable length arguments for action not allowed.')
    # getargspec() refused keyword-only arguments; keep rejecting them
    # instead of silently ignoring them.
    if getattr(spec, 'kwonlyargs', None):
        raise TypeError('keyword-only arguments for action not allowed.')
    if len(args) != len(defaults):
        raise TypeError('not all arguments have proper definitions')

    arguments = []
    for arg, definition in zip(args, defaults):
        if arg.startswith('_'):
            raise TypeError('arguments may not start with an underscore')
        if isinstance(definition, tuple):
            shortcut, default = definition
        else:
            shortcut, default = None, definition
        argument_type = argument_types[type(default)]
        # Flags that default to True are exposed in negated form.
        if default is True:
            arg = 'no-' + arg
        arguments.append((arg.replace('_', '-'), shortcut,
                          default, argument_type))
    return func, description, arguments
def analyse_action(func):
    """Analyse a function.

    Every positional argument of ``func`` must declare a default, which is
    either the default value itself or a ``(shortcut, default)`` tuple.

    :return: ``(func, description, arguments)``; ``description`` is the
        function's docstring (or ``'undocumented action'``) and
        ``arguments`` is a list of
        ``(name, shortcut, default, argument_type)`` tuples.
    :raises TypeError: if ``func`` takes variable-length or keyword-only
        arguments, has an argument without a default, or has an argument
        whose name starts with an underscore.
    """
    _deprecated()
    description = inspect.getdoc(func) or 'undocumented action'
    # inspect.getargspec() was removed in Python 3.11; fall back to it only
    # on interpreters that lack getfullargspec().
    getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    spec = getspec(func)
    args, varargs, kwargs, defaults = spec[:4]
    defaults = defaults or ()
    if varargs or kwargs:
        raise TypeError('variable length arguments for action not allowed.')
    # getargspec() refused keyword-only arguments; keep rejecting them
    # instead of silently ignoring them.
    if getattr(spec, 'kwonlyargs', None):
        raise TypeError('keyword-only arguments for action not allowed.')
    if len(args) != len(defaults):
        raise TypeError('not all arguments have proper definitions')

    arguments = []
    for arg, definition in zip(args, defaults):
        if arg.startswith('_'):
            raise TypeError('arguments may not start with an underscore')
        if isinstance(definition, tuple):
            shortcut, default = definition
        else:
            shortcut, default = None, definition
        argument_type = argument_types[type(default)]
        # Flags that default to True are exposed in negated form.
        if default is True:
            arg = 'no-' + arg
        arguments.append((arg.replace('_', '-'), shortcut,
                          default, argument_type))
    return func, description, arguments
def get_kwarg_names(func):
    """
    List the names of the parameters of ``func`` that declare a default.

    ``inspect.signature`` is used when the interpreter provides it;
    otherwise the names are derived from ``inspect.getargspec``.
    """
    try:
        parameters = inspect.signature(func).parameters
    except AttributeError:
        # Interpreter without inspect.signature: defaults always belong to
        # the right-most positional arguments.
        spec = inspect.getargspec(func)
        positional, defaults = spec[0], spec[3]
        if not defaults:
            return []
        return positional[-len(defaults):]

    names = []
    for param in parameters.values():
        if param.default is not param.empty:
            names.append(param.name)
    return names
def quick_init(self, locals_):
    """
    Record the arguments ``__init__`` was called with, once per instance.

    ``locals_`` is a mapping from the names in the signature of
    ``self.__init__`` to their values. The positional values (without
    the leading ``self``) plus any ``*args`` are stored in ``self.__args``
    and the ``**kwargs`` mapping in ``self.__kwargs``. Later calls on the
    same instance return immediately.
    """
    if getattr(self, "_serializable_initialized", False):
        return

    # The attribute naming the ``**kwargs`` parameter differs between the
    # Python 3 and Python 2 argspec objects.
    if sys.version_info >= (3, 0):
        spec = inspect.getfullargspec(self.__init__)
        varkw_name = spec.varkw
    else:
        spec = inspect.getargspec(self.__init__)
        varkw_name = spec.keywords

    kwargs = locals_[varkw_name] if varkw_name else dict()
    varargs = locals_[spec.varargs] if spec.varargs else tuple()
    # Look every positional name up, then drop the first ("self") entry.
    positional = tuple(locals_[name] for name in spec.args)[1:]

    self.__args = positional + varargs
    self.__kwargs = kwargs
    self._serializable_initialized = True
def __init__(self, func, role='func', doc=None, config={}):
    """
    Parse the docstring of ``func`` and make sure a signature is recorded.

    :param func: object being documented; may be ``None`` when ``doc`` is
        given.
    :param role: stored on ``self._role`` (e.g. "func" or "meth").
    :param doc: docstring text; read from ``func`` when ``None``.
    :param config: not used by this method.
    :raises ValueError: if neither ``func`` nor ``doc`` is supplied.
    """
    self._f = func
    self._role = role  # e.g. "func" or "meth"

    if doc is None:
        if func is None:
            raise ValueError("No function or docstring given")
        doc = inspect.getdoc(func) or ''
    NumpyDocString.__init__(self, doc)

    if not self['Signature'] and func is not None:
        func, func_name = self.get_func()
        try:
            # try to read signature; getargspec()/formatargspec() were
            # removed in Python 3.11, so prefer inspect.signature().
            # NOTE: inspect.signature() omits ``self`` for bound methods.
            if hasattr(inspect, 'signature'):
                argspec = str(inspect.signature(func))
            else:
                argspec = inspect.formatargspec(*inspect.getargspec(func))
            # Escape ``*`` so it is not rendered as reST emphasis.
            argspec = argspec.replace('*', '\\*')
            signature = '%s%s' % (func_name, argspec)
        except (TypeError, ValueError):
            # Raised for objects whose signature cannot be introspected.
            signature = '%s()' % func_name
        self['Signature'] = signature
def __init__(self, docstring, class_name, class_object):
    """
    Parse ``docstring`` and record whether the class constructor takes
    parameters.

    ``self.has_parameters`` is set to True when the ``__init__`` found on
    ``class_object`` is written in Python and accepts anything besides
    ``self``; otherwise it stays False.
    """
    super(NumpyClassDocString, self).__init__(docstring)
    self.class_name = class_name
    methods = dict(inspect.getmembers(class_object))
    self.has_parameters = False

    init = methods.get('__init__')
    # verify if __init__ is a Python function. If it isn't (e.g. the
    # function is implemented in C), its argument spec cannot be read.
    # On Python 3 a function looked up on a class is a plain function
    # rather than a method, so both kinds are accepted.
    if not (inspect.ismethod(init) or inspect.isfunction(init)):
        return

    # inspect.getargspec() was removed in Python 3.11; fall back to it only
    # on interpreters that lack getfullargspec().
    getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    spec = getspec(init)
    args, varargs, keywords, defaults = spec[:4]
    kwonly = getattr(spec, 'kwonlyargs', None)
    if ((args and args != ['self']) or varargs or keywords or defaults
            or kwonly):
        self.has_parameters = True
def analyse_action(func):
    """Analyse a function.

    Every positional argument of ``func`` must declare a default, which is
    either the default value itself or a ``(shortcut, default)`` tuple.

    :return: ``(func, description, arguments)``; ``description`` is the
        function's docstring (or ``'undocumented action'``) and
        ``arguments`` is a list of
        ``(name, shortcut, default, argument_type)`` tuples.
    :raises TypeError: if ``func`` takes variable-length or keyword-only
        arguments, has an argument without a default, or has an argument
        whose name starts with an underscore.
    """
    description = inspect.getdoc(func) or 'undocumented action'
    # inspect.getargspec() was removed in Python 3.11; fall back to it only
    # on interpreters that lack getfullargspec().
    getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    spec = getspec(func)
    args, varargs, kwargs, defaults = spec[:4]
    defaults = defaults or ()
    if varargs or kwargs:
        raise TypeError('variable length arguments for action not allowed.')
    # getargspec() refused keyword-only arguments; keep rejecting them
    # instead of silently ignoring them.
    if getattr(spec, 'kwonlyargs', None):
        raise TypeError('keyword-only arguments for action not allowed.')
    if len(args) != len(defaults):
        raise TypeError('not all arguments have proper definitions')

    arguments = []
    for arg, definition in zip(args, defaults):
        if arg.startswith('_'):
            raise TypeError('arguments may not start with an underscore')
        if isinstance(definition, tuple):
            shortcut, default = definition
        else:
            shortcut, default = None, definition
        argument_type = argument_types[type(default)]
        # Flags that default to True are exposed in negated form.
        if default is True:
            arg = 'no-' + arg
        arguments.append((arg.replace('_', '-'), shortcut,
                          default, argument_type))
    return func, description, arguments
def analyse_action(func):
    """Analyse a function.

    Every positional argument of ``func`` must declare a default, which is
    either the default value itself or a ``(shortcut, default)`` tuple.

    :return: ``(func, description, arguments)``; ``description`` is the
        function's docstring (or ``'undocumented action'``) and
        ``arguments`` is a list of
        ``(name, shortcut, default, argument_type)`` tuples.
    :raises TypeError: if ``func`` takes variable-length or keyword-only
        arguments, has an argument without a default, or has an argument
        whose name starts with an underscore.
    """
    description = inspect.getdoc(func) or 'undocumented action'
    # inspect.getargspec() was removed in Python 3.11; fall back to it only
    # on interpreters that lack getfullargspec().
    getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    spec = getspec(func)
    args, varargs, kwargs, defaults = spec[:4]
    defaults = defaults or ()
    if varargs or kwargs:
        raise TypeError('variable length arguments for action not allowed.')
    # getargspec() refused keyword-only arguments; keep rejecting them
    # instead of silently ignoring them.
    if getattr(spec, 'kwonlyargs', None):
        raise TypeError('keyword-only arguments for action not allowed.')
    if len(args) != len(defaults):
        raise TypeError('not all arguments have proper definitions')

    arguments = []
    for arg, definition in zip(args, defaults):
        if arg.startswith('_'):
            raise TypeError('arguments may not start with an underscore')
        if isinstance(definition, tuple):
            shortcut, default = definition
        else:
            shortcut, default = None, definition
        argument_type = argument_types[type(default)]
        # Flags that default to True are exposed in negated form.
        if default is True:
            arg = 'no-' + arg
        arguments.append((arg.replace('_', '-'), shortcut,
                          default, argument_type))
    return func, description, arguments
def __init__(self, func, role='func', doc=None, config={}):
    """
    Parse the docstring of ``func`` and make sure a signature is recorded.

    :param func: object being documented; may be ``None`` when ``doc`` is
        given.
    :param role: stored on ``self._role`` (e.g. "func" or "meth").
    :param doc: docstring text; read from ``func`` when ``None``.
    :param config: not used by this method.
    :raises ValueError: if neither ``func`` nor ``doc`` is supplied.
    """
    self._f = func
    self._role = role  # e.g. "func" or "meth"

    if doc is None:
        if func is None:
            raise ValueError("No function or docstring given")
        doc = inspect.getdoc(func) or ''
    NumpyDocString.__init__(self, doc)

    if not self['Signature'] and func is not None:
        func, func_name = self.get_func()
        try:
            # try to read signature; getargspec()/formatargspec() were
            # removed in Python 3.11, so prefer inspect.signature().
            # NOTE: inspect.signature() omits ``self`` for bound methods.
            if hasattr(inspect, 'signature'):
                argspec = str(inspect.signature(func))
            else:
                argspec = inspect.formatargspec(*inspect.getargspec(func))
            # Escape ``*`` so it is not rendered as reST emphasis.
            argspec = argspec.replace('*', '\\*')
            signature = '%s%s' % (func_name, argspec)
        except (TypeError, ValueError):
            # Raised for objects whose signature cannot be introspected.
            signature = '%s()' % func_name
        self['Signature'] = signature
def filter_chain(filters, token, func, *args, **kwargs):
    """
    Run ``func`` through ``filters[token]`` down to ``filters[0]``.

    Each recursion step wraps the callable built so far in the filter at
    index ``token``; the chain is finally invoked once ``token`` reaches
    -1, so ``filters[0]`` ends up outermost. For every filter:

    * if its only argument is ``self``, the wrapped callable must return
      ``None`` and the filter is then called without arguments;
    * if its second argument is named ``__``, it receives the wrapped
      callable itself instead of its result;
    * otherwise it receives the wrapped callable's result.

    ``*args`` and ``**kwargs`` are forwarded to every filter that takes
    more than ``self``.
    """
    if token == -1:
        return func()

    def _apply_current_filter():
        current = filters[token]
        arg_names = getargspec(current)[0]
        if len(arg_names) != 1:
            upstream = func if arg_names[1] == '__' else func()
            return current(upstream, *args, **kwargs)
        # Only self arg
        if func() is not None:
            raise IncorrectPluginArg(u'Plugin filter method need a arg to receive parent method result.')
        return current()

    return filter_chain(filters, token - 1, _apply_current_filter,
                        *args, **kwargs)
def register_handler(
    self,  # type: BaseSocket
    method  # type: Callable[..., Union[bool, None]]
):  # type: (...) -> None
    """Register a handler for incoming method.

    Args:
        method: A function with two given arguments. Its signature
                should be of the form ``handler(msg, handler)``,
                where msg is a :py:class:`py2p.base.Message`
                object, and handler is a
                :py:class:`py2p.base.BaseConnection` object. It
                should return ``True`` if it performed an action,
                to reduce the number of handlers checked.

    Raises:
        ValueError: If the method signature doesn't parse correctly
    """
    # inspect.getargspec() was removed in Python 3.11; fall back to it only
    # on interpreters that lack getfullargspec().
    getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    spec = getspec(method)
    names = spec[0]
    # No *args, no **kwargs, no defaults and no keyword-only arguments.
    plain = (tuple(spec[1:4]) == (None, None, None)
             and not getattr(spec, 'kwonlyargs', None))
    # Guard on ``names`` so a zero-argument callable raises ValueError
    # rather than IndexError.
    expected = 3 if names and names[0] == 'self' else 2
    if not plain or len(names) != expected:
        raise ValueError(
            "This method must contain exactly two arguments "
            "(or three if first is self)")
    self.__handlers.append(method)
def getargspec_init(method):
    """Argument spec lookup tolerant of typical ``__init__`` methods.

    Delegates to ``compat.inspect_getargspec`` and, when that raises
    ``TypeError``, substitutes a stand-in spec::

        object.__init__ -> (self)
        other unreflectable (usually C) -> (self, *args, **kwargs)

    """
    try:
        spec = compat.inspect_getargspec(method)
    except TypeError:
        if method is object.__init__:
            spec = (['self'], None, None, None)
        else:
            spec = (['self'], 'args', 'kwargs', None)
    return spec
task_assign.py 文件源码
项目:lustre_task_driven_monitoring_framework
作者: GSI-HPC
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def _create_body(task, task_class):
body = None
args_list = inspect.getargspec(task_class.__init__).args
len_args_list = len(args_list)
# Skip first parameter 'self' of the __init__ method which is a convention in Python for that method.
if len_args_list > 1:
body = str(getattr(task, args_list[1]))
if len_args_list > 2:
# Ordering of the arguments from the __init__ method is relevant!
# getattr throws an exception if an argument is not found in the task object.
for index in range(2, len(args_list)):
body += BaseMessage.field_separator + str(getattr(task, args_list[index]))
return body
def yieldroutes(func):
    """ Generate the route paths that match the signature (name, args) of
    ``func``. More than one route is produced when the function takes
    optional keyword arguments; a double underscore in the function name
    becomes a path separator. Examples::

        a()         -> '/a'
        b(x, y)     -> '/b/<x>/<y>'
        c(x, y=5)   -> '/c/<x>' and '/c/<x>/<y>'
        d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
    """
    route = '/' + func.__name__.replace('__', '/').lstrip('/')
    spec = getargspec(func)
    names = spec[0]
    defaults = spec[3] or []
    # Arguments without a default are part of every yielded route.
    required = len(names) - len(defaults)
    route += ''.join('/<%s>' % name for name in names[:required])
    yield route
    # Each optional argument extends the previous route by one segment.
    for optional in names[required:]:
        route += '/<%s>' % optional
        yield route
def analyse_action(func):
    """Analyse a function.

    Every positional argument of ``func`` must declare a default, which is
    either the default value itself or a ``(shortcut, default)`` tuple.

    :return: ``(func, description, arguments)``; ``description`` is the
        function's docstring (or ``'undocumented action'``) and
        ``arguments`` is a list of
        ``(name, shortcut, default, argument_type)`` tuples.
    :raises TypeError: if ``func`` takes variable-length or keyword-only
        arguments, has an argument without a default, or has an argument
        whose name starts with an underscore.
    """
    description = inspect.getdoc(func) or 'undocumented action'
    # inspect.getargspec() was removed in Python 3.11; fall back to it only
    # on interpreters that lack getfullargspec().
    getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    spec = getspec(func)
    args, varargs, kwargs, defaults = spec[:4]
    defaults = defaults or ()
    if varargs or kwargs:
        raise TypeError('variable length arguments for action not allowed.')
    # getargspec() refused keyword-only arguments; keep rejecting them
    # instead of silently ignoring them.
    if getattr(spec, 'kwonlyargs', None):
        raise TypeError('keyword-only arguments for action not allowed.')
    if len(args) != len(defaults):
        raise TypeError('not all arguments have proper definitions')

    arguments = []
    for arg, definition in zip(args, defaults):
        if arg.startswith('_'):
            raise TypeError('arguments may not start with an underscore')
        if isinstance(definition, tuple):
            shortcut, default = definition
        else:
            shortcut, default = None, definition
        argument_type = argument_types[type(default)]
        # Flags that default to True are exposed in negated form.
        if default is True:
            arg = 'no-' + arg
        arguments.append((arg.replace('_', '-'), shortcut,
                          default, argument_type))
    return func, description, arguments
def mocked_method(path, name, count):
    """
    Build a stand-in for ``NetworkDriver.<name>`` that validates its call.

    The returned callable checks the number of supplied arguments and the
    keyword names against the positional arguments of the parent method,
    raising ``TypeError`` on a mismatch, and otherwise returns
    ``mocked_data(path, name, count)``.
    """
    parent_method = getattr(NetworkDriver, name)
    # inspect.getargspec() was removed in Python 3.11; fall back to it only
    # on interpreters that lack getfullargspec().
    getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    accepted = getspec(parent_method).args
    # ``self`` is listed by the parent spec but is never passed explicitly
    # to the mock, so count it as already supplied.
    modifier = 1 if 'self' in accepted else 0

    def _mocked_method(*args, **kwargs):
        # Check the number of arguments; the reported total includes
        # keyword arguments, matching what the check itself counts.
        supplied = len(args) + len(kwargs) + modifier
        if supplied > len(accepted):
            raise TypeError(
                "{}: expected at most {} arguments, got {}".format(
                    name, len(accepted), supplied))

        # Check kwargs
        unexpected = [x for x in kwargs if x not in accepted]
        if unexpected:
            raise TypeError(
                "{} got an unexpected keyword argument '{}'".format(
                    name, unexpected[0]))
        return mocked_data(path, name, count)

    return _mocked_method
def yieldroutes(func):
    """ Yield every route path implied by the signature (name, args) of
    ``func``. Functions with optional keyword arguments produce several
    routes; a double underscore in the function name becomes a path
    separator. Examples::

        a()         -> '/a'
        b(x, y)     -> '/b/<x>/<y>'
        c(x, y=5)   -> '/c/<x>' and '/c/<x>/<y>'
        d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
    """
    base = '/' + func.__name__.replace('__', '/').lstrip('/')
    spec = getargspec(func)
    arg_names = spec[0]
    optional_count = len(spec[3] or [])
    required_count = len(arg_names) - optional_count

    # The shortest route carries all the arguments that have no default.
    segments = [base]
    for name in arg_names[:required_count]:
        segments.append('/<%s>' % name)
    current = ''.join(segments)
    yield current

    # Every optional argument adds one more, longer route.
    for name in arg_names[required_count:]:
        current += '/<%s>' % name
        yield current
def filter_chain(filters, token, func, *args, **kwargs):
    """
    Compose ``func`` with the filters at indices ``token`` down to 0.

    While ``token`` is not -1, the callable built so far is wrapped in
    ``filters[token]`` and the function recurses with ``token - 1``; at -1
    the resulting chain is called, which makes ``filters[0]`` the
    outermost filter. How a filter is fed depends on its argument names:

    * only ``self``: the wrapped callable must return ``None``, then the
      filter is called with no arguments;
    * second argument named ``__``: the filter gets the wrapped callable
      itself, un-called;
    * anything else: the filter gets the wrapped callable's result.

    ``*args`` / ``**kwargs`` are passed on to filters taking more than
    ``self``.
    """
    if token != -1:
        def _wrapped():
            plugin_filter = filters[token]
            names = getargspec(plugin_filter)[0]
            if len(names) == 1:
                # Only self arg
                if func() is None:
                    return plugin_filter()
                raise IncorrectPluginArg(u'Plugin filter method need a arg to receive parent method result.')
            wants_callable = names[1] == '__'
            return plugin_filter(func if wants_callable else func(),
                                 *args, **kwargs)

        return filter_chain(filters, token - 1, _wrapped, *args, **kwargs)
    return func()