def dispatch(method: Callable[[Any, Type[T], Any, PipelineContext], None]) -> Callable[[Any, Type[T], Any, PipelineContext], None]:
dispatcher = singledispatch(method)
accepts = set()
def wrapper(self: Any, type: Type[T], items: Any, context: PipelineContext = None) -> None:
call = dispatcher.dispatch(type)
try:
return call(self, items, context=context)
except TypeError:
raise DataSink.unsupported(type)
def register(type: Type[T]) -> Callable[[Any, Type[T], Any, PipelineContext], None]:
accepts.add(type)
return dispatcher.register(type)
wrapper.register = register
wrapper._accepts = accepts
update_wrapper(wrapper, method)
return wrapper
评论列表
文章目录