python类Callable()的实例源码

whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def apply(func,         # type: Callable[..., bytes]
              args=(),      # type: Sequence[AnyStr]
              exe=None,     # type: Optional[str]
              depfiles=(),  # type: Sequence[str]
              cache=None    # type: Optional[Cache]
              ):
        """Applies func(*args) when the result is not present in the cache.
        The result of func(*args) must be bytes and must not be None which is used as
        cache-miss indicator. After evaluation of func the result is stored in the cache.
        """
        key, value = None, None
        if cache is not None:
            hashobj = cache.mixtohash(args, exe=exe, depfiles=depfiles)
            key = hashobj.hexdigest()
            value = cache.get(key)
        if value is None:
            value = func(*args)
            if key is not None:
                cache.set(key, value)
        return value
__init__.py 文件源码 项目:backend.ai-client-py 作者: lablup 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def register_command(handler: Callable[[argparse.Namespace], None],
                     main_parser: Optional[ArgParserType]=None,
                    ) -> Callable[[argparse.Namespace], None]:
    if main_parser is None:
        main_parser = global_argparser
    if id(main_parser) not in _subparsers:
        subparsers = main_parser.add_subparsers(title='commands',
                                                dest='command')
        _subparsers[id(main_parser)] = subparsers
    else:
        subparsers = _subparsers[id(main_parser)]

    @functools.wraps(handler)
    def wrapped(args):
        handler(args)

    doc_summary = handler.__doc__.split('\n\n')[0]
    inner_parser = subparsers.add_parser(handler.__name__.replace('_', '-'),
                                         description=handler.__doc__,
                                         help=doc_summary)
    inner_parser.set_defaults(function=wrapped)
    wrapped.register_command = functools.partial(register_command,
                                                 main_parser=inner_parser)
    wrapped.add_argument = inner_parser.add_argument
    return wrapped
labeled_example.py 文件源码 项目:speechless 作者: JuliusKunze 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self,
                 get_raw_audio: Callable[[], ndarray],
                 sample_rate: int = 16000,
                 id: Optional[str] = None,
                 label: Optional[str] = "nolabel",
                 fourier_window_length: int = 512,
                 hop_length: int = 128,
                 mel_frequency_count: int = 128,
                 label_with_tags: str = None,
                 positional_label: Optional[PositionalLabel] = None):
        super().__init__(id=id, label=label)

        # The default values for hop_length and fourier_window_length are powers of 2 near the values specified in the wave2letter paper.
        self.get_raw_audio = get_raw_audio
        self.sample_rate = sample_rate
        self.fourier_window_length = fourier_window_length
        self.hop_length = hop_length
        self.mel_frequency_count = mel_frequency_count
        self.label_with_tags = label_with_tags
        self.positional_label = positional_label
corpus.py 文件源码 项目:speechless 作者: JuliusKunze 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def randomly_grouped_by(key_from_example: Callable[[LabeledExample], Any], training_share: float = .9) -> Callable[
        [List[LabeledExample]], Tuple[List[LabeledExample], List[LabeledExample]]]:
        def split(examples: List[LabeledExample]) -> Tuple[List[LabeledExample], List[LabeledExample]]:
            examples_by_directory = group(examples, key=key_from_example)
            directories = examples_by_directory.keys()

            # split must be the same every time:
            random.seed(42)
            keys = set(random.sample(directories, int(training_share * len(directories))))

            training_examples = [example for example in examples if key_from_example(example) in keys]
            test_examples = [example for example in examples if key_from_example(example) not in keys]

            return training_examples, test_examples

        return split
sources.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def dispatch(method: Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
        dispatcher = singledispatch(method)
        provides = set()

        def wrapper(self: Any, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Any:
            call = dispatcher.dispatch(type)
            try:
                return call(self, query, context=context)
            except TypeError:
                raise DataSource.unsupported(type)

        def register(type: Type[T]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
            provides.add(type)
            return dispatcher.register(type)

        wrapper.register = register
        wrapper._provides = provides
        update_wrapper(wrapper, method)
        return wrapper
sinks.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def dispatch(method: Callable[[Any, Type[T], Any, PipelineContext], None]) -> Callable[[Any, Type[T], Any, PipelineContext], None]:
        dispatcher = singledispatch(method)
        accepts = set()

        def wrapper(self: Any, type: Type[T], items: Any, context: PipelineContext = None) -> None:
            call = dispatcher.dispatch(type)
            try:
                return call(self, items, context=context)
            except TypeError:
                raise DataSink.unsupported(type)

        def register(type: Type[T]) -> Callable[[Any, Type[T], Any, PipelineContext], None]:
            accepts.add(type)
            return dispatcher.register(type)

        wrapper.register = register
        wrapper._accepts = accepts
        update_wrapper(wrapper, method)
        return wrapper
pipelines.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _transform(self, source_type: Type[S], target_type: Type[T]) -> Tuple[Callable[[S], T], int]:
        try:
            LOGGER.info("Searching type graph for shortest path from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__))
            path = dijkstra_path(self._type_graph, source=source_type, target=target_type, weight="cost")
            LOGGER.info("Found a path from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__))
        except (KeyError, NetworkXNoPath):
            raise NoConversionError("Pipeline can't convert \"{source_type}\" to \"{target_type}\"".format(source_type=source_type, target_type=target_type))

        LOGGER.info("Building transformer chain from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__))
        chain = []
        cost = 0
        for source, target in _pairwise(path):
            transformer = self._type_graph.adj[source][target][_TRANSFORMER]
            chain.append((transformer, target))
            cost += transformer.cost
        LOGGER.info("Built transformer chain from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__))

        if not chain:
            return _identity, 0

        return partial(_transform, transformer_chain=chain), cost
pipelines.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _best_transform_to(self, target_type: Type[T], source_types: Iterable[Type]) -> Tuple[Callable[[T], Any], Type, int]:
        best = None
        best_cost = _MAX_TRANSFORM_COST
        from_type = None
        for source_type in source_types:
            try:
                transform, cost = self._transform(source_type, target_type)
                if cost < best_cost:
                    best = transform
                    best_cost = cost
                    from_type = source_type
            except NoConversionError:
                pass
        if best is None:
            raise NoConversionError("Pipeline can't convert from any of \"{source_types}\" to \"{target_type}\"".format(source_types=source_types, target_type=target_type))
        return best, from_type, best_cost
queries.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def with_default(self, value: Union[Any, Callable[[MutableMapping[str, Any]], Any]], supplies_type: Type = None) -> "QueryValidator":
        if self._current is None or self._current.child is not None:
            raise QueryValidatorStructureError("No key is selected! Try using \"can_have\" before \"with_default\".")

        if self._current.required:
            raise QueryValidatorStructureError("Can't assign a default value to a required key! Try using \"can_have\" instead of \"have\".")

        if supplies_type:
            expected_type = supplies_type
        else:
            expected_type = type(value)

        default_node = _DefaultValueNode(self._current.key, value, supplies_type)
        result = self.as_(expected_type)
        result._current.child.child = default_node
        return result
transformers.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def dispatch(method: Callable[[Any, Type[T], F, PipelineContext], T]) -> Callable[[Any, Type[T], F, PipelineContext], T]:
        dispatcher = singledispatch(method)
        transforms = {}

        def wrapper(self: Any, target_type: Type[T], value: F, context: PipelineContext = None) -> T:
            call = dispatcher.dispatch(TypePair[value.__class__, target_type])
            try:
                return call(self, value, context=context)
            except TypeError:
                raise DataTransformer.unsupported(target_type, value)

        def register(from_type: Type[F], to_type: Type[T]) -> Callable[[Any, Type[T], F, PipelineContext], T]:
            try:
                target_types = transforms[from_type]
            except KeyError:
                target_types = set()
                transforms[from_type] = target_types
            target_types.add(to_type)

            return dispatcher.register(TypePair[from_type, to_type])

        wrapper.register = register
        wrapper._transforms = transforms
        update_wrapper(wrapper, method)
        return wrapper
test_payout.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_suggestPayout():
    MockedReportWrapper = NamedTuple('MockedReportWrapper', [('getReportBody', Callable),
                                                             ('getReportWeakness', Callable),
                                                             ('getVulnDomains', Callable)])
    MockedReportWrapperXSS = MockedReportWrapper(getReportBody=lambda: 'XSS',
                                                 getReportWeakness=lambda: 'XSS',
                                                 getVulnDomains=lambda: [])
    assert payout.suggestPayout(MockedReportWrapperXSS) == config.payoutDB['xss']['average']
    for vulnType in config.payoutDB:
        for domain in config.payoutDB[vulnType]:
            MockedReportWrapperVuln = MockedReportWrapper(getReportBody=lambda: vulnType,
                                                          getReportWeakness=lambda: vulnType,
                                                          getVulnDomains=lambda: [domain])
            assert payout.suggestPayout(MockedReportWrapperVuln) == config.payoutDB[vulnType][domain]
    MockedReportWrapperNone = MockedReportWrapper(getReportBody=lambda: '',
                                                  getReportWeakness=lambda: '',
                                                  getVulnDomains=lambda: [])
    assert payout.suggestPayout(MockedReportWrapperNone) is None
SeleniumDrivers.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def retry(func: Callable[[], T]) -> T:
    """ Retry the function with 30 second timeouts until it works
        - I've observed the getFirefoxDriver() without this freeze once (out of hundreds of runs...) so adding this
          as a safety measure. """
    for i in range(10):
        if config.DEBUG and i > 0:
            print("Retry #%s" % str(i))

        def timeoutHandler(signum, frame):
            raise TimeoutException("Timeout!")
        signal.signal(signal.SIGALRM, timeoutHandler)
        signal.alarm(delayTime)
        try:
            t = func()
            signal.alarm(0)
            return t
        except TimeoutException:
            pass
    signal.alarm(0)
    raise TimeoutException("Retried 10 times... Failed!")
dispatch.py 文件源码 项目:Brightside 作者: BrighterCommand 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def __init__(self,
                 connection: Connection,
                 consumer: BrightsideConsumerConfiguration,
                 consumer_factory: Callable[[Connection, BrightsideConsumerConfiguration, logging.Logger], BrightsideConsumer],
                 command_processor_factory: Callable[[str], CommandProcessor],
                 mapper_func: Callable[[BrightsideMessage], Request]) -> None:
        """
        The configuration parameters for one consumer - can create one or more performers from this, each of which is
        a message pump reading froma queue
        :param connection: The connection to the broker
        :param consumer: The consumer we want to create (routing key, queue etc)
        :param consumer_factory: A factory to create a consumer to read from a broker, a given implementation i.e. arame
        the command processor factory creates a command procesoor configured for a pipeline
        :param mapper_func: Maps between messages on the queue and requests (commnands/events)
        """
        self._connection = connection
        self._consumer = consumer
        self._consumer_factory = consumer_factory
        self._command_processor_factory = command_processor_factory
        self._mapper_func = mapper_func
registry.py 文件源码 项目:Brightside 作者: BrighterCommand 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def register(self, request_class: Request, handler_factory: Callable[[], Handler]) -> None:
        """
        Register the handler for the command
        :param request_class: The command or event to dispatch. It must implement getKey()
        :param handler_factory: A factory method to create the handler to dispatch to
        :return:
        """
        key = request_class.__name__
        is_command = request_class.is_command()
        is_event = request_class.is_event()
        is_present = key in self._registry
        if is_command and is_present:
            raise ConfigurationException("A handler for this request has already been registered")
        elif is_event and is_present:
            self._registry[key].append(handler_factory)
        elif is_command or is_event:
            self._registry[key] = [handler_factory]
initializers.py 文件源码 项目:allennlp 作者: allenai 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _initializer_wrapper(init_function: Callable[..., None]) -> Type[Initializer]:
    class Init(Initializer):
        def __init__(self, **kwargs):
            self._init_function = init_function
            self._kwargs = kwargs
        def __call__(self, tensor: torch.autograd.Variable) -> None:
            self._init_function(tensor, **self._kwargs)
        def __repr__(self):
            return 'Init: %s, with params: %s' % (self._init_function, self._kwargs)
        @classmethod
        def from_params(cls, params: Params):
            return cls(**params.as_dict())
    return Init


# There are no classes to decorate, so we hack these into Registrable._registry
util.py 文件源码 项目:allennlp 作者: allenai 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _last_dimension_applicator(function_to_apply: Callable[[torch.Tensor, Optional[torch.Tensor]], torch.Tensor],
                               tensor: torch.Tensor,
                               mask: Optional[torch.Tensor] = None):
    """
    Takes a tensor with 3 or more dimensions and applies a function over the last dimension.  We
    assume the tensor has shape ``(batch_size, ..., sequence_length)`` and that the mask (if given)
    has shape ``(batch_size, sequence_length)``.  We first unsqueeze and expand the mask so that it
    has the same shape as the tensor, then flatten them both to be 2D, pass them through
    the function and put the tensor back in its original shape.
    """
    tensor_shape = tensor.size()
    reshaped_tensor = tensor.view(-1, tensor.size()[-1])
    if mask is not None:
        while mask.dim() < tensor.dim():
            mask = mask.unsqueeze(1)
        mask = mask.expand_as(tensor).contiguous().float()
        mask = mask.view(-1, mask.size()[-1])
    reshaped_result = function_to_apply(reshaped_tensor, mask)
    return reshaped_result.view(*tensor_shape)
util.py 文件源码 项目:parsita 作者: drhagen 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def splat(f: Callable[..., A]) -> Callable[[Iterable], A]:
    """Convert a function taking multiple arguments into a function taking a single iterable argument.

    Args:
        f: Any function

    Returns:
        A function that accepts a single iterable argument. Each element of this iterable argument is passed as an
        argument to ``f``.

    Example:
        $ def f(a, b, c):
        $     return a + b + c
        $
        $ f(1, 2, 3)  # 6
        $ g = splat(f)
        $ g([1, 2, 3])  # 6
    """

    def splatted(args):
        return f(*args)

    return splatted
util.py 文件源码 项目:parsita 作者: drhagen 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def unsplat(f: Callable[[Iterable], A]) -> Callable[..., A]:
    """Convert a function taking a single iterable argument into a function taking multiple arguments.

    Args:
        f: Any function taking a single iterable argument

    Returns:
        A function that accepts multiple arguments. Each argument of this function is passed as an element of an
        iterable to ``f``.

    Example:
        $ def f(a):
        $     return a[0] + a[1] + a[2]
        $
        $ f([1, 2, 3])  # 6
        $ g = unsplat(f)
        $ g(1, 2, 3)  # 6
    """

    def unsplatted(*args):
        return f(args)

    return unsplatted
tree.py 文件源码 项目:extra-trees 作者: allrod5 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _split_sample(
            split: Callable[[object], bool], X: np.ndarray, y: np.ndarray
    ) -> Tuple[Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]]:
        """
        Split X, y sample set in two with a split function
        :return: ((X_left, y_left), (X_right, y_right))
        """
        if split.type is 'numerical':
            left_indexes = X[:, split.attribute] < split.criteria
            right_indexes = ~left_indexes
        else:
            Z = (
                pd.Index(pd.unique(split.criteria))
                .get_indexer(X[:, split.attribute]))
            left_indexes = np.where(Z >= 0)[0]
            right_indexes = np.where(Z < 0)[0]

        left = X[left_indexes], y[left_indexes]
        right = X[right_indexes], y[right_indexes]

        return left, right
__init__.py 文件源码 项目:cc98 作者: zjuchenyuan 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _producer_multi_threads(queue_task, queue_product, worker_function):
    """
    ??????????????
    :type queue_task: multiprocessing.JoinableQueue
    :type queue_product: multiprocessing.JoinableQueue
    :type worker_function: Callable[[Any], Any]
    """
    while True:
        try:
            task = queue_task.get()
            if isinstance(task, _QueueEndSignal):  # ????
                # finally ?? task_done() ?break??????????
                break
            if isinstance(task, dict):
                result = worker_function(**task)
            elif isinstance(task, (tuple, list)):
                result = worker_function(*task)
            else:
                result = worker_function(task)

            queue_product.put((task, result))
        except:
            traceback.print_exc()
        finally:
            queue_task.task_done()
socks5_client.py 文件源码 项目:pyShadowsocks 作者: FTwOoO 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, loop,
                 target_host, target_port,
                 connected_callback: Callable[[BaseProtocol], None],
                 data_callback: Callable[[bytes], None],
                 user=None,
                 password=None):

        super(SOCKS5ConnectProtocol, self).__init__()
        self.loop = loop

        self.data_buffer = b''
        self.connected_callback = connected_callback
        self.data_callback = data_callback
        self.target_host = target_host
        self.target_port = target_port

        self.user = user
        self.password = password

        if (self.user and self.password):
            self.auth_method = constants.SOCKS5_METHOD_USERNAME_PASSWORD
        else:
            self.auth_method = constants.SOCKS5_METHOD_NO_AUTHENTICATION_REQUIRED

        self.state = STAGE_SOCKS5_METHOD_SELECT
socks5_processor.py 文件源码 项目:pyShadowsocks 作者: FTwOoO 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, loop, transport,
                 tcp_connect_coroutine: Callable[[Socks5AddrHeader], Tuple[str, int]],
                 udp_connect_coroutine: Callable[[Socks5AddrHeader], Tuple[str, int]],
                 auth=constants.SOCKS5_METHOD_NO_AUTHENTICATION_REQUIRED,
                 username_passwords: Dict = None):

        self.loop = loop
        self.transport = transport

        self.tcp_connect_coroutine = tcp_connect_coroutine
        self.udp_connect_coroutine = udp_connect_coroutine

        self.auth = auth
        self.username_passwords = username_passwords

        self.state = constants.STAGE_SOCKS5_METHOD_SELECT
element.py 文件源码 项目:wdom 作者: miyakogi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def getElementsBy(start_node: ParentNode,
                  cond: Callable[['Element'], bool]) -> NodeList:
    """Return list of child elements of start_node which matches ``cond``.

    ``cond`` must be a function which gets a single argument ``Element``,
    and returns boolean. If the node matches requested condition, ``cond``
    should return True.
    This searches all child elements recursively.

    :arg ParentNode start_node:
    :arg cond: Callable[[Element], bool]
    :rtype: NodeList[Element]
    """
    elements = []
    for child in start_node.children:
        if cond(child):
            elements.append(child)
        elements.extend(child.getElementsBy(cond))
    return NodeList(elements)
django.py 文件源码 项目:bucket_throttling 作者: atten 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def throttle_request(throttling_rules: [RuleList, ThrottlingRule],
                     throttling_arguments_func: Callable=None,
                     throttling_options: ThrottlingOptions=None) -> Callable:
    """
    ????????? ??? view-???????, ?????????? ?????????.
    :param throttling_rules: ???? ????????? ThrottlingRule ??? ??????
    :param throttling_arguments_func: ???????, ?? ??????? ??????????? ????? ?????????? ??? ???????.
                                      ???? ?? ??????, ??????? ?-?? ?? ?????????.
    :param throttling_options: ????????? ThrottlingOptions, ???? ????? ????????? ?????????.
    """
    def decorator(func):
        func.throttling_rules = throttling_rules
        func.throttling_arguments_func = throttling_arguments_func
        func.throttling_options = throttling_options
        return func
    return decorator
ghost.py 文件源码 项目:merakicommons 作者: meraki-analytics 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def property(load_group_or_method: Union[Callable[[Any], Any], Any]) -> Union[property, Callable[[Callable[[Any], Any]], property]]:
        # Default behavior when no load group is provided. Set the load group to the method name.
        if isinstance(load_group_or_method, Callable) and not isinstance(load_group_or_method, type):
            method = load_group_or_method
            load_group = method.__name__
            method._Ghost__load_group = load_group

        # Set the load group as provided in the decorator
        else:
            load_group = load_group_or_method

        # The load_group and method variables need to be set correctly before this function defintion.
        def decorator(method: Callable) -> property:
            method._Ghost__load_group = load_group
            prop = Ghost.__property(method)
            prop._Ghost__load_group = load_group
            return prop

        if isinstance(load_group_or_method, Callable) and not isinstance(load_group_or_method, type):
            return decorator(method)
        else:
            return decorator
props.py 文件源码 项目:protoactor-python 作者: AsynkronIT 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, producer: Callable[[], 'Actor'] = None,
                 spawner: Callable[[str, 'Props', pid.PID], pid.PID] = default_spawner,
                 mailbox_producer: Callable[
                     [invoker.AbstractInvoker,
                      'AbstractDispatcher'], mailbox.AbstractMailbox] = default_mailbox_producer,
                 dispatcher: 'AbstractDispatcher' = dispatcher.ThreadDispatcher(),
                 supervisor_strategy: supervision.AbstractSupervisorStrategy = None,
                 middleware: List[Callable[[context.AbstractContext], Task]] = None,
                 middleware_chain: Callable[[context.AbstractContext], Task] = None) -> None:
        self.__producer = producer
        self.__spawner = spawner
        self.__mailbox_producer = mailbox_producer
        self.__supervisor_strategy = supervisor_strategy
        self.__dispatcher = dispatcher
        self.__middleware = middleware
        self.__middleware_chain = middleware_chain
context.py 文件源码 项目:sketchbook 作者: futursolo 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(
        self, *, cache_sketches: bool=True,
        source_encoding: str="utf-8",
            custom_escape_fns: Mapping[str, Callable[[Any], str]]={}) -> None:

        self._source_encoding = source_encoding

        escape_fns = escaping.builtin_escape_fns.copy()
        if custom_escape_fns:
            escape_fns.update(custom_escape_fns)
        self._escape_fns = types.MappingProxyType(escape_fns)

        self._stmt_classes = list(statements.builtin_stmt_classes)

        class OutputStmt(statements.BaseOutput):
            _filter_fn_names = list(self.escape_fns.keys())

        self._stmt_classes.append(OutputStmt)

        self._cache_sketches = cache_sketches
runtime.py 文件源码 项目:sketchbook 作者: futursolo 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __getitem__(
        self, name: Union[str, Tuple[str, bool]]) -> Callable[
            ..., Awaitable[str]]:
        if isinstance(name, tuple):
            block_name, defined_here = name

        else:
            block_name = name
            defined_here = False

        if block_name not in self._blocks.keys():
            raise KeyError(f"Unknown Block Name {block_name}.")

        SelectedBlockRuntime = self._blocks[block_name]

        async def wrapper() -> str:
            block_rt = SelectedBlockRuntime(
                self._skt_rt, _defined_here=defined_here)

            await block_rt._draw()

            return block_rt._block_result

        return wrapper
postdownload.py 文件源码 项目:pandachaika 作者: pandabuilder 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def write_file_update_progress(self, cmd: str, callback: Callable, filesize: int=0, blocksize: int=8192, rest: bool=None) -> str:
        self.ftps.voidcmd('TYPE I')  # type: ignore
        with self.ftps.transfercmd(cmd, rest) as conn:  # type: ignore
            self.current_download['filesize'] = filesize
            self.current_download['downloaded'] = 0
            self.current_download['filename'] = cmd.replace('RETR ', '')
            start = time.clock()
            while 1:
                data = conn.recv(blocksize)
                if not data:
                    break
                downloaded = len(data)
                self.current_download['downloaded'] += downloaded
                current = time.clock()
                if current > start:
                    self.current_download['speed'] = self.current_download['downloaded'] / ((current - start) * 1024)
                callback(data)
            self.current_download['filename'] = ''
            self.current_download['speed'] = 0
            self.current_download['filesize'] = 0
            # shutdown ssl layer
            if _SSLSocket is not None and isinstance(conn, _SSLSocket):
                conn.unwrap()
        return self.ftps.voidresp()  # type: ignore
providers.py 文件源码 项目:pandachaika 作者: pandabuilder 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _get_provider_submodule_method(module_name: str, submodule_name: str, method_name: str) -> Optional[Callable]:
    sub_module = "{}.{}".format(module_name, submodule_name)
    try:
        importlib.import_module(module_name, package='__path__')
    except ImportError:
        return None
    if importlib.util.find_spec(sub_module):
        site = importlib.import_module(sub_module, package=module_name)
        if hasattr(site, method_name):
            obj = getattr(site, method_name)
            if inspect.isfunction(obj):
                return obj
    return None


# We should only create one ProviderContext over the program lifetime,
# to avoid having to search the file system every time it's created.
# This is why this should be outside Settings


问题


面经


文章

微信
公众号

扫码关注公众号