def add(self, categorize):
"""Add given method to categorize messages. When a message is received,
each of the added methods (most recently added method first) is called
with the message. The method should return a category (any hashable
object) or None (in which case next recently added method is called with
the same message). If all the methods return None for a given message,
the message is queued with category=None, so that 'receive' method here
works just as Task.receive.
"""
if inspect.isfunction(categorize):
argspec = inspect.getargspec(categorize)
if len(argspec.args) != 1:
categorize = None
elif type(categorize) != partial_func:
categorize = None
if categorize:
self._categorize.insert(0, categorize)
else:
logger.warning('invalid categorize function ignored')
python类getargspec()的实例源码
def run(self):
"Run cli, processing arguments and executing subcommands."
arguments = self.argument_parser.parse_args()
argspec = inspect.getargspec(arguments.func)
vargs = []
for arg in argspec.args:
vargs.append(getattr(arguments, arg))
if argspec.varargs:
vargs.extend(getattr(arguments, argspec.varargs))
output = arguments.func(*vargs)
if getattr(arguments.func, '_cli_test_command', False):
self.exit_code = 0 if output else 1
output = ''
if getattr(arguments.func, '_cli_no_output', False):
output = ''
self.formatter.format_output(output, arguments.format)
if charmhelpers.core.unitdata._KV:
charmhelpers.core.unitdata._KV.flush()
def describe_arguments(func):
"""
Analyze a function's signature and return a data structure suitable for
passing in as arguments to an argparse parser's add_argument() method."""
argspec = inspect.getargspec(func)
# we should probably raise an exception somewhere if func includes **kwargs
if argspec.defaults:
positional_args = argspec.args[:-len(argspec.defaults)]
keyword_names = argspec.args[-len(argspec.defaults):]
for arg, default in zip(keyword_names, argspec.defaults):
yield ('--{}'.format(arg),), {'default': default}
else:
positional_args = argspec.args
for arg in positional_args:
yield (arg,), {}
if argspec.varargs:
yield (argspec.varargs,), {'nargs': '*'}
def parse_argspec(callable_):
"""
Takes a callable and returns a tuple with the list of Argument objects,
the name of *args, and the name of **kwargs.
If *args or **kwargs is not present, it will be None.
This returns a namedtuple called Argspec that has three fields named:
args, starargs, and kwargs.
"""
args, varargs, keywords, defaults = inspect.getargspec(callable_)
defaults = list(defaults or [])
if getattr(callable_, '__self__', None) is not None:
# This is a bound method, drop the self param.
args = args[1:]
first_default = len(args) - len(defaults)
return Argspec(
[Argument(arg, Argument.no_default
if n < first_default else defaults[n - first_default])
for n, arg in enumerate(args)],
varargs,
keywords,
)
def getargspec(func):
params = signature(func).parameters
args, varargs, keywords, defaults = [], None, None, []
for name, param in params.items():
if param.kind == param.VAR_POSITIONAL:
varargs = name
elif param.kind == param.VAR_KEYWORD:
keywords = name
else:
args.append(name)
if param.default is not param.empty:
defaults.append(param.default)
return (args, varargs, keywords, tuple(defaults) or None)
def yieldroutes(func):
""" Return a generator for routes that match the signature (name, args)
of the func parameter. This may yield more than one route if the function
takes optional keyword arguments. The output is best described by example::
a() -> '/a'
b(x, y) -> '/b/<x>/<y>'
c(x, y=5) -> '/c/<x>' and '/c/<x>/<y>'
d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
"""
path = '/' + func.__name__.replace('__', '/').lstrip('/')
spec = getargspec(func)
argc = len(spec[0]) - len(spec[3] or [])
path += ('/<%s>' * argc) % tuple(spec[0][:argc])
yield path
for arg in spec[0][argc:]:
path += '/<%s>' % arg
yield path
def getargspec_init(method):
"""inspect.getargspec with considerations for typical __init__ methods
Wraps inspect.getargspec with error handling for typical __init__ cases::
object.__init__ -> (self)
other unreflectable (usually C) -> (self, *args, **kwargs)
"""
try:
return compat.inspect_getargspec(method)
except TypeError:
if method is object.__init__:
return (['self'], None, None, None)
else:
return (['self'], 'args', 'kwargs', None)
def analyse_action(func):
"""Analyse a function."""
description = inspect.getdoc(func) or 'undocumented action'
arguments = []
args, varargs, kwargs, defaults = inspect.getargspec(func)
if varargs or kwargs:
raise TypeError('variable length arguments for action not allowed.')
if len(args) != len(defaults or ()):
raise TypeError('not all arguments have proper definitions')
for idx, (arg, definition) in enumerate(zip(args, defaults or ())):
if arg.startswith('_'):
raise TypeError('arguments may not start with an underscore')
if not isinstance(definition, tuple):
shortcut = None
default = definition
else:
shortcut, default = definition
argument_type = argument_types[type(default)]
if isinstance(default, bool) and default is True:
arg = 'no-' + arg
arguments.append((arg.replace('_', '-'), shortcut,
default, argument_type))
return func, description, arguments
def enableAttributes(genfunc):
"""Wrapper for generators to enable classlike attribute access.
The generator definition should specify 'self' as the first parameter.
Calls to a wrapped generator should ignore the self parameter.
"""
old = getargspec(genfunc)
old[0].pop(0)
new = getargspec(genfunc)
new[0][0] = 'wrapped'
specs = {'name': genfunc.func_name,
'oldargs': formatargspec(*old),
'newargs': formatargspec(*new)}
exec(_redefinition % specs, genfunc.func_globals)
#### A minimal, complete example
def signature(function):
"""Build a string with source code of the function declaration"""
desc = inspect.getargspec(function)
if desc[3]:
ldefault = len(desc[3])
default = desc[3]
sign = ','.join(desc[0][:-ldefault])
else:
ldefault = 0
default=[]
sign = ','.join(desc[0])
for n,v in zip(desc[0][-ldefault:],default):
sign += ','+n+"="+str(v)
if desc[1]:
sign +=',*'+desc[1]
if desc[2]:
sign +=',**'+desc[2]
if sign and sign[0]==',': sign = sign[1:]
return sign
def describe_builtin(obj):
""" Describe a builtin function """
wi('+Built-in Function: %s' % obj.__name__)
# Built-in functions cannot be inspected by
# inspect.getargspec. We have to try and parse
# the __doc__ attribute of the function.
docstr = obj.__doc__
args = ''
if docstr:
items = docstr.split('\n')
if items:
func_descr = items[0]
s = func_descr.replace(obj.__name__,'')
idx1 = s.find('(')
idx2 = s.find(')',idx1)
if idx1 != -1 and idx2 != -1 and (idx2>idx1+1):
args = s[idx1+1:idx2]
wi('\t-Method Arguments:', args)
if args=='':
wi('\t-Method Arguments: None')
print
def run(self):
"Run cli, processing arguments and executing subcommands."
arguments = self.argument_parser.parse_args()
argspec = inspect.getargspec(arguments.func)
vargs = []
for arg in argspec.args:
vargs.append(getattr(arguments, arg))
if argspec.varargs:
vargs.extend(getattr(arguments, argspec.varargs))
output = arguments.func(*vargs)
if getattr(arguments.func, '_cli_test_command', False):
self.exit_code = 0 if output else 1
output = ''
if getattr(arguments.func, '_cli_no_output', False):
output = ''
self.formatter.format_output(output, arguments.format)
if charmhelpers.core.unitdata._KV:
charmhelpers.core.unitdata._KV.flush()
def describe_arguments(func):
"""
Analyze a function's signature and return a data structure suitable for
passing in as arguments to an argparse parser's add_argument() method."""
argspec = inspect.getargspec(func)
# we should probably raise an exception somewhere if func includes **kwargs
if argspec.defaults:
positional_args = argspec.args[:-len(argspec.defaults)]
keyword_names = argspec.args[-len(argspec.defaults):]
for arg, default in zip(keyword_names, argspec.defaults):
yield ('--{}'.format(arg),), {'default': default}
else:
positional_args = argspec.args
for arg in positional_args:
yield (arg,), {}
if argspec.varargs:
yield (argspec.varargs,), {'nargs': '*'}
def run(self):
"Run cli, processing arguments and executing subcommands."
arguments = self.argument_parser.parse_args()
argspec = inspect.getargspec(arguments.func)
vargs = []
for arg in argspec.args:
vargs.append(getattr(arguments, arg))
if argspec.varargs:
vargs.extend(getattr(arguments, argspec.varargs))
output = arguments.func(*vargs)
if getattr(arguments.func, '_cli_test_command', False):
self.exit_code = 0 if output else 1
output = ''
if getattr(arguments.func, '_cli_no_output', False):
output = ''
self.formatter.format_output(output, arguments.format)
if charmhelpers.core.unitdata._KV:
charmhelpers.core.unitdata._KV.flush()
def describe_arguments(func):
"""
Analyze a function's signature and return a data structure suitable for
passing in as arguments to an argparse parser's add_argument() method."""
argspec = inspect.getargspec(func)
# we should probably raise an exception somewhere if func includes **kwargs
if argspec.defaults:
positional_args = argspec.args[:-len(argspec.defaults)]
keyword_names = argspec.args[-len(argspec.defaults):]
for arg, default in zip(keyword_names, argspec.defaults):
yield ('--{}'.format(arg),), {'default': default}
else:
positional_args = argspec.args
for arg in positional_args:
yield (arg,), {}
if argspec.varargs:
yield (argspec.varargs,), {'nargs': '*'}
def run(self):
"Run cli, processing arguments and executing subcommands."
arguments = self.argument_parser.parse_args()
argspec = inspect.getargspec(arguments.func)
vargs = []
for arg in argspec.args:
vargs.append(getattr(arguments, arg))
if argspec.varargs:
vargs.extend(getattr(arguments, argspec.varargs))
output = arguments.func(*vargs)
if getattr(arguments.func, '_cli_test_command', False):
self.exit_code = 0 if output else 1
output = ''
if getattr(arguments.func, '_cli_no_output', False):
output = ''
self.formatter.format_output(output, arguments.format)
if charmhelpers.core.unitdata._KV:
charmhelpers.core.unitdata._KV.flush()
def run(self):
"Run cli, processing arguments and executing subcommands."
arguments = self.argument_parser.parse_args()
argspec = inspect.getargspec(arguments.func)
vargs = []
for arg in argspec.args:
vargs.append(getattr(arguments, arg))
if argspec.varargs:
vargs.extend(getattr(arguments, argspec.varargs))
output = arguments.func(*vargs)
if getattr(arguments.func, '_cli_test_command', False):
self.exit_code = 0 if output else 1
output = ''
if getattr(arguments.func, '_cli_no_output', False):
output = ''
self.formatter.format_output(output, arguments.format)
if charmhelpers.core.unitdata._KV:
charmhelpers.core.unitdata._KV.flush()
def describe_arguments(func):
"""
Analyze a function's signature and return a data structure suitable for
passing in as arguments to an argparse parser's add_argument() method."""
argspec = inspect.getargspec(func)
# we should probably raise an exception somewhere if func includes **kwargs
if argspec.defaults:
positional_args = argspec.args[:-len(argspec.defaults)]
keyword_names = argspec.args[-len(argspec.defaults):]
for arg, default in zip(keyword_names, argspec.defaults):
yield ('--{}'.format(arg),), {'default': default}
else:
positional_args = argspec.args
for arg in positional_args:
yield (arg,), {}
if argspec.varargs:
yield (argspec.varargs,), {'nargs': '*'}
def describe_arguments(func):
"""
Analyze a function's signature and return a data structure suitable for
passing in as arguments to an argparse parser's add_argument() method."""
argspec = inspect.getargspec(func)
# we should probably raise an exception somewhere if func includes **kwargs
if argspec.defaults:
positional_args = argspec.args[:-len(argspec.defaults)]
keyword_names = argspec.args[-len(argspec.defaults):]
for arg, default in zip(keyword_names, argspec.defaults):
yield ('--{}'.format(arg),), {'default': default}
else:
positional_args = argspec.args
for arg in positional_args:
yield (arg,), {}
if argspec.varargs:
yield (argspec.varargs,), {'nargs': '*'}
def _conspect_param_defaults(func_to, func_from):
# ..
result = {}
defaults = inspect.getargspec(func_from).defaults
if defaults:
# ..
args = inspect.getargspec(func_from).args
# ..
defaults_enum = list(enumerate(defaults))
defaults_length = len(defaults_enum)
result = dict((args[-defaults_length + id],value)
for id, value in defaults_enum)
# ...
setattr(func_to, JSON_ARGS_DEFAULTS, result)
# ...
# ...
# ...
def analyse_action(func):
"""Analyse a function."""
description = inspect.getdoc(func) or 'undocumented action'
arguments = []
args, varargs, kwargs, defaults = inspect.getargspec(func)
if varargs or kwargs:
raise TypeError('variable length arguments for action not allowed.')
if len(args) != len(defaults or ()):
raise TypeError('not all arguments have proper definitions')
for idx, (arg, definition) in enumerate(zip(args, defaults or ())):
if arg.startswith('_'):
raise TypeError('arguments may not start with an underscore')
if not isinstance(definition, tuple):
shortcut = None
default = definition
else:
shortcut, default = definition
argument_type = argument_types[type(default)]
if isinstance(default, bool) and default is True:
arg = 'no-' + arg
arguments.append((arg.replace('_', '-'), shortcut,
default, argument_type))
return func, description, arguments
def run(self):
"Run cli, processing arguments and executing subcommands."
arguments = self.argument_parser.parse_args()
argspec = inspect.getargspec(arguments.func)
vargs = []
for arg in argspec.args:
vargs.append(getattr(arguments, arg))
if argspec.varargs:
vargs.extend(getattr(arguments, argspec.varargs))
output = arguments.func(*vargs)
if getattr(arguments.func, '_cli_test_command', False):
self.exit_code = 0 if output else 1
output = ''
if getattr(arguments.func, '_cli_no_output', False):
output = ''
self.formatter.format_output(output, arguments.format)
if charmhelpers.core.unitdata._KV:
charmhelpers.core.unitdata._KV.flush()
def describe_arguments(func):
"""
Analyze a function's signature and return a data structure suitable for
passing in as arguments to an argparse parser's add_argument() method."""
argspec = inspect.getargspec(func)
# we should probably raise an exception somewhere if func includes **kwargs
if argspec.defaults:
positional_args = argspec.args[:-len(argspec.defaults)]
keyword_names = argspec.args[-len(argspec.defaults):]
for arg, default in zip(keyword_names, argspec.defaults):
yield ('--{}'.format(arg),), {'default': default}
else:
positional_args = argspec.args
for arg in positional_args:
yield (arg,), {}
if argspec.varargs:
yield (argspec.varargs,), {'nargs': '*'}
def item_triggered(item_name, event_types=None, result_item_name=None):
event_types = event_types or [ITEM_CHANGE]
event_bus = scope.events
if hasattr(event_types, '__iter__'):
event_types = ",".join(event_types)
def decorator(fn):
nargs = len(inspect.getargspec(fn).args)
def callback(module, inputs):
fn_args = []
event = inputs.get('event')
if event and nargs == 1:
fn_args.append(event)
result_value = fn(*fn_args)
if result_item_name:
event_bus.postUpdate(result_item_name, unicode(result_value))
rule = _FunctionRule(callback, [ItemEventTrigger(item_name, event_types)], extended=True)
get_automation_manager().addRule(rule)
return fn
return decorator
def item_group_triggered(group_name, event_types=None, result_item_name=None):
event_types = event_types or [ITEM_CHANGE]
event_bus = scope.events
if hasattr(event_types, '__iter__'):
event_types = ",".join(event_types)
def decorator(fn):
nargs = len(inspect.getargspec(fn).args)
def callback(module, inputs):
fn_args = []
event = inputs.get('event')
if event and nargs == 1:
fn_args.append(event)
result_value = fn(*fn_args)
if result_item_name:
event_bus.postUpdate(result_item_name, unicode(result_value))
group_triggers = []
group = scope.itemRegistry.getItem(group_name)
for i in group.getAllMembers():
group_triggers.append(ItemEventTrigger(i.name, event_types))
rule = _FunctionRule(callback, group_triggers, extended=True)
get_automation_manager().addRule(rule)
return fn
return decorator
def getargspec(func):
if six.PY2:
return inspect.getargspec(func)
sig = inspect.signature(func)
args = [
p.name for p in sig.parameters.values()
if p.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD
]
varargs = [
p.name for p in sig.parameters.values()
if p.kind == inspect.Parameter.VAR_POSITIONAL
]
varargs = varargs[0] if varargs else None
varkw = [
p.name for p in sig.parameters.values()
if p.kind == inspect.Parameter.VAR_KEYWORD
]
varkw = varkw[0] if varkw else None
defaults = [
p.default for p in sig.parameters.values()
if p.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD and p.default is not p.empty
] or None
return args, varargs, varkw, defaults
def func_accepts_kwargs(func):
if six.PY2:
# Not all callables are inspectable with getargspec, so we'll
# try a couple different ways but in the end fall back on assuming
# it is -- we don't want to prevent registration of valid but weird
# callables.
try:
argspec = inspect.getargspec(func)
except TypeError:
try:
argspec = inspect.getargspec(func.__call__)
except (TypeError, AttributeError):
argspec = None
return not argspec or argspec[2] is not None
return any(
p for p in inspect.signature(func).parameters.values()
if p.kind == p.VAR_KEYWORD
)
def analyse_action(func):
"""Analyse a function."""
description = inspect.getdoc(func) or 'undocumented action'
arguments = []
args, varargs, kwargs, defaults = inspect.getargspec(func)
if varargs or kwargs:
raise TypeError('variable length arguments for action not allowed.')
if len(args) != len(defaults or ()):
raise TypeError('not all arguments have proper definitions')
for idx, (arg, definition) in enumerate(zip(args, defaults or ())):
if arg.startswith('_'):
raise TypeError('arguments may not start with an underscore')
if not isinstance(definition, tuple):
shortcut = None
default = definition
else:
shortcut, default = definition
argument_type = argument_types[type(default)]
if isinstance(default, bool) and default is True:
arg = 'no-' + arg
arguments.append((arg.replace('_', '-'), shortcut,
default, argument_type))
return func, description, arguments
def mergeFunctionMetadata(f, g):
"""
Overwrite C{g}'s docstring and name with values from C{f}. Update
C{g}'s instance dictionary with C{f}'s.
"""
try:
g.__doc__ = f.__doc__
except (TypeError, AttributeError):
pass
try:
g.__dict__.update(f.__dict__)
except (TypeError, AttributeError):
pass
try:
g.__name__ = f.__name__
except TypeError:
try:
g = new.function(
g.func_code, g.func_globals,
f.__name__, inspect.getargspec(g)[-1],
g.func_closure)
except TypeError:
pass
return g
def commit(self, request, node, data):
"""
It has been determined that the input for the entire form is completely
valid; it is now safe for all handlers to commit changes to the model.
"""
if self._commit is None:
data = str(data)
if data != self.view.getData():
self.model.setData(data)
self.model.notify({'request': request, self.submodel: data})
else:
func = self._commit
if hasattr(func, 'im_func'):
func = func.im_func
args, varargs, varkw, defaults = inspect.getargspec(func)
if args[1] == 'request':
self._commit(request, data)
else:
self._commit(data)