def __init__(self, test):
    """Store ``test`` wrapped so that calling it always yields a boolean.

    Args:
        test: Callable declaring exactly one positional-only or
            positional-or-keyword parameter.

    Raises:
        ValueError: If ``test`` is not callable, or if the number of its
            plain positional parameters is not exactly one.
    """
    if not callable(test):
        raise ValueError("Guard test has to be callable")

    # Index 1 of every entry returned by _getparams is compared against
    # the parameter kinds; only the two plain positional kinds count.
    described = _getparams(test)
    plain_kinds = (p_kind.POSITIONAL_ONLY, p_kind.POSITIONAL_OR_KEYWORD)
    plain_count = sum(1 for entry in described if entry[1] in plain_kinds)
    if plain_count != 1:
        raise ValueError("Guard test has to have only one positional (not varying) argument")

    def checked(inp):
        # The result is coerced to bool right away. A TypeError (for
        # instance from comparing unorderable types) is not propagated:
        # if the comparison could not be made, the guard answers "no".
        try:
            return bool(test(inp))
        except TypeError:
            return False

    self.test = checked
python类signature()的实例源码
function_pattern_matching.py 文件源码
项目:function-pattern-matching
作者: rasguanabana
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
function_pattern_matching.py 文件源码
项目:function-pattern-matching
作者: rasguanabana
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def __init__(self, test):
    """Validate ``test`` and remember it together with its argument names.

    Args:
        test: Callable whose parameters are all named and non-varying.

    Raises:
        ValueError: If ``test`` is not callable, or if any of its
            parameters is positional-only, ``*args`` or ``**kwargs``.
    """
    if not callable(test):
        raise ValueError("Relguard test has to be callable")

    # Simplified signature: index 0 of each entry is used as the name,
    # index 1 as the parameter kind.
    described = _getparams(test)
    kinds_present = {entry[1] for entry in described}

    # Varying (and positional-only) arguments make no sense in relguards.
    banned_kinds = (
        p_kind.POSITIONAL_ONLY,
        p_kind.VAR_POSITIONAL,
        p_kind.VAR_KEYWORD,
    )
    if any(kind in kinds_present for kind in banned_kinds):
        raise ValueError("Relguard test must take only named not varying arguments")

    self.test = test
    self.__argnames__ = {entry[0] for entry in described}
def getargspec(func):
    """Summarise the signature of ``func`` as a 4-tuple.

    Returns:
        ``(args, varargs, keywords, defaults)`` where ``args`` is the list
        of every parameter name that is neither ``*args`` nor ``**kwargs``,
        ``varargs`` / ``keywords`` are the names of those two parameters
        (or ``None``), and ``defaults`` is a tuple of the default values
        found on the parameters in ``args`` (or ``None`` when there are
        none).
    """
    names = []
    star_name = None
    double_star_name = None
    default_values = []

    for param in signature(func).parameters.values():
        if param.kind == param.VAR_POSITIONAL:
            star_name = param.name
            continue
        if param.kind == param.VAR_KEYWORD:
            double_star_name = param.name
            continue
        names.append(param.name)
        if param.default is not param.empty:
            default_values.append(param.default)

    packed_defaults = tuple(default_values) if default_values else None
    return (names, star_name, double_star_name, packed_defaults)
def get_cookie(self, key, default=None, secret=None, digestmod=hashlib.sha256):
    """ Return the content of the cookie named `key`.

        Without a `secret` the raw cookie value is returned (or `default`
        when it is missing or empty). With a `secret` the cookie is treated
        as a signed cookie of the form ``!<signature>?<message>``: the
        HMAC of the message is compared to the signature and, on a match,
        the message is decoded and its second element returned if its first
        element equals `key`. In every other case `default` is returned. """
    raw = self.cookies.get(key)
    if not secret:
        return raw or default

    # Only values shaped like '!sig?msg' can be signed cookies.
    if not (raw and raw.startswith('!') and '?' in raw):
        return default

    sig, msg = map(tob, raw[1:].split('?', 1))
    digest = hmac.new(tob(secret), msg, digestmod=digestmod).digest()
    if not _lscmp(sig, base64.b64encode(digest)):
        return default

    # NOTE(review): pickle.loads runs on cookie data; it is only reached
    # after the signature check above, so safety rests on `secret` staying
    # private.
    payload = pickle.loads(base64.b64decode(msg))
    if payload and payload[0] == key:
        return payload[1]
    return default
def yieldroutes(func):
    """ Generate route paths derived from the name and arguments of `func`.

        Double underscores in the function name become path separators.
        The first route contains one ``<name>`` segment per argument that
        has no default; one further route is yielded for each argument
        with a default, each extending the previous path::

            a()         -> '/a'
            b(x, y)     -> '/b/<x>/<y>'
            c(x, y=5)   -> '/c/<x>' and '/c/<x>/<y>'
            d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
    """
    route = '/' + func.__name__.replace('__', '/').lstrip('/')
    spec = getargspec(func)
    names = spec[0]
    # Arguments without a default come first; they are always in the path.
    required = len(names) - len(spec[3] or [])
    route += ''.join('/<%s>' % name for name in names[:required])
    yield route
    for optional in names[required:]:
        route += '/<%s>' % optional
        yield route
def __init__(self, strat):
    """Start ``strat.generate`` with the keyword arguments it can accept.

    Keywords stored in ``strat._kws`` are forwarded when ``generate``
    declares a parameter of the same name. On reaching a ``**kwargs``
    parameter, every stored keyword is forwarded and the scan stops. The
    object returned by ``generate`` is kept in ``self._generator``.
    """
    logger_name = 'strategy.StrategyIterator({})'.format(str(strat))
    self.log = logging.getLogger(logger_name)
    self.strategy = strat

    accepted = inspect.signature(strat.generate).parameters
    forwarded = {}
    self.log.debug('init')
    for name, param in accepted.items():
        if name in strat._kws:
            self.log.debug('add keyword {kw}'.format(kw=name))
            forwarded[name] = strat._kws[name]
            continue
        if param.kind == inspect.Parameter.VAR_KEYWORD:
            self.log.debug('merge keywords on VAR_KEYWORD {kw}'.format(kw=name))
            forwarded.update(strat._kws)
            break

    self._generator = strat.generate(strat._depth, *strat._args, **forwarded)
def test_parameters_passed_custom_kwargs(self):
    """Explicit keyword arguments given to ``lp.figure`` reach ``save_figure``.

    ``Figure.set_size_inches`` and ``latexipy._latexipy.save_figure`` are
    patched for the duration of the check. The context manager is entered
    with custom values, and the patched ``save_figure`` must have been
    called exactly once with those same values plus
    ``from_context_manager=True``.
    """
    # Every argument is passed explicitly here, so the signature of
    # lp.figure does not need to be inspected for defaults.
    with patch('matplotlib.figure.Figure.set_size_inches'), \
            patch('latexipy._latexipy.save_figure') as mock_save_figure:
        with lp.figure('filename', directory='directory', exts='exts',
                       mkdir='mkdir'):
            pass

        mock_save_figure.assert_called_once_with(
            filename='filename',
            directory='directory',
            exts='exts',
            mkdir='mkdir',
            from_context_manager=True,
        )
def get_connector_param(self, avail_name):
    """
    Look up the parameters for initializing a connector.

    Args:
        avail_name: Key of the connector in ``self._avail_connectors``.

    Returns:
        dict mapping each parameter name in the connector's signature to
        its default value, or to ``None`` when the parameter has no default.

    Raises:
        ValueError: If ``avail_name`` is not a known connector.
    """
    try:
        connector = self._avail_connectors[avail_name]
    except KeyError:
        err = "connector {} not found\n".format(avail_name)
        raise ValueError(err)

    meta = {}
    for name, param in inspect.signature(connector).parameters.items():
        # The default lives on the individual Parameter, not on the
        # mapping. An identity check against the public sentinel avoids
        # calling __eq__ on arbitrary default objects.
        if param.default is inspect.Parameter.empty:
            meta[name] = None
        else:
            meta[name] = param.default
    return meta
def auto_assign(func):
    """Decorator that stores a method's call arguments as attributes on ``self``.

    Before ``func`` runs, every parameter after the first one (``self``)
    is set on the instance under its own name. The value comes from the
    positional arguments, then from the keyword arguments, and otherwise
    from the parameter's declared default.

    Args:
        func: Method to wrap; it must not declare ``*args`` or ``**kwargs``.

    Returns:
        The wrapping function, which returns whatever ``func`` returns.

    Raises:
        RuntimeError: If ``func`` declares ``*args`` or ``**kwargs``.
    """
    sig = signature(func)
    varying = (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD)
    if any(param.kind in varying for param in sig.parameters.values()):
        raise RuntimeError('Unable to auto assign if *args or **kwargs in signature.')

    # Skip the first parameter ('self'); the rest line up with *args.
    assignable = list(sig.parameters.values())[1:]

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        for position, param in enumerate(assignable):
            # Search value in args, kwargs or defaults. A required
            # argument that was not supplied gets Parameter.empty here;
            # the call to func below then raises the usual TypeError.
            if position < len(args):
                value = args[position]
            elif param.name in kwargs:
                value = kwargs[param.name]
            else:
                value = param.default
            setattr(self, param.name, value)
        # Hand back the wrapped function's result instead of dropping it.
        return func(self, *args, **kwargs)

    return wrapper
def verify_interface(iface, klass):
    """Check that ``klass`` provides every abstract member of ``iface``.

    For each name in ``iface.__abstractmethods__`` the attribute must exist
    on ``klass`` and, unless the interface member is an
    ``abc.abstractproperty``, its signature must equal the interface's.

    Raises:
        InterfaceNotImplemented: If a member is missing or its signature
            differs from the one declared on the interface.
    """
    for name in iface.__abstractmethods__:
        if not hasattr(klass, name):
            raise InterfaceNotImplemented(
                "{0} is missing a {1!r} method".format(klass, name)
            )

        declared = getattr(iface, name)
        if isinstance(declared, abc.abstractproperty):
            # Can't properly verify these yet.
            continue

        expected = signature(declared)
        found = signature(getattr(klass, name))
        if expected != found:
            message = (
                "{0}.{1}'s signature differs from the expected. Expected: "
                "{2!r}. Received: {3!r}"
            )
            raise InterfaceNotImplemented(
                message.format(klass, name, expected, found)
            )
def verify_interface(iface, klass):
    """Check that ``klass`` provides every abstract member of ``iface``.

    For each name in ``iface.__abstractmethods__`` the attribute must exist
    on ``klass`` and, unless the interface member is an
    ``abc.abstractproperty``, its signature must equal the interface's.

    Raises:
        InterfaceNotImplemented: If a member is missing or its signature
            differs from the one declared on the interface.
    """
    for name in iface.__abstractmethods__:
        if not hasattr(klass, name):
            raise InterfaceNotImplemented(
                "{0} is missing a {1!r} method".format(klass, name)
            )

        declared = getattr(iface, name)
        if isinstance(declared, abc.abstractproperty):
            # Can't properly verify these yet.
            continue

        expected = signature(declared)
        found = signature(getattr(klass, name))
        if expected != found:
            message = (
                "{0}.{1}'s signature differs from the expected. Expected: "
                "{2!r}. Received: {3!r}"
            )
            raise InterfaceNotImplemented(
                message.format(klass, name, expected, found)
            )
def getargspec(func):
    """Return ``(args, varargs, varkw, defaults)`` describing ``func``.

    When ``six.PY2`` is true the work is delegated to
    ``inspect.getargspec``. Otherwise the tuple is built from
    ``inspect.signature``: ``args`` lists the positional-or-keyword
    parameter names, ``varargs`` / ``varkw`` name the ``*`` / ``**``
    parameters (or are ``None``), and ``defaults`` is the list of default
    values of the positional-or-keyword parameters (or ``None`` if empty).
    """
    if six.PY2:
        return inspect.getargspec(func)

    kinds = inspect.Parameter
    names = []
    default_values = []
    star_name = None
    double_star_name = None

    for param in inspect.signature(func).parameters.values():
        if param.kind == kinds.POSITIONAL_OR_KEYWORD:
            names.append(param.name)
            if param.default is not param.empty:
                default_values.append(param.default)
        elif param.kind == kinds.VAR_POSITIONAL:
            if star_name is None:
                star_name = param.name
        elif param.kind == kinds.VAR_KEYWORD:
            if double_star_name is None:
                double_star_name = param.name

    return names, star_name, double_star_name, default_values or None
def func_accepts_kwargs(func):
    """Tell whether ``func`` declares a ``**kwargs`` parameter.

    When ``six.PY2`` is false the answer comes from ``inspect.signature``.
    Otherwise ``inspect.getargspec`` is tried on ``func`` and then on
    ``func.__call__``; if neither can be inspected the function is assumed
    to accept keyword arguments.
    """
    if not six.PY2:
        parameters = inspect.signature(func).parameters.values()
        return any(param.kind == param.VAR_KEYWORD for param in parameters)

    # Not all callables are inspectable with getargspec, so a couple of
    # different ways are tried; in the end the fallback is to assume the
    # callable does accept **kwargs, so that valid but weird callables
    # are not prevented from being registered.
    try:
        argspec = inspect.getargspec(func)
    except TypeError:
        try:
            argspec = inspect.getargspec(func.__call__)
        except (TypeError, AttributeError):
            argspec = None

    if not argspec:
        return True
    return argspec[2] is not None
def _call_matcher(self, _call):
    """
    Build a comparison key for a call, suitable for matching other calls.

    ``_call`` is either ``(args, kwargs)`` or ``(name, args, kwargs)``.
    When ``self._spec_signature`` is set, the arguments are bound to it
    and ``(name, bound_arguments)`` is returned (``name`` is ``''`` for the
    two-element form); if binding fails, the ``TypeError`` itself is
    returned with its traceback cleared. Without a signature ``_call`` is
    returned unchanged.
    """
    spec_sig = self._spec_signature
    if spec_sig is None:
        return _call

    if len(_call) == 2:
        call_name = ''
        call_args, call_kwargs = _call
    else:
        call_name, call_args, call_kwargs = _call

    try:
        bound = spec_sig.bind(*call_args, **call_kwargs)
    except TypeError as error:
        error.__traceback__ = None
        return error
    return call_name, bound
def get_kwarg_names(func):
    """
    Return the names of the parameters of func that have a default value.

    ``inspect.signature`` is used when it is available; if looking it up
    raises AttributeError, ``inspect.getargspec`` is used instead and the
    trailing arguments covered by its defaults are returned.
    """
    try:
        sig = inspect.signature(func)
    except AttributeError:
        # No inspect.signature: fall back to the argspec tuple, where the
        # defaults apply to the last len(defaults) positional arguments.
        spec = inspect.getargspec(func)
        positional, defaults = spec[0], spec[3]
        if not defaults:
            return []
        return positional[-len(defaults):]

    return [
        param.name
        for param in sig.parameters.values()
        if param.default is not param.empty
    ]
def _load_callback(callback_name):
    """
    Fetch a callback from ``callbacks`` by name and validate its shape.

    :param callback_name: Name of the callback function to load.
    :return: The callback function, which takes exactly one parameter
        named "entity".
    :raises IllegalArgumentError: If the callback has a different
        parameter list.
    :raises IllegalConfigurationError: If an AttributeError occurs while
        looking the callback up or inspecting it.
    """
    try:
        candidate = getattr(callbacks, callback_name)
        declared = signature(candidate).parameters
        # Anything but a single "entity" parameter is rejected.
        if len(declared) != 1 or "entity" not in declared:
            raise IllegalArgumentError("Invalid callback: " + str(callback_name))
        return candidate
    except AttributeError:
        raise IllegalConfigurationError("Parsing configuration file failed: Callback "
                                        + callback_name + " not found.")
def add_command(self, command, permission):
    """Return a decorator that registers a method under ``command``.

    The decorator stores ``(method, parameter_count, permission)`` in
    ``self[command]`` and returns a wrapper that calls ``method`` with the
    positional arguments it receives (the wrapper itself returns ``None``).

    Raises:
        CommandException: If ``command`` is already registered in ``self``.
    """
    if command in self:
        raise CommandException('Cannot re-assign command ({command_name})'.format(command_name=command))

    def decorator(method):
        # The argument count is stored with the entry; per the original
        # note it is used to verify the client/say command is valid.
        argument_count = len(signature(method).parameters)
        self[command] = (method, argument_count, permission)

        def new(*args):
            method(*args)

        return new

    return decorator
def register_handler(
    self,  # type: BaseSocket
    method  # type: Callable[..., Union[bool, None]]
):  # type: (...) -> None
    """Register a handler for incoming method.

    Args:
        method: A function with two given arguments. Its signature
                    should be of the form ``handler(msg, handler)``,
                    where msg is a :py:class:`py2p.base.Message`
                    object, and handler is a
                    :py:class:`py2p.base.BaseConnection` object. It
                    should return ``True`` if it performed an action,
                    to reduce the number of handlers checked.

    Raises:
        ValueError: If the method signature doesn't parse correctly
    """
    declared = inspect.signature(method).parameters
    # A parameter literally named 'self' raises the expected count to 3.
    expected_count = 3 if 'self' in declared else 2
    if len(declared) != expected_count:
        raise ValueError(
            "This method must contain exactly two arguments "
            "(or three if first is self)")
    self.__handlers.append(method)
def register_handler(
    self,  # type: BaseSocket
    method  # type: Callable[..., Union[bool, None]]
):  # type: (...) -> None
    """Register a handler for incoming method.

    Args:
        method: A function with two given arguments. Its signature
                    should be of the form ``handler(msg, handler)``,
                    where msg is a :py:class:`py2p.base.Message`
                    object, and handler is a
                    :py:class:`py2p.base.BaseConnection` object. It
                    should return ``True`` if it performed an action,
                    to reduce the number of handlers checked.

    Raises:
        ValueError: If the method signature doesn't parse correctly
    """
    # inspect.getargspec is gone on Python 3.11+; getfullargspec carries
    # the same first four fields (args, varargs, varkw, defaults).
    describe = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    spec = describe(method)
    names = spec[0]
    keyword_only = spec[4] if len(spec) > 4 else []

    # *args, **kwargs, defaults and keyword-only arguments are rejected.
    has_extras = tuple(spec[1:4]) != (None, None, None) or bool(keyword_only)
    # Slicing keeps a zero-argument callable from raising IndexError, so
    # it is reported through the documented ValueError instead.
    expected_count = 3 if names[:1] == ['self'] else 2
    if has_extras or len(names) != expected_count:
        raise ValueError(
            "This method must contain exactly two arguments "
            "(or three if first is self)")
    self.__handlers.append(method)
def yieldroutes(func):
    """ Generate route paths derived from the name and arguments of `func`.

        Double underscores in the function name become path separators.
        The first route contains one ``<name>`` segment per argument that
        has no default; one further route is yielded for each argument
        with a default, each extending the previous path::

            a()         -> '/a'
            b(x, y)     -> '/b/<x>/<y>'
            c(x, y=5)   -> '/c/<x>' and '/c/<x>/<y>'
            d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
    """
    route = '/' + func.__name__.replace('__', '/').lstrip('/')
    spec = getargspec(func)
    names = spec[0]
    # Arguments without a default come first; they are always in the path.
    required = len(names) - len(spec[3] or [])
    route += ''.join('/<%s>' % name for name in names[:required])
    yield route
    for optional in names[required:]:
        route += '/<%s>' % optional
        yield route
def _register(command, func, kwargs=None):
    """Register func as a handler for the given command.

    Args:
        command: Command text from which a ``Pattern`` is built.
        func: Handler; its parameter names must be exactly the pattern's
            argument names plus the keys of ``kwargs``.
        kwargs: Optional mapping of extra keyword arguments stored with
            the handler. Omitting it gives this registration its own
            empty dict.

    Raises:
        InvalidCommand: If the handler's parameter names do not match.
    """
    # A fresh dict per call: the mapping is stored in `commands`, so a
    # shared default object would be aliased by every registration.
    if kwargs is None:
        kwargs = {}

    pattern = Pattern(command)
    sig = inspect.signature(func)
    func_argnames = set(sig.parameters)
    when_argnames = set(pattern.argnames) | set(kwargs.keys())
    if func_argnames != when_argnames:
        raise InvalidCommand(
            'The function %s%s has the wrong signature for @when(%r)' % (
                func.__name__, sig, command
            ) + '\n\nThe function arguments should be (%s)' % (
                ', '.join(pattern.argnames + list(kwargs.keys()))
            )
        )

    commands.append((pattern, func, kwargs))
def get_func_standard_and_necessary_keys(func):
    """Collect the parameter names of ``func`` and those without a default.

    :param func: Callable to inspect.
    :return: standard_keys: set of all parameter names in the signature.
             necessary_keys: set of the parameter names that declare no
             default value.
    """
    parameters = inspect.signature(func).parameters
    standard_keys = set(parameters)
    # Test the default directly rather than looking for "=" in the
    # parameter's string form, which also matches an annotation that
    # happens to contain "=".
    necessary_keys = {
        name
        for name, param in parameters.items()
        if param.default is inspect.Parameter.empty
    }
    return standard_keys, necessary_keys
def verify_interface(iface, klass):
    """Check that ``klass`` provides every abstract member of ``iface``.

    For each name in ``iface.__abstractmethods__`` the attribute must exist
    on ``klass`` and, unless the interface member is an
    ``abc.abstractproperty``, its signature must equal the interface's.

    Raises:
        InterfaceNotImplemented: If a member is missing or its signature
            differs from the one declared on the interface.
    """
    for name in iface.__abstractmethods__:
        if not hasattr(klass, name):
            raise InterfaceNotImplemented(
                "{0} is missing a {1!r} method".format(klass, name)
            )

        declared = getattr(iface, name)
        if isinstance(declared, abc.abstractproperty):
            # Can't properly verify these yet.
            continue

        expected = signature(declared)
        found = signature(getattr(klass, name))
        if expected != found:
            message = (
                "{0}.{1}'s signature differs from the expected. Expected: "
                "{2!r}. Received: {3!r}"
            )
            raise InterfaceNotImplemented(
                message.format(klass, name, expected, found)
            )
def _call_matcher(self, _call):
    """
    Build a comparison key for a call, suitable for matching other calls.

    ``_call`` is either ``(args, kwargs)`` or ``(name, args, kwargs)``.
    When ``self._spec_signature`` is set, the arguments are bound to it
    and ``(name, bound_arguments)`` is returned (``name`` is ``''`` for the
    two-element form); if binding fails, the ``TypeError`` itself is
    returned with its traceback cleared. Without a signature ``_call`` is
    returned unchanged.
    """
    spec_sig = self._spec_signature
    if spec_sig is None:
        return _call

    if len(_call) == 2:
        call_name = ''
        call_args, call_kwargs = _call
    else:
        call_name, call_args, call_kwargs = _call

    try:
        bound = spec_sig.bind(*call_args, **call_kwargs)
    except TypeError as error:
        error.__traceback__ = None
        return error
    return call_name, bound
def verify_interface(iface, klass):
    """Check that ``klass`` provides every abstract member of ``iface``.

    For each name in ``iface.__abstractmethods__`` the attribute must exist
    on ``klass`` and, unless the interface member is an
    ``abc.abstractproperty``, its signature must equal the interface's.

    Raises:
        InterfaceNotImplemented: If a member is missing or its signature
            differs from the one declared on the interface.
    """
    for name in iface.__abstractmethods__:
        if not hasattr(klass, name):
            raise InterfaceNotImplemented(
                "{0} is missing a {1!r} method".format(klass, name)
            )

        declared = getattr(iface, name)
        if isinstance(declared, abc.abstractproperty):
            # Can't properly verify these yet.
            continue

        expected = signature(declared)
        found = signature(getattr(klass, name))
        if expected != found:
            message = (
                "{0}.{1}'s signature differs from the expected. Expected: "
                "{2!r}. Received: {3!r}"
            )
            raise InterfaceNotImplemented(
                message.format(klass, name, expected, found)
            )
def verify_interface(iface, klass):
    """Check that ``klass`` provides every abstract member of ``iface``.

    For each name in ``iface.__abstractmethods__`` the attribute must exist
    on ``klass`` and, unless the interface member is an
    ``abc.abstractproperty``, its signature must equal the interface's.

    Raises:
        InterfaceNotImplemented: If a member is missing or its signature
            differs from the one declared on the interface.
    """
    for name in iface.__abstractmethods__:
        if not hasattr(klass, name):
            raise InterfaceNotImplemented(
                "{0} is missing a {1!r} method".format(klass, name)
            )

        declared = getattr(iface, name)
        if isinstance(declared, abc.abstractproperty):
            # Can't properly verify these yet.
            continue

        expected = signature(declared)
        found = signature(getattr(klass, name))
        if expected != found:
            message = (
                "{0}.{1}'s signature differs from the expected. Expected: "
                "{2!r}. Received: {3!r}"
            )
            raise InterfaceNotImplemented(
                message.format(klass, name, expected, found)
            )
def yieldroutes(func):
    """ Generate route paths derived from the name and arguments of `func`.

        Double underscores in the function name become path separators.
        The first route contains one ``<name>`` segment per argument that
        has no default; one further route is yielded for each argument
        with a default, each extending the previous path::

            a()         -> '/a'
            b(x, y)     -> '/b/<x>/<y>'
            c(x, y=5)   -> '/c/<x>' and '/c/<x>/<y>'
            d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
    """
    route = '/' + func.__name__.replace('__', '/').lstrip('/')
    spec = getargspec(func)
    names = spec[0]
    # Arguments without a default come first; they are always in the path.
    required = len(names) - len(spec[3] or [])
    route += ''.join('/<%s>' % name for name in names[:required])
    yield route
    for optional in names[required:]:
        route += '/<%s>' % optional
        yield route
def yieldroutes(func):
    """ Generate route paths derived from the name and arguments of `func`.

        Double underscores in the function name become path separators.
        The first route contains one ``<name>`` segment per argument that
        has no default; one further route is yielded for each argument
        with a default, each extending the previous path::

            a()         -> '/a'
            b(x, y)     -> '/b/<x>/<y>'
            c(x, y=5)   -> '/c/<x>' and '/c/<x>/<y>'
            d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
    """
    route = '/' + func.__name__.replace('__', '/').lstrip('/')
    spec = getargspec(func)
    names = spec[0]
    # Arguments without a default come first; they are always in the path.
    required = len(names) - len(spec[3] or [])
    route += ''.join('/<%s>' % name for name in names[:required])
    yield route
    for optional in names[required:]:
        route += '/<%s>' % optional
        yield route
def __init__(self, uri, uri_regex, handler):
    """Record a handler together with metadata read from it.

    Stores ``uri``, ``uri_regex`` and ``handler``, keeps the handler's
    signature and a name-to-parameter mapping that leaves out its first
    parameter, copies the optional ``consumes`` / ``produces`` /
    ``errors`` / ``deprecated`` attributes from the handler, and finally
    extracts the arguments and generates the operation definition.
    """
    super(HandlerDef, self).__init__()
    self.uri = uri
    self.uri_regex = uri_regex
    self.handler = handler

    self._signature = inspect.signature(handler)
    # Every parameter except the first one, in declaration order.
    declared = list(self._signature.parameters.items())
    self._params = dict(declared[1:])

    self.path_args = []
    self.query_args = []

    # Optional metadata on the handler, with fallbacks when absent.
    self.consumes = getattr(handler, 'consumes', None)
    self.produces = getattr(handler, 'produces', None)
    self.errors = getattr(handler, 'errors', [])
    self.deprecated = getattr(handler, 'deprecated', False)

    self._extract_arguments()
    self.operation_definition = self._generate_operation_definition()
def verify_interface(iface, klass):
    """Check that ``klass`` provides every abstract member of ``iface``.

    For each name in ``iface.__abstractmethods__`` the attribute must exist
    on ``klass`` and, unless the interface member is an
    ``abc.abstractproperty``, its signature must equal the interface's.

    Raises:
        InterfaceNotImplemented: If a member is missing or its signature
            differs from the one declared on the interface.
    """
    for name in iface.__abstractmethods__:
        if not hasattr(klass, name):
            raise InterfaceNotImplemented(
                "{0} is missing a {1!r} method".format(klass, name)
            )

        declared = getattr(iface, name)
        if isinstance(declared, abc.abstractproperty):
            # Can't properly verify these yet.
            continue

        expected = signature(declared)
        found = signature(getattr(klass, name))
        if expected != found:
            message = (
                "{0}.{1}'s signature differs from the expected. Expected: "
                "{2!r}. Received: {3!r}"
            )
            raise InterfaceNotImplemented(
                message.format(klass, name, expected, found)
            )