def dispatch(method: Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
    """Wrap ``method`` so that calls are routed by their ``type`` argument.

    ``method`` becomes the fallback of a ``functools.singledispatch``
    registry. Unlike plain ``singledispatch``, which inspects the class of the
    first positional argument, the returned wrapper looks the implementation
    up explicitly with ``dispatcher.dispatch(type)`` and then invokes it as
    ``implementation(self, query, context=context)`` -- the ``type`` argument
    itself is not forwarded.

    The returned wrapper carries two extra attributes:

    * ``register(type)`` -- records ``type`` in the set of provided types and
      returns the decorator produced by the underlying ``singledispatch``
      registry for that type.
    * ``_provides`` -- the set of every type passed to ``register`` so far.

    Args:
        method: The fallback method being decorated. Its metadata is copied
            onto the wrapper with ``update_wrapper``.

    Returns:
        The dispatching wrapper described above.
    """
    dispatcher = singledispatch(method)
    provides = set()

    def wrapper(self: Any, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Any:
        """Call the implementation registered for ``type``.

        Raises:
            The exception built by ``DataSource.unsupported(type)`` when the
            lookup resolves to the undecorated fallback and calling it raises
            ``TypeError``. A ``TypeError`` raised by a registered
            implementation propagates unchanged.
        """
        call = dispatcher.dispatch(type)
        try:
            return call(self, query, context=context)
        except TypeError as error:
            # singledispatch returns the original ``method`` when nothing is
            # registered for ``type``; only that case means "unsupported".
            # NOTE(review): presumably the fallback rejects this call because
            # ``type`` is not passed along -- confirm against the decorated
            # methods' signatures.
            if call is not method:
                # A registered handler failed on its own; do not disguise a
                # genuine TypeError as an unsupported-type error.
                raise
            raise DataSource.unsupported(type) from error

    def register(type: Type[T]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
        """Record ``type`` as provided and return the registry's decorator for it."""
        provides.add(type)
        return dispatcher.register(type)

    wrapper.register = register
    wrapper._provides = provides
    # Kept after the attribute assignments, as in the original: update_wrapper
    # also merges ``method.__dict__`` into ``wrapper.__dict__``.
    update_wrapper(wrapper, method)
    return wrapper
评论列表
文章目录