def inject_function(f):
full_arg_spec = inspect.getfullargspec(f)
args = full_arg_spec.args
annotations = full_arg_spec.annotations
def _inner(self):
container = Container()
objects = {}
for arg in args:
if arg in ('self',):
continue
try:
obj_type = annotations[arg]
except KeyError:
obj_type = arg
obj = container.get_object(obj_type)
objects[arg] = obj
setattr(self, arg, obj)
return f(self, **objects)
return _inner
python类getfullargspec()的实例源码
def typecheck(func):
if not hasattr(func,'__annotations__'): return method
import inspect
argspec = inspect.getfullargspec(func)
def check( t, T ):
if type(T) == type: return isinstance(t,T) #types
else: return T(t) #predicates
def wrapper(*args):
if len(argspec.args) != len(args):
raise TypeError( "%s() takes exactly %s positional argument (%s given)"
%(func.__name__,len(argspec.args),len(args)) )
for argname,t in zip(argspec.args, args):
if argname in func.__annotations__:
T = func.__annotations__[argname]
if not check( t, T ):
raise TypeError( "%s( %s:%s ) but received %s=%s"
% (func.__name__, argname,T, argname,repr(t)) )
r = func(*args)
if 'return' in func.__annotations__:
T = func.__annotations__['return']
if not check( r, T ):
raise TypeError( "%s() -> %s but returned %s"%(func.__name__,T,repr(r)) )
return r
return wrapper
def quick_init(self, locals_):
if getattr(self, "_serializable_initialized", False):
return
if sys.version_info >= (3, 0):
spec = inspect.getfullargspec(self.__init__)
# Exclude the first "self" parameter
if spec.varkw:
kwargs = locals_[spec.varkw]
else:
kwargs = dict()
else:
spec = inspect.getargspec(self.__init__)
if spec.keywords:
kwargs = locals_[spec.keywords]
else:
kwargs = dict()
if spec.varargs:
varargs = locals_[spec.varargs]
else:
varargs = tuple()
in_order_args = [locals_[arg] for arg in spec.args][1:]
self.__args = tuple(in_order_args) + varargs
self.__kwargs = kwargs
setattr(self, "_serializable_initialized", True)
def curry(func):
"""
Return the curried version of a function.
"""
spec = inspect.getfullargspec(func)
if spec.varargs or spec.varkw or spec.kwonlyargs:
raise TypeError('cannot curry a variadic function')
def incomplete_factory(arity, used_args):
return lambda *args: (
func(*(used_args + args))
if len(used_args) + len(args) >= arity
else incomplete_factory(arity, used_args + args)
)
return incomplete_factory(len(spec.args), ())
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if sys.version_info < (3, 0):
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
else:
if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256*six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if sys.version_info < (3, 0):
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
else:
if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256*six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def __init__(self, func, role='func', doc=None, config={}):
self._f = func
self._role = role # e.g. "func" or "meth"
if doc is None:
if func is None:
raise ValueError("No function or docstring given")
doc = inspect.getdoc(func) or ''
NumpyDocString.__init__(self, doc)
if not self['Signature'] and func is not None:
func, func_name = self.get_func()
try:
# try to read signature
if sys.version_info[0] >= 3:
argspec = inspect.getfullargspec(func)
else:
argspec = inspect.getargspec(func)
argspec = inspect.formatargspec(*argspec)
argspec = argspec.replace('*', '\*')
signature = '%s%s' % (func_name, argspec)
except TypeError as e:
signature = '%s()' % func_name
self['Signature'] = signature
def quick_init(self, locals_):
if getattr(self, "_serializable_initialized", False):
return
if sys.version_info >= (3, 0):
spec = inspect.getfullargspec(self.__init__)
# Exclude the first "self" parameter
if spec.varkw:
kwargs = locals_[spec.varkw]
else:
kwargs = dict()
else:
spec = inspect.getargspec(self.__init__)
if spec.keywords:
kwargs = locals_[spec.keywords]
else:
kwargs = dict()
if spec.varargs:
varargs = locals_[spec.varargs]
else:
varargs = tuple()
in_order_args = [locals_[arg] for arg in spec.args][1:]
self.__args = tuple(in_order_args) + varargs
self.__kwargs = kwargs
setattr(self, "_serializable_initialized", True)
def clone(cls, obj, **kwargs):
assert isinstance(obj, Serializable)
d = obj.__getstate__()
# Split the entries in kwargs between positional and keyword arguments
# and update d['__args'] and d['__kwargs'], respectively.
if sys.version_info >= (3, 0):
spec = inspect.getfullargspec(obj.__init__)
else:
spec = inspect.getargspec(obj.__init__)
in_order_args = spec.args[1:]
d["__args"] = list(d["__args"])
for kw, val in kwargs.items():
if kw in in_order_args:
d["__args"][in_order_args.index(kw)] = val
else:
d["__kwargs"][kw] = val
out = type(obj).__new__(type(obj))
out.__setstate__(d)
return out
def call_converter(converter:callable, predecessors:dict) -> dict:
"""Return the result obtained by calling the converter with the
given predecessors {type: value} as input.
"""
argspec = inspect.getfullargspec(converter)
all_args = tuple(argspec.args + argspec.kwonlyargs)
nb_args = len(all_args)
# if number of (positional) args equals the number of predecessor
if (nb_args == len(predecessors) == 1) or (len(argspec.args) == len(predecessors) == 1):
params = {all_args[0]: next(iter(predecessors.values()))}
else: # let's use args name
params = {arg: predecessors[arg] for arg in all_args if arg in predecessors}
if len(params) < len(predecessors): # let's use annotations
# map predecessor type -> arg, inferred from annotations, minus already matching args
matching_annotations = {argspec.annotations[arg]: arg for arg in all_args
if argspec.annotations.get(arg) in predecessors
and arg not in params}
params.update({
arg: predecessors[pred]
for pred, arg in matching_annotations.items()
})
return converter(**params)
def __init__(self, func, role='func', doc=None, config={}):
self._f = func
self._role = role # e.g. "func" or "meth"
if doc is None:
if func is None:
raise ValueError("No function or docstring given")
doc = inspect.getdoc(func) or ''
NumpyDocString.__init__(self, doc)
if not self['Signature'] and func is not None:
func, func_name = self.get_func()
try:
# try to read signature
if sys.version_info[0] >= 3:
argspec = inspect.getfullargspec(func)
else:
argspec = inspect.getargspec(func)
argspec = inspect.formatargspec(*argspec)
argspec = argspec.replace('*','\*')
signature = '%s%s' % (func_name, argspec)
except TypeError as e:
signature = '%s()' % func_name
self['Signature'] = signature
def add_event(self, loop, instance, fn):
"""
Wraps the given function in a corutine which calls it every N seconds
:param loop: The event loop (cls._loop)
:param instance: The instantiated class
:param fn: The function to be called
:return: None
"""
# Verify function signature
argspec = inspect.getfullargspec(fn)
n_args = len(argspec.args) - 1 if 'self' in argspec.args else len(argspec.args)
n_required_args = n_args - (len(argspec.defaults) if argspec.defaults else 0)
assert n_required_args == 0, 'Functions decorated with @Periodic cannot have any required parameters.'
@asyncio.coroutine
def periodic_fn():
# If coroutine yield from else call normally
is_coroutine = asyncio.iscoroutinefunction(fn)
while True:
last_exec = time.time()
(yield from fn()) if is_coroutine else fn()
yield from asyncio.sleep(max(0, self.time + last_exec - time.time()))
super().add_event(loop, instance, periodic_fn())
def __call__(self, fn, *args, **kwargs):
try:
fn._decorators.append(self)
except AttributeError:
fn._decorators = [self]
# Verify function signature
argspec = inspect.getfullargspec(fn)
n_args = len(argspec.args) - 1 if 'self' in argspec.args else len(argspec.args)
assert n_args >= 1, 'Functions decorated with @Subscribe must have a parameter for the message.'
n_required_args = n_args - (len(argspec.defaults) if argspec.defaults else 0)
assert n_required_args <= 1, 'Functions decorated with @Subscribe can only have one required parameter.'
# Add typing information if not already defined
first_arg = argspec.args[1] if 'self' in argspec.args else argspec.args[0]
if first_arg not in argspec.annotations.keys():
fn.__annotations__[first_arg] = self.subs[-1]
return fn
def _get_arg_lengths(func):
"""Returns a two-tuple containing the number of positional arguments
as the first item and the number of variable positional arguments as
the second item.
"""
try:
funcsig = inspect.signature(func)
params_dict = funcsig.parameters
parameters = params_dict.values()
args_type = (inspect._POSITIONAL_OR_KEYWORD, inspect._POSITIONAL_ONLY)
args = [x for x in parameters if x.kind in args_type]
vararg = [x for x in parameters if x.kind == inspect._VAR_POSITIONAL]
vararg = vararg.pop() if vararg else None
except AttributeError:
try:
try: # For Python 3.2 and earlier.
args, vararg = inspect.getfullargspec(func)[:2]
except AttributeError: # For Python 2.7 and earlier.
args, vararg = inspect.getargspec(func)[:2]
except TypeError: # In 3.2 and earlier, raises TypeError
raise ValueError # but 3.3 and later raise a ValueError.
return (len(args), (1 if vararg else 0))
def assertFullArgSpecEquals(self, routine, args_e, varargs_e=None,
varkw_e=None, defaults_e=None,
kwonlyargs_e=[], kwonlydefaults_e=None,
ann_e={}, formatted=None):
args, varargs, varkw, defaults, kwonlyargs, kwonlydefaults, ann = \
inspect.getfullargspec(routine)
self.assertEqual(args, args_e)
self.assertEqual(varargs, varargs_e)
self.assertEqual(varkw, varkw_e)
self.assertEqual(defaults, defaults_e)
self.assertEqual(kwonlyargs, kwonlyargs_e)
self.assertEqual(kwonlydefaults, kwonlydefaults_e)
self.assertEqual(ann, ann_e)
if formatted is not None:
self.assertEqual(inspect.formatargspec(args, varargs, varkw, defaults,
kwonlyargs, kwonlydefaults, ann),
formatted)
def __init__(
self,
fun: Callable[..., Any],
name: str=None,
nargs: Union[str, int]=None,
min: int=None,
max: int=None,
**kw: str
) -> None:
self._fun = fun
self._argspec = inspect.getfullargspec(fun)
self._params = List.wrap(self._argspec.args)
self._param_count = self._params.length - (1 if self._params.head.contains('self') else 0)
self._name = Maybe(name)
self._nargs = Maybe(to_int(nargs))
self._min = Maybe(min)
self._max = Maybe(max)
self._kw = Map(kw)
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if sys.version_info < (3, 0):
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
else:
if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256*six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def check_session_aspect(func, *args, **kwargs):
"""
The authentication aspect code, provides an aspect for the aop_check_session decorator
"""
# Get a list of the names of the non-keyword arguments
_argument_specifications = getfullargspec(func)
try:
# Try and find _session_id
arg_idx = _argument_specifications.args.index("_session_id")
except:
raise Exception("Authentication aspect for \"" + func.__name__ + "\": No _session_id parameter")
# Check if the session is valid.
_user = check_session(args[arg_idx])
# Call the function and set the _user parameter, if existing.
return alter_function_parameter_and_call(func, args, kwargs, '_user', _user)
def alter_function_parameter_and_call(function_object, args, kwargs, name, value, err_if_not_set=None):
"""Changes or sets the value of an argument, if present, and calls the function.
:param function_object: An instance of the called function
:param args: A list of arguments
:param kwargs: A dict of keyword arguments
:param name: Name of the parameter that should be set
:param value: Value to set that parameter to
:param err_if_not_set: If is set, raise as error if the argument isn't present.
:return: The return value of the called function
"""
argspec = getfullargspec(function_object)
if name in argspec.args:
arg_idx = argspec.args.index(name)
largs = list(args)
largs.pop(arg_idx)
largs.insert(arg_idx, value)
new_args = tuple(largs)
return function_object(*new_args, **kwargs)
elif err_if_not_set is not None:
raise Exception(err_if_not_set)
return function_object(args, kwargs)
def __init__(self, func, role='func', doc=None, config={}):
self._f = func
self._role = role # e.g. "func" or "meth"
if doc is None:
if func is None:
raise ValueError("No function or docstring given")
doc = inspect.getdoc(func) or ''
NumpyDocString.__init__(self, doc)
if not self['Signature'] and func is not None:
func, func_name = self.get_func()
try:
# try to read signature
if sys.version_info[0] >= 3:
argspec = inspect.getfullargspec(func)
else:
argspec = inspect.getargspec(func)
argspec = inspect.formatargspec(*argspec)
argspec = argspec.replace('*', '\*')
signature = '%s%s' % (func_name, argspec)
except TypeError as e:
signature = '%s()' % func_name
self['Signature'] = signature
def __get_result(method, params, original_msg):
func_args = getattr(getfullargspec(method), keywords_args)
if func_args and "kwargs" in func_args:
if isinstance(params, list):
result = method(*params, original_message=original_msg)
else:
result = method(original_message=original_msg, **params)
else:
if isinstance(params, list):
result = method(*params)
else:
result = method(**params)
return result
def assertFullArgSpecEquals(self, routine, args_e, varargs_e=None,
varkw_e=None, defaults_e=None,
kwonlyargs_e=[], kwonlydefaults_e=None,
ann_e={}, formatted=None):
args, varargs, varkw, defaults, kwonlyargs, kwonlydefaults, ann = \
inspect.getfullargspec(routine)
self.assertEqual(args, args_e)
self.assertEqual(varargs, varargs_e)
self.assertEqual(varkw, varkw_e)
self.assertEqual(defaults, defaults_e)
self.assertEqual(kwonlyargs, kwonlyargs_e)
self.assertEqual(kwonlydefaults, kwonlydefaults_e)
self.assertEqual(ann, ann_e)
if formatted is not None:
self.assertEqual(inspect.formatargspec(args, varargs, varkw, defaults,
kwonlyargs, kwonlydefaults, ann),
formatted)
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if sys.version_info < (3, 0):
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
else:
if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def getfullargspec(func):
"""Compatibility function to provide inspect.getfullargspec in Python 2
This should be rewritten using a backport of Python 3 signature
once we drop support for Python 2.6. We went for a simpler
approach at the time of writing because signature uses OrderedDict
which is not available in Python 2.6.
"""
try:
return inspect.getfullargspec(func)
except AttributeError:
arg_spec = inspect.getargspec(func)
import collections
tuple_fields = ('args varargs varkw defaults kwonlyargs '
'kwonlydefaults annotations')
tuple_type = collections.namedtuple('FullArgSpec', tuple_fields)
return tuple_type(args=arg_spec.args,
varargs=arg_spec.varargs,
varkw=arg_spec.keywords,
defaults=arg_spec.defaults,
kwonlyargs=[],
kwonlydefaults=None,
annotations={})
def signatures_match(cls, orig, stubbed, ignore_self=False):
if six.PY2:
orig_arguments = inspect.getargspec(orig)
stub_arguments = inspect.getargspec(stubbed)
else:
orig_arguments = inspect.getfullargspec(orig)
stub_arguments = inspect.getfullargspec(stubbed)
if ignore_self:
if 'self' in orig_arguments.args: orig_arguments.args.remove('self')
if 'self' in stub_arguments.args: stub_arguments.args.remove('self')
assert orig_arguments == stub_arguments, \
'signature mismatch: %s%s does not match %s%s' % \
(stubbed, inspect.formatargspec(*stub_arguments),
orig, inspect.formatargspec(*orig_arguments))
return False
#------------------------------------------------[ Impostor ]
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if sys.version_info < (3, 0):
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
else:
if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def assertFullArgSpecEquals(self, routine, args_e, varargs_e=None,
varkw_e=None, defaults_e=None,
kwonlyargs_e=[], kwonlydefaults_e=None,
ann_e={}, formatted=None):
args, varargs, varkw, defaults, kwonlyargs, kwonlydefaults, ann = \
inspect.getfullargspec(routine)
self.assertEqual(args, args_e)
self.assertEqual(varargs, varargs_e)
self.assertEqual(varkw, varkw_e)
self.assertEqual(defaults, defaults_e)
self.assertEqual(kwonlyargs, kwonlyargs_e)
self.assertEqual(kwonlydefaults, kwonlydefaults_e)
self.assertEqual(ann, ann_e)
if formatted is not None:
self.assertEqual(inspect.formatargspec(args, varargs, varkw, defaults,
kwonlyargs, kwonlydefaults, ann),
formatted)
def schedule_handler(cls: Any, obj: Any, context: Dict, func: Any, interval: Optional[Union[str, int]]=None, timestamp: Optional[str]=None, timezone: Optional[str]=None) -> Any:
async def handler() -> None:
values = inspect.getfullargspec(func)
kwargs = {k: values.defaults[i] for i, k in enumerate(values.args[len(values.args) - len(values.defaults):])} if values.defaults else {}
routine = func(*(obj,), **kwargs)
try:
if isinstance(routine, Awaitable):
await routine
except Exception as e:
pass
context['_schedule_scheduled_functions'] = context.get('_schedule_scheduled_functions', [])
context['_schedule_scheduled_functions'].append((interval, timestamp, timezone, func, handler))
start_func = cls.start_scheduler(cls, obj, context)
return (await start_func) if start_func else None
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if sys.version_info < (3, 0):
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
else:
if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256*six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def command(arguments: List[ParserElement]):
"""
command is a decorator which examines the function name and its arguments
and registers `<function_name> arguments...` as part of the grammar.
"""
def decorator(fn):
fn_args = inspect.getfullargspec(fn).args
_name = '.' + fn.__name__
_arguments = Literal(_name)
for i, arg in enumerate(arguments):
_arguments += arg.setResultsName(fn_args[i])
global grammar
grammar |= _arguments
def wrapper(parsed: Dict, event, event_user, event_channel, slack_client):
return fn(**parsed, event=event, event_user=event_user, event_channel=event_channel, slack_client=slack_client)
mapping[_name] = wrapper
return wrapper
return decorator