def verify_interface(iface, klass):
    """Check that ``klass`` provides every abstract member declared on ``iface``.

    Every name listed in ``iface.__abstractmethods__`` must exist on ``klass``.
    Unless the interface member is an ``abc.abstractproperty`` (which is
    skipped), its signature must also compare equal to the signature of the
    attribute of the same name on ``klass``.

    Raises:
        InterfaceNotImplemented: when a member is missing from ``klass`` or
            its signature differs from the one declared on ``iface``.
    """
    for name in iface.__abstractmethods__:
        if not hasattr(klass, name):
            raise InterfaceNotImplemented(
                "{0} is missing a {1!r} method".format(klass, name)
            )
        if isinstance(getattr(iface, name), abc.abstractproperty):
            # Abstract properties can't be properly verified yet.
            continue
        expected = signature(getattr(iface, name))
        received = signature(getattr(klass, name))
        if expected != received:
            template = (
                "{0}.{1}'s signature differs from the expected. Expected: "
                "{2!r}. Received: {3!r}"
            )
            raise InterfaceNotImplemented(
                template.format(klass, name, expected, received)
            )
# Python signature() usage examples (source code)
def yieldroutes(func):
    """ Yield every route path that matches the signature (name, args) of
        ``func``. More than one route is produced when the function takes
        optional (defaulted) arguments: one route per additional optional
        argument. Double underscores in the function name become slashes.
        Examples::

            a()         -> '/a'
            b(x, y)     -> '/b/<x>/<y>'
            c(x, y=5)   -> '/c/<x>' and '/c/<x>/<y>'
            d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
    """
    route = '/' + func.__name__.replace('__', '/').lstrip('/')
    argspec = getargspec(func)
    names = argspec[0]
    defaults = argspec[3] or []
    # Arguments without a default are mandatory path segments.
    required = len(names) - len(defaults)
    route += ('/<%s>' * required) % tuple(names[:required])
    yield route
    # Each optional argument extends the previous route by one segment.
    for optional in names[required:]:
        route += '/<%s>' % optional
        yield route
def auto_assign(func):
    """Decorate a method so that its arguments are stored on the instance.

    Before ``func`` runs, every parameter other than ``self`` is copied onto
    the first positional argument (the instance) via ``setattr``: explicitly
    passed arguments use the passed value, omitted parameters that declare a
    default use that default.

    The returned wrapper keeps ``func``'s name, docstring and signature.
    """
    import functools

    signature = inspect.signature(func)

    @functools.wraps(func)  # keep __name__, __doc__, __module__ of the original
    def wrapper(*args, **kwargs):
        instance = args[0]
        bound = signature.bind(*args, **kwargs)
        for param in signature.parameters.values():
            if param.name == 'self':
                continue
            if param.name in bound.arguments:
                setattr(instance, param.name, bound.arguments[param.name])
            elif param.default is not param.empty:
                # Argument omitted by the caller: fall back to its default.
                setattr(instance, param.name, param.default)
        return func(*args, **kwargs)

    wrapper.__signature__ = signature  # Restore the signature
    return wrapper
def get_signature(obj, method_name):
    """Return the signature of ``obj.<method_name>`` without its ``self`` slot.

    ``signature()`` does not strip the first parameter of a plain function
    looked up on a class, so in that case the method is wrapped in a
    ``functools.partial`` that pre-fills it with ``None``.

    Returns:
        The signature object, or ``None`` when ``signature()`` fails for the
        resolved method.
    """
    method = getattr(obj, method_name)
    # Eat self for unbound methods because signature() doesn't do it
    if PY3:
        # getattr_static walks the MRO without invoking descriptors, so a
        # staticmethod inherited from a base class is recognised as well
        # (obj.__dict__ only contains attributes defined on obj itself).
        raw_attribute = inspect.getattr_static(obj, method_name, None)
        if (inspect.isclass(obj) and
                not inspect.ismethod(method) and
                not isinstance(raw_attribute, staticmethod)):
            method = functools.partial(method, None)
    else:
        if (isinstance(method, types.UnboundMethodType) and
                method.__self__ is None):
            method = functools.partial(method, None)
    try:
        return signature(method)
    except Exception:
        # Best effort: callables that cannot be introspected yield None, but
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        return None
async def run(self, arguments, settings, app):
    """Load the script at ``arguments.script`` and execute its ``run`` coroutine.

    The script file is imported as a module. If it exposes no ``run``
    attribute a warning is logged and nothing else happens. If ``run``
    declares a ``container`` parameter it is awaited once per container
    yielded by ``get_containers(self.request)``, committing the transaction
    after each call; otherwise it is invoked through
    ``lazy_apply(module.run, app)``.

    ``settings`` is accepted but not used here.

    This must be a coroutine function: the body uses ``await`` and
    ``async for``, which are syntax errors inside a plain ``def``.
    """
    aiotask_context.set('request', self.request)
    script = os.path.abspath(arguments.script)
    spec = importlib.util.spec_from_file_location("module.name", script)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    if not hasattr(module, 'run'):
        # Logger.warn is a deprecated alias of Logger.warning.
        logger.warning(f'Not `async def run()` function found in file {script}')
        return

    sig = inspect.signature(module.run)
    if 'container' in sig.parameters:
        async for txn, tm, container in get_containers(self.request):
            await module.run(container)
            await tm.commit(txn=txn)
    else:
        await lazy_apply(module.run, app)
def test_gen_mutation(mock_person):
    """Check the mutation class generated by ``gen_mutation`` for ``mock_person``.

    The result must be a ``graphene.Mutation`` subclass named
    ``Create<ModelName>`` that has a ``mutate`` attribute, exposes a field
    keyed by the snake-cased model name, and whose ``Field`` callable
    defaults its ``args`` parameter to the schema's mutation operators.
    """
    import inspect

    import graphene
    from graphene.utils.str_converters import to_snake_case

    from graphene_mongo.mutation import gen_mutation
    from graphene_mongo.model import ModelSchema

    model_schema = ModelSchema(mock_person, mock_person._fields, None, None)

    mutation_cls = gen_mutation(
        mock_person,
        model_schema.schema,
        model_schema.operators_mutation,
        model_schema.fields_mutation,
        None,
        None,
    )

    assert issubclass(mutation_cls, graphene.Mutation)
    assert hasattr(mutation_cls, 'mutate')

    meta = mutation_cls._meta
    assert meta.name == 'Create' + mock_person.__name__
    assert meta.local_fields[to_snake_case(mock_person.__name__)]
    assert meta.fields[to_snake_case(mock_person.__name__)]

    field_params = inspect.signature(mutation_cls.__dict__['Field']).parameters
    operators_mutation = field_params['args'].default
    assert operators_mutation == model_schema.operators_mutation
def __init__(self, clsname, bases, clsdict):
    """Warn when a public callable redefines an inherited one with a different signature.

    After the normal initialisation, every entry of ``clsdict`` whose name
    does not start with an underscore and whose value is callable is
    compared against the attribute of the same name reachable through
    ``super(self, self)``. A mismatch is reported with ``logging.warning``.
    """
    super().__init__(clsname, bases, clsdict)
    parent = super(self, self)
    for name, value in clsdict.items():
        is_public_callable = not name.startswith('_') and callable(value)
        if not is_public_callable:
            continue
        # Look up the previous definition (if any) on the parent classes
        inherited = getattr(parent, name, None)
        if not inherited:
            continue
        inherited_sig = signature(inherited)
        current_sig = signature(value)
        if inherited_sig != current_sig:
            logging.warning('Signature mismatch in %s. %s != %s',
                            value.__qualname__, str(inherited_sig), str(current_sig))
# Example
def register(self, meth):
    '''
    Register a new method as a multimethod.

    The dispatch key is the tuple of type annotations of every parameter
    other than ``self``. When a parameter has a default value, the shorter
    key that omits it (and everything after it) is registered as well.

    Raises TypeError if a parameter has no annotation or if its annotation
    is not a type.
    '''
    parameters = inspect.signature(meth).parameters
    unset = inspect.Parameter.empty

    # Build a type-signature from the method's annotations
    type_key = []
    for name, parm in parameters.items():
        if name == 'self':
            continue
        annotation = parm.annotation
        if annotation is unset:
            raise TypeError(
                'Argument {} must be annotated with a type'.format(name)
            )
        if not isinstance(annotation, type):
            raise TypeError(
                'Argument {} annotation must be a type'.format(name)
            )
        if parm.default is not unset:
            # Callers may leave this argument out, so register the prefix too.
            self._methods[tuple(type_key)] = meth
        type_key.append(annotation)

    self._methods[tuple(type_key)] = meth
def typeassert(*ty_args, **ty_kwargs):
    """Build a decorator that enforces argument types at call time.

    ``ty_args`` / ``ty_kwargs`` are matched against the decorated function's
    parameters (positionally or by name) and give the type each supplied
    argument must be an instance of. Parameters without an entry, and
    arguments the caller does not pass, are not checked. A violation raises
    ``TypeError``.

    When Python runs in optimized mode (``__debug__`` is false) the function
    is returned undecorated.
    """
    def decorate(func):
        # If in optimized mode, disable type checking
        if not __debug__:
            return func

        # Map function argument names to supplied types
        func_sig = signature(func)
        expected_types = func_sig.bind_partial(*ty_args, **ty_kwargs).arguments

        @wraps(func)
        def wrapper(*args, **kwargs):
            supplied = func_sig.bind(*args, **kwargs).arguments
            # Enforce type assertions across supplied arguments
            for name, value in supplied.items():
                if name not in expected_types:
                    continue
                if not isinstance(value, expected_types[name]):
                    raise TypeError(
                        'Argument {} must be {}'.format(name, expected_types[name])
                    )
            return func(*args, **kwargs)

        return wrapper

    return decorate
# Examples
def pdef(self, obj, oname=''):
    """Print the call signature for any callable object.

    If the object is a class, the output is prefixed with a constructor
    information header. Non-callable objects only produce a short notice,
    and a missing definition is reported through ``self.noinfo``."""
    if not callable(obj):
        print('Object is not callable.')
        return

    if inspect.isclass(obj):
        header = self.__head('Class constructor information:\n')
    else:
        header = ''

    output = self._getdef(obj,oname)
    if output is None:
        self.noinfo('definition header',oname)
        return

    print(header,self.format(output), end=' ')
# In Python 3, all classes are new-style, so they all have __init__.
def _default_arguments_from_docstring(self, doc):
    """Parse the first line of docstring for call signature.

    Docstring should be of the form 'min(iterable[, key=func])\n'.
    It can also parse cython docstring of the form
    'Minuit.migrad(self, int ncall=10000, resume=True, int nsplit=1)'.

    Returns the list of keyword names found by ``self.docstring_kwd_re`` in
    the argument list matched by ``self.docstring_sig_re``. An empty list is
    returned when ``doc`` is ``None``, empty or whitespace-only, or when its
    first line does not look like a call signature.
    """
    if not doc:
        return []

    # Only the first line matters. A whitespace-only docstring has no lines
    # left after stripping, which used to raise IndexError here.
    lines = doc.lstrip().splitlines()
    if not lines:
        return []
    line = lines[0]

    # 'min(iterable[, key=func])\n' -> 'iterable[, key=func]'
    sig = self.docstring_sig_re.search(line)
    if sig is None:
        return []

    # 'iterable[, key=func]' -> ['iterable[', ' key=func]']
    chunks = sig.groups()[0].split(',')
    ret = []
    for chunk in chunks:
        ret += self.docstring_kwd_re.findall(chunk)
    return ret
# Source file: interfaces_annotations.py
# Project: Expert-Python-Programming_Second-Edition
# Author: PacktPublishing
# Project source
# File source
# Views: 24
# Favorites: 0
# Likes: 0
# Comments: 0
def ensure_interface(function):
    """Decorate ``function`` so ABC-annotated arguments are type-checked.

    On every call the arguments are bound to the function's signature. For
    each bound argument whose parameter annotation is an ``ABCMeta`` class,
    the value must be an instance of that class; other annotations are
    ignored.

    Returns:
        A wrapper that performs the check and then returns whatever
        ``function`` returns.

    Raises:
        TypeError: if an argument does not implement its annotated interface.
    """
    signature = inspect.signature(function)
    parameters = signature.parameters

    @wraps(function)
    def wrapped(*args, **kwargs):
        bound = signature.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            annotation = parameters[name].annotation

            if not isinstance(annotation, ABCMeta):
                continue

            if not isinstance(value, annotation):
                raise TypeError(
                    "{} does not implement {} interface"
                    "".format(value, annotation)
                )

        # Propagate the result; it was previously discarded, so every
        # decorated function appeared to return None.
        return function(*args, **kwargs)

    return wrapped
def _get_arg_lengths(func):
    """Returns a two-tuple describing the positional interface of ``func``.

    The first item is the number of positional parameters (positional-only
    or positional-or-keyword). The second item is ``1`` when ``func``
    accepts variable positional arguments (``*args``) and ``0`` otherwise.

    Falls back to ``inspect.getfullargspec`` / ``inspect.getargspec`` when
    ``inspect.signature`` is unavailable. In that fallback a ``TypeError``
    from the inspection is re-raised as ``ValueError``.
    """
    try:
        parameters = inspect.signature(func).parameters.values()
        # Public aliases of the kinds formerly read from private
        # inspect._POSITIONAL_* / inspect._VAR_POSITIONAL names.
        positional_kinds = (
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            inspect.Parameter.POSITIONAL_ONLY,
        )
        args = [p for p in parameters if p.kind in positional_kinds]
        vararg = any(
            p.kind == inspect.Parameter.VAR_POSITIONAL for p in parameters
        )
    except AttributeError:
        try:
            try:  # For Python 3.2 and earlier.
                args, vararg = inspect.getfullargspec(func)[:2]
            except AttributeError:  # For Python 2.7 and earlier.
                args, vararg = inspect.getargspec(func)[:2]
        except TypeError:     # In 3.2 and earlier, raises TypeError
            raise ValueError  # but 3.3 and later raise a ValueError.
    return (len(args), (1 if vararg else 0))
def test_extended_headers_smoke(header_name):
    """Smoke-test the header factory ``ca.<header_name>`` with small and large values.

    Calling the factory with every parameter set to ``0`` must yield a
    ``ca.MessageHeader``; calling it with every parameter set to ``2 ** 32``
    must yield a ``ca.ExtendedMessageHeader``.
    """
    header = getattr(ca, header_name)
    sig = inspect.signature(header)

    param_names = list(sig.parameters.keys())
    regular_bind_args = {name: 0 for name in param_names}
    extended_bind_args = {name: 2 ** 32 for name in param_names}

    reg_args = sig.bind(**regular_bind_args)
    reg_hdr = header(*reg_args.args, **reg_args.kwargs)

    ext_args = sig.bind(**extended_bind_args)
    ext_hdr = header(*ext_args.args, **ext_args.kwargs)

    print(reg_hdr)
    assert isinstance(reg_hdr, ca.MessageHeader)
    print(ext_hdr)
    assert isinstance(ext_hdr, ca.ExtendedMessageHeader)
def __init__(self, func):
    """Wrap ``func``: keep a reference to it and mirror its metadata on ``self``.

    The wrapped callable is stored as ``_func``, its signature is exposed
    through ``__signature__``, and ``update_wrapper`` then copies the
    standard wrapper attributes from ``func`` onto this instance.
    """
    self._func = func
    self.__signature__ = signature(func)
    # Must run last: it copies func's attributes (and __dict__) onto self.
    update_wrapper(wrapper=self, wrapped=func)
def next(self, f):
    """Return a new ``Command`` built from this one's ``fdo``, ``fpre``,
    ``fpost`` and ``name``, with ``f`` passed as the fourth argument."""
    successor = Command(self.fdo, self.fpre, self.fpost, f, self.name)
    return successor
# these are helper functions to the type signature of the `fdo` function
def signature(self):
    """Return the ``inspect.Signature`` of this object's ``fdo`` callable."""
    return inspect.signature(self.fdo)
def return_annotation(self):
    """Return ``self.signature.return_annotation``, or ``None`` when the
    signature declares no return annotation."""
    annotation = self.signature.return_annotation
    # inspect.Signature.empty is the same sentinel object as inspect._empty.
    return None if annotation is inspect.Signature.empty else annotation
def parameters(self):
    """Return the ``parameters`` mapping of ``self.signature``."""
    return self.signature.parameters
def _get_workflow_from_task(task):
    """
    Looks for an instance of `Workflow` linked to the task or a method of
    `Workflow` among the done callbacks of the asyncio task. Returns None if
    not found.
    If the task was triggered from within a workflow it MUST have a `workflow`
    attribute that points to it and at least one done callback that is a method
    of `Workflow`.
    """
    if isinstance(task, TukioTask) and task.workflow:
        return task.workflow

    # NOTE(review): on Python 3.7+ asyncio appears to store (callback,
    # context) pairs in `_callbacks` -- confirm whether the entries need
    # unpacking before the checks below.
    for callback in task._callbacks:
        # inspect.getcallargs() gives access to the implicit 'self' arg of
        # the bound method but it is marked as deprecated since
        # Python 3.5.1 and the new `inspect.Signature` object does NOT do
        # the job :((
        if inspect.ismethod(callback):
            owner = callback.__self__
        elif isinstance(callback, functools.partial):
            try:
                owner = callback.func.__self__
            except AttributeError:
                # The partial wraps something that is not a bound method.
                continue
        else:
            continue

        if isinstance(owner, Workflow):
            return owner

    return None
def make_sig(*names):
    """Build a ``Signature`` whose parameters are the given names, in order,
    each of kind ``POSITIONAL_OR_KEYWORD``."""
    kind = Parameter.POSITIONAL_OR_KEYWORD
    parameters = []
    for name in names:
        parameters.append(Parameter(name, kind))
    return Signature(parameters)
def make_sig(*names):
    """Return a ``Signature`` with one ``POSITIONAL_OR_KEYWORD`` parameter
    per name in ``names``, preserving their order."""
    def as_parameter(name):
        return Parameter(name, Parameter.POSITIONAL_OR_KEYWORD)

    return Signature(list(map(as_parameter, names)))
def __init__(
    self,
    module: str,
    qualname: str,
    kind: FunctionKind,
    sig: inspect.Signature,
    is_async: bool = False
) -> None:
    """Store the description of a function.

    Args:
        module: stored as ``self.module``.
        qualname: stored as ``self.qualname``.
        kind: stored as ``self.kind``.
        sig: stored as ``self.signature``.
        is_async: stored as ``self.is_async``; defaults to ``False``.
    """
    # Where the function lives
    self.module, self.qualname = module, qualname
    # What the function looks like
    self.kind, self.signature = kind, sig
    self.is_async = is_async
def from_callable(cls, func: Callable, kind: FunctionKind = None) -> 'FunctionDefinition':
    """Build a ``FunctionDefinition`` describing ``func``.

    Args:
        func: the callable to describe; its ``__module__``, ``__qualname__``
            and signature are recorded.
        kind: the function kind to record. When omitted (``None``) it is
            derived with ``FunctionKind.from_callable(func)``. Previously an
            explicitly supplied value was silently overwritten.

    Returns:
        A ``FunctionDefinition`` whose ``is_async`` flag reflects
        ``asyncio.iscoroutinefunction(func)``.
    """
    if kind is None:
        kind = FunctionKind.from_callable(func)
    sig = inspect.Signature.from_callable(func)
    is_async = asyncio.iscoroutinefunction(func)
    return FunctionDefinition(func.__module__, func.__qualname__, kind, sig, is_async)
def get_imports_for_annotation(anno: Any) -> ImportMap:
    """Return the imports (module, name) needed for the type in the annotation"""
    imports = ImportMap()

    # Nothing to import for missing annotations, for objects that are neither
    # classes nor typing Any/Union constructs, or for builtins.
    if anno is inspect.Parameter.empty or anno is inspect.Signature.empty:
        return imports
    if not isinstance(anno, (type, _Any, _Union)):
        return imports
    if anno.__module__ == 'builtins':
        return imports

    if isinstance(anno, _Any):
        imports['typing'].add('Any')
        return imports

    if _is_optional(anno):
        imports['typing'].add('Optional')
        elem_type = _get_optional_elem(anno)
        imports.merge(get_imports_for_annotation(elem_type))
        return imports

    if isinstance(anno, (_Union, GenericMeta)):
        if isinstance(anno, _Union):
            imports['typing'].add('Union')
        else:
            name = _get_import_for_qualname(anno.__qualname__)
            imports[anno.__module__].add(name)
        # Recurse into the type arguments, if any.
        for elem_type in anno.__args__ or []:
            imports.merge(get_imports_for_annotation(elem_type))
        return imports

    # Any other class: import it from its defining module.
    name = _get_import_for_qualname(anno.__qualname__)
    imports[anno.__module__].add(name)
    return imports
def get_imports_for_signature(sig: inspect.Signature) -> ImportMap:
    """Return the imports (module, name) needed for all types in annotations"""
    # Parameter annotations first, then the return annotation.
    annotations = [param.annotation for param in sig.parameters.values()]
    annotations.append(sig.return_annotation)

    imports = ImportMap()
    for anno in annotations:
        imports.merge(get_imports_for_annotation(anno))
    return imports
def update_signature_args(sig: inspect.Signature, arg_types: Dict[str, type], has_self: bool) -> inspect.Signature:
    """Update argument annotations with the supplied types

    Returns a copy of ``sig`` in which each parameter named in ``arg_types``
    is annotated with the mapped type. Parameters that already carry an
    annotation are kept as they are, and when ``has_self`` is true the first
    parameter is never annotated.
    """
    unannotated = inspect.Parameter.empty
    updated = []
    for position, (name, param) in enumerate(sig.parameters.items()):
        new_type = arg_types.get(name)
        is_self = has_self and position == 0
        # Don't touch pre-existing annotations and leave self un-annotated
        if new_type is not None and param.annotation is unannotated and not is_self:
            param = param.replace(annotation=new_type)
        updated.append(param)
    return sig.replace(parameters=updated)
def has_unparsable_defaults(sig: inspect.Signature) -> bool:
    """Return True if any default value's repr is not a valid Python expression.

    Parameters without a default are ignored. Each remaining default's
    ``repr()`` is parsed as an expression; the first one that fails makes
    the function return ``True``. ``False`` means every default can be
    written back out as source code.
    """
    # Local import: the legacy `parser` module this used to rely on was
    # removed in Python 3.10; `ast` offers the same syntax check.
    import ast

    for param in sig.parameters.values():
        if param.default is inspect.Parameter.empty:
            continue
        try:
            ast.parse(repr(param.default), mode='eval')
        except (SyntaxError, ValueError):
            # ValueError covers reprs ast refuses outright (e.g. containing
            # null bytes on some Python versions).
            return True
    return False
def __init__(
    self,
    name: str,
    signature: inspect.Signature,
    kind: FunctionKind,
    strip_modules: Iterable[str] = None,
    is_async: bool = False
) -> None:
    """Store the attributes describing a function.

    Args:
        name: stored as ``self.name``.
        signature: stored as ``self.signature``.
        kind: stored as ``self.kind``.
        strip_modules: stored as ``self.strip_modules``; a falsy value
            (including the default ``None``) is replaced by a new empty list.
        is_async: stored as ``self.is_async``; defaults to ``False``.
    """
    self.name, self.signature, self.kind = name, signature, kind
    # Fresh list per instance when nothing usable was supplied.
    self.strip_modules = strip_modules if strip_modules else []
    self.is_async = is_async
def signature_without_unbound_args(func):
    """Return a new ``inspect.Signature`` for ``func`` without its ``*args``
    and ``**kwargs`` parameters.

    Only the parameters are carried over to the new signature.
    """
    variadic_kinds = (
        inspect.Parameter.VAR_POSITIONAL,
        inspect.Parameter.VAR_KEYWORD,
    )
    kept = []
    for param in inspect.signature(func).parameters.values():
        if param.kind in variadic_kinds:
            continue
        kept.append(param)
    return inspect.Signature(kept)