Python cast() usage examples

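Before the examples, a quick refresher: typing.cast(T, value) returns value unchanged at runtime; its only job is to tell a static type checker such as mypy to treat the value as type T from that point on. It performs no validation or conversion. A minimal sketch:

from typing import List, cast

def first_upper(items: object) -> str:
    # cast() does no runtime checking; it only records, for the type
    # checker, our claim that items is really a list of strings.
    strings = cast(List[str], items)
    return strings[0].upper()

print(first_upper(["hello", "world"]))  # prints HELLO

The snippets below show how real projects use that idiom.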
bot.py (project: Hanabi-AI, author: MeGotsThis)
def discardSomeCard(self) -> bool:
        best_index: int = 0
        card: CardKnowledge
        bestCard: CardKnowledge
        i: int
        for i in range(len(self.hand)):
            card = cast(CardKnowledge, self.game.deck[self.hand[i]])
            bestCard = cast(CardKnowledge,
                            self.game.deck[self.hand[best_index]])
            if bestCard.maybeValue is None:
                best_index = i
            elif (card.maybeValue is not None
                    and card.maybeValue > bestCard.maybeValue):
                best_index = i
        self.discard_card(best_index)
        return True
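The two cast() calls above narrow entries of self.game.deck, which is presumably typed with a plain Card base class, to the CardKnowledge subclass this bot stores there. The same pattern in miniature, with hypothetical stand-in classes:

from typing import List, Optional, cast

class Card:
    pass

class CardKnowledge(Card):
    def __init__(self, maybe_value: Optional[int]) -> None:
        self.maybeValue = maybe_value

deck: List[Card] = [CardKnowledge(3), CardKnowledge(1)]
# deck is typed List[Card], but this code path only ever stores
# CardKnowledge, so cast() narrows the element without a runtime check.
best = cast(CardKnowledge, deck[0])
print(best.maybeValue)  # 3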
require.py (project: irisett, author: beebyte)
def require_bool(value: Optional[Union[bool, str, int]], convert: bool=False, allow_none: bool=False) -> Any:
    """Make sure a value is a boolean.

    Used when dealing with http input data.
    """
    if value is None and allow_none:
        return value
    if type(value) != bool:
        if not convert:
            raise InvalidData()
        if value in [None, 0, '0', 'false', 'False']:
            value = False
        elif value in [1, '1', 'true', 'True']:
            value = True
        else:
            raise InvalidData('value was %s(%s), expected bool' % (type(value), value))
    return cast(bool, value)
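The final cast(bool, value) exists because mypy cannot prove that every branch that survives to the return leaves value as a bool. A hypothetical usage sketch, assuming the function above plus a trivial class InvalidData(Exception): pass and the typing imports it uses:

assert require_bool(True) is True
assert require_bool('true', convert=True) is True    # string coerced
assert require_bool(0, convert=True) is False        # int coerced
assert require_bool(None, allow_none=True) is None
# require_bool('yes', convert=True) raises InvalidData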
require.py (project: irisett, author: beebyte)
def require_dict(value: Optional[Dict[Any, Any]], key_type: Any=None, value_type: Any=None,
                 allow_none: bool=False) -> Any:
    """Make sure a value is a Dict[key_type, value_type].

    Used when dealing with http input data.
    """
    if value is None and allow_none:
        return value
    if type(value) != dict:
        raise InvalidData('value was %s(%s), expected dict' % (type(value), value))
    value = cast(Dict, value)
    if key_type or value_type:
        for k, v in value.items():
            if key_type and type(k) != key_type:
                raise InvalidData('dict key was %s(%s), expected %s' % (type(k), k, key_type))
            if value_type and type(v) != value_type:
                raise InvalidData('dict value was %s(%s), expected %s' % (type(v), v, value_type))
    return value
view.py (project: irisett, author: beebyte)
async def _get_monitor_metadata(self, dbcon: DBConnection) -> Optional[Dict[int, Dict[str, str]]]:
        include_metadata = require_bool(
            get_request_param(self.request, 'include_metadata', error_if_missing=False),
            convert=True) or False
        if not include_metadata:
            return None
        if 'id' in self.request.rel_url.query:
            metadata_models = await metadata.get_metadata_for_object(
                dbcon, 'active_monitor', require_int(cast(str, get_request_param(self.request, 'id'))))
        elif 'meta_key' in self.request.rel_url.query:
            meta_key = require_str(get_request_param(self.request, 'meta_key'))
            meta_value = require_str(get_request_param(self.request, 'meta_value'))
            metadata_models = await metadata.get_metadata_for_object_metadata(
                dbcon, meta_key, meta_value, 'active_monitor', 'active_monitors')
        elif 'monitor_group_id' in self.request.rel_url.query:
            metadata_models = await monitor_group.get_active_monitor_metadata_for_monitor_group(
                dbcon, require_int(cast(str, get_request_param(self.request, 'monitor_group_id'))))
        else:
            metadata_models = await metadata.get_metadata_for_object_type(dbcon, 'active_monitor')
        metadata_dict = {}  # type: Dict[int, Dict[str, str]]
        for metadata_model in metadata_models:
            if metadata_model.object_id not in metadata_dict:
                metadata_dict[metadata_model.object_id] = {}
            metadata_dict[metadata_model.object_id][metadata_model.key] = metadata_model.value
        return metadata_dict
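Both casts in this handler strip an Optional: get_request_param presumably returns Optional[str], and since the relevant query key was just checked to be present, the author asserts non-None with cast() instead of an extra branch. The pattern in isolation:

from typing import Dict, Optional, cast

def lookup(params: Dict[str, str], key: str) -> Optional[str]:
    return params.get(key)

params = {'id': '42'}
if 'id' in params:
    # The key was just verified, so the Optional cannot be None here;
    # cast() tells the checker so without a runtime check.
    monitor_id = int(cast(str, lookup(params, 'id')))
    print(monitor_id)  # 42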
bucket_iterator.py (project: allennlp, author: allenai)
def _sort_dataset_by_padding(dataset: Dataset,
                                 sorting_keys: List[Tuple[str, str]],  # pylint: disable=invalid-sequence-index
                                 padding_noise: float = 0.0) -> Dataset:
        """
        Sorts the ``Instances`` in this ``Dataset`` by their padding lengths, using the keys in
        ``sorting_keys`` (in the order in which they are provided).  ``sorting_keys`` is a list of
        ``(field_name, padding_key)`` tuples.
        """
        instances_with_lengths = []
        for instance in dataset.instances:
            padding_lengths = cast(Dict[str, Dict[str, float]], instance.get_padding_lengths())
            if padding_noise > 0.0:
                noisy_lengths = {}
                for field_name, field_lengths in padding_lengths.items():
                    noisy_lengths[field_name] = add_noise_to_dict_values(field_lengths, padding_noise)
                padding_lengths = noisy_lengths
            instance_with_lengths = ([padding_lengths[field_name][padding_key]
                                      for (field_name, padding_key) in sorting_keys],
                                     instance)
            instances_with_lengths.append(instance_with_lengths)
        instances_with_lengths.sort(key=lambda x: x[0])
        return Dataset([instance_with_lengths[-1] for instance_with_lengths in instances_with_lengths])
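Here cast() widens rather than narrows: instance.get_padding_lengths() presumably returns int lengths, and the cast relabels the values as float so that the noisy, fractional lengths can be assigned back to the same variable. A toy version of the sort-by-noisy-padding idea, with a hypothetical stand-in for add_noise_to_dict_values:

import random
from typing import Dict, List

def add_noise(lengths: Dict[str, int], noise: float) -> Dict[str, float]:
    # Hypothetical stand-in for allennlp's add_noise_to_dict_values.
    return {k: v * (1 + random.uniform(-noise, noise)) for k, v in lengths.items()}

instances: List[Dict[str, int]] = [{"tokens": 7}, {"tokens": 3}, {"tokens": 5}]
keyed = [([add_noise(inst, 0.1)["tokens"]], inst) for inst in instances]
keyed.sort(key=lambda pair: pair[0])
print([inst for _, inst in keyed])  # shortest first, modulo noise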
models.py (project: suq, author: MaxwellBo)
def get_remaining_shared_breaks_this_week(group_members: Set[User]) -> List[Break]:
    """
    Finds this week's remaining common breaks among a group of users
    """
    # So, the Mypy type checker treats `List` as invariant, meaning we
    # can't give a `List[B]` to a function that expects a `List[A]` if
    # B is a subclass of A.
    # So we have to cast it in to the function...

    # FIXME: Get rid of these casts when Van Rossum figures out how to write a
    #        proper type system
    breaks = cast(List[Event_], get_shared_breaks(group_members))
    now = datetime.now(BRISBANE_TIME_ZONE)

    ### ... and out.
    return cast(List[Break], get_this_weeks_events(now, breaks))


# FIXME: Make 'request_status' an enum: https://docs.python.org/3/library/enum.html
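The comment above is right about invariance: even though Break subclasses Event_, a List[Break] is not acceptable where a List[Event_] is expected, because the callee could legally append a plain Event_ into the caller's list of Breaks. A sketch of the problem, the cast workaround, and the read-only alternative:

from typing import List, Sequence, cast

class Event_:
    pass

class Break(Event_):
    pass

def takes_events(events: List[Event_]) -> None:
    events.append(Event_())  # fine for List[Event_], disastrous for List[Break]

breaks: List[Break] = [Break()]
# takes_events(breaks)                    # rejected by mypy: List is invariant
takes_events(cast(List[Event_], breaks))  # the workaround used above

def reads_events(events: Sequence[Event_]) -> int:
    return len(events)

# Sequence is covariant (read-only), so no cast is needed here.
print(reads_events(breaks))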
__init__.py (project: gopythongo, author: gopythongo)
def create_file_in_config_folder(self, filename: str, mode: Optional[int] = None) -> TextIO:
        """
        :param filename: the name of the file in the generated config folder
        :param mode: pass an ``int`` here if you want to modify the file's mode (it will be umasked)
        :return: an open file descriptor (``TextIO``) object that the *caller must call `.close()` on*
        """
        if os.path.isfile(filename):
            raise InvalidArgumentException("Call create_file_in_config_folder with a filename, not a path")

        self.ensure_config_folder()
        f = cast(TextIO, io.open(os.path.join(self.configfolder, filename), mode="wt", encoding="utf-8"))

        if mode:
            os.chmod(os.path.join(self.configfolder, filename), get_umasked_mode(mode))

        return f
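io.open() in text mode returns a TextIOWrapper; the cast asserts the TextIO interface declared in the return annotation, presumably because the stubs in use typed io.open more loosely. A hypothetical caller honouring the docstring's "caller must close" contract (builder stands in for whatever object exposes this method):

f = builder.create_file_in_config_folder("app.conf", mode=0o640)
try:
    f.write("[main]\nkey = value\n")
finally:
    f.close()  # closing is explicitly the caller's job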
aptly.py (project: gopythongo, author: gopythongo)
def validate_args(self, args: configargparse.Namespace) -> None:
        _aptly_args.validate_shared_args(args)

        from gopythongo.versioners import get_version_parsers
        debvp = cast(DebianVersionParser, get_version_parsers()["debian"])  # type: DebianVersionParser
        if args.version_action not in debvp.supported_actions:
            raise ErrorMessage("Version Action is set to '%s', but you chose the Aptly Store which relies on Debian "
                               "version strings. Unfortunately the Debian Versioner does not support the '%s' action. "
                               "It only supports: %s." %
                               (highlight(args.version_action), highlight(args.version_action),
                                highlight(", ".join(debvp.supported_actions))))

        if "-distribution" in args.aptly_publish_opts:
            print_warning("You are using %s in your Aptly Store options. You should use the %s GoPythonGo argument "
                          "instead, since using -distribution in the aptly command line is invalid when GoPythonGo "
                          "tries to update a published repo." %
                          (highlight("-distribution"), highlight("--aptly-distribution")))

        if args.use_aptly_wrapper:
            wrapper_cmd = create_script_path(the_context.gopythongo_path, "vaultwrapper")
            if not os.path.exists(wrapper_cmd) or not os.access(wrapper_cmd, os.X_OK):
                raise ErrorMessage("%s can either not be found or is not executable. The vault wrapper seems to "
                                   "be unavailable." % wrapper_cmd)
            self.aptly_wrapper_cmd = wrapper_cmd
autodetect.py (project: rcli, author: contains-io)
def _get_commands(dist  # type: setuptools.dist.Distribution
                  ):
    # type: (...) -> typing.Dict[str, typing.Set[str]]
    """Find all commands belonging to the given distribution.

    Args:
        dist: The Distribution to search for docopt-compatible docstrings that
            can be used to generate command entry points.

    Returns:
        A dictionary containing a mapping of primary commands to sets of
        subcommands.
    """
    py_files = (f for f in setuptools.findall()
                if os.path.splitext(f)[1].lower() == '.py')
    pkg_files = (f for f in py_files if _get_package_name(f) in dist.packages)
    commands = {}  # type: typing.Dict[str, typing.Set[str]]
    for file_name in pkg_files:
        with open(file_name) as py_file:
            module = typing.cast(ast.Module, ast.parse(py_file.read()))
        module_name = _get_module_name(file_name)
        _append_commands(commands, module_name, _get_module_commands(module))
        _append_commands(commands, module_name, _get_class_commands(module))
        _append_commands(commands, module_name, _get_function_commands(module))
    return commands
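ast.parse is the interesting cast target here: attributes like .body exist only on ast.Module, not on the base ast.AST, and the cast presumably works around stubs of the day that typed ast.parse as returning the base class. A runnable miniature:

import ast
import typing

source = "def greet():\n    '''usage: greet'''\n"
module = typing.cast(ast.Module, ast.parse(source))
for node in module.body:  # .body exists on ast.Module, not on bare ast.AST
    if isinstance(node, ast.FunctionDef):
        print(node.name, '->', ast.get_docstring(node))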
helpers.py (project: CodeGra.de, author: CodeGra-de)
def filter_all_or_404(model: t.Type[Y], *criteria: t.Any) -> t.Sequence[Y]:
    """Get all objects of the specified model filtered by the specified
    criteria.

    .. note::
        ``Y`` is bound to :py:class:`psef.models.Base`, so it should be a
        SQLAlchemy model.

    :param model: The object to get.
    :param criteria: The criteria to filter with.
    :returns: The requested objects.

    :raises APIException: If no object with the given id could be found.
        (OBJECT_ID_NOT_FOUND)
    """
    return t.cast(t.Sequence[Y], _filter_or_404(model, True, criteria))
helpers.py (project: CodeGra.de, author: CodeGra-de)
def filter_single_or_404(model: t.Type[Y], *criteria: t.Any) -> Y:
    """Get a single object of the specified model by filtering or raise an
    exception.

    .. note::
        ``Y`` is bound to :py:class:`psef.models.Base`, so it should be a
        SQLAlchemy model.

    :param model: The object to get.
    :param criteria: The criteria to filter with.
    :returns: The requested object.

    :raises APIException: If no object with the given id could be found.
        (OBJECT_ID_NOT_FOUND)
    """
    return t.cast(Y, _filter_or_404(model, False, criteria))
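_filter_or_404 evidently serves both the single-object and all-objects variants, so its own return type is too loose for either caller; each public wrapper casts the result back to the precise shape it knows it asked for. The pattern in outline:

import typing as t

Y = t.TypeVar('Y')

def _query(model: t.Type[Y], many: bool) -> t.Union[Y, t.Sequence[Y]]:
    ...  # hypothetical stand-in for _filter_or_404's shared lookup logic

def get_one(model: t.Type[Y]) -> Y:
    return t.cast(Y, _query(model, False))

def get_all(model: t.Type[Y]) -> t.Sequence[Y]:
    return t.cast(t.Sequence[Y], _query(model, True))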
graphql_impl.py (project: graphscale, author: schrockn)
async def gen_update_pent_dynamic(
    context: PentContext,
    obj_id: UUID,
    pent_cls_name: str,
    data_cls_name: str,
    payload_cls_name: str,
    data: PentMutationData
) -> PentMutationPayload:

    data_cls = context.cls_from_name(data_cls_name)
    check.isinst(data, data_cls)

    pent_cls = context.cls_from_name(pent_cls_name)
    payload_cls = context.cls_from_name(payload_cls_name)

    pent = await update_pent(context, pent_cls, obj_id, data)
    return cast(PentMutationPayload, payload_cls(pent))
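Because payload_cls is resolved from a string at runtime, the type checker sees it as a plain type and the constructed object as Any; the cast restores the declared PentMutationPayload return type. The same registry pattern in miniature:

from typing import Dict, cast

class Payload:
    def __init__(self, value: object) -> None:
        self.value = value

REGISTRY: Dict[str, type] = {'Payload': Payload}

def build(name: str, value: object) -> Payload:
    cls = REGISTRY[name]  # statically just "type": constructing it yields Any
    return cast(Payload, cls(value))

print(build('Payload', 42).value)  # 42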
graphql_client.py (project: graphscale, author: schrockn)
async def gen_operation(self, graphql_text: str, operation: str, *args: GraphQLArg) -> dict:
        arg_strings = []
        for name, arg_type, _value in args:
            arg_strings.append("${name}: {arg_type}".format(name=name, arg_type=arg_type))

        arg_list = ', '.join(arg_strings)

        full_query = (
            '{operation} ({arg_list}) '.format(arg_list=arg_list, operation=operation) + '{' +
            graphql_text + '}'
        )
        arg_dict = {arg.name: arg.value for arg in args}
        result = await (
            exec_in_mem_graphql(
                self.graphql_schema, self.context, full_query, self.root_value, arg_dict
            )
        )
        if result.errors:
            _process_error(result)

        return cast(dict, result.data)
unicode_exporter.py (project: mazes-for-programmers-python-src, author: Kartones)
def render(self, grid: Grid, **kwargs: Any) -> None:
        horizontal_wall = "\u2501"
        vertical_wall = "\u2503"

        output = self.JUNCTIONS[12]
        for x in range(grid.columns - 1):
            output += (horizontal_wall * 3 + self.get_topmost_junction(cast(Cell, grid.cell_at(row=0, column=x))))
        output += horizontal_wall * 3 + self.JUNCTIONS[10] + "\n"

        for row in grid.each_row():
            top = vertical_wall
            bottom = self.get_leftmost_junction(row[0])
            for cell in row:
                body = grid.contents_of(cell)
                east_boundary = " " if cell.linked_to(cell.east) else vertical_wall
                top += body + east_boundary
                south_boundary = "   " if cell.linked_to(cell.south) else horizontal_wall * 3
                bottom += south_boundary + self.get_south_east_junction(cell)
            output += top + "\n"
            output += bottom + "\n"

        print(output)
sequence_labeler.py (project: neuralmonkey, author: ufal)
def feed_dict(self, dataset: Dataset, train: bool = False) -> FeedDict:
        fd = {}  # type: FeedDict

        sentences = cast(Iterable[List[str]],
                         dataset.get_series(self.data_id, allow_none=True))

        fd[self.train_mode] = train

        if sentences is not None:
            vectors, paddings = self.vocabulary.sentences_to_tensor(
                list(sentences), pad_to_max_len=False, train_mode=train)

            fd[self.train_targets] = vectors.T
            fd[self.train_weights] = paddings.T

        return fd
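Note the slight tension in this idiom: allow_none=True means get_series may return None, which the code then checks for, yet the cast already claims a non-Optional Iterable[List[str]]. A more faithful annotation keeps the Optional and lets the None check do the narrowing (a fragment, assuming the same Dataset API and imports as above):

sentences = cast(Optional[Iterable[List[str]]],
                 dataset.get_series(self.data_id, allow_none=True))
if sentences is not None:
    ...  # mypy has now narrowed sentences to Iterable[List[str]]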
classifier.py (project: neuralmonkey, author: ufal)
def feed_dict(self, dataset: Dataset, train: bool = False) -> FeedDict:
        sentences = cast(Iterable[List[str]],
                         dataset.get_series(self.data_id, allow_none=True))

        sentences_list = list(sentences) if sentences is not None else None

        fd = {}  # type: FeedDict

        if sentences is not None:
            label_tensors, _ = self.vocabulary.sentences_to_tensor(
                sentences_list, self.max_output_len)

            # pylint: disable=unsubscriptable-object
            fd[self.gt_inputs[0]] = label_tensors[0]
            # pylint: enable=unsubscriptable-object

        fd[self.train_mode] = train

        return fd
dataset.py (project: neuralmonkey, author: ufal)
def _get_series_outputs(series_config: SeriesConfig) -> Dict[str, str]:
    """Get paths to series outputs from the dataset keyword argument specs.

    The output file for a series named 'xxx' is specified by the parameter 's_xxx_out'.

    Arguments:
        series_config: A dictionary containing the dataset keyword argument
           specs.

    Returns:
        A dictionary which maps series names to the paths for their output
        files.
    """
    outputs = {}
    for key, value in series_config.items():
        matcher = SERIES_OUTPUT.match(key)
        if matcher:
            name = matcher.group(1)
            if not isinstance(value, str):
                raise ValueError(
                    "Output path for '{}' series must be a string, was {}.".
                    format(name, type(value)))
            outputs[name] = cast(str, value)
    return outputs
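This particular cast is redundant: after the isinstance check raises for anything that is not a str, mypy has already narrowed value to str. The narrowing works on its own:

from typing import Dict

def collect(config: Dict[str, object]) -> Dict[str, str]:
    outputs: Dict[str, str] = {}
    for key, value in config.items():
        if not isinstance(value, str):
            raise ValueError("Output path for '{}' must be a string".format(key))
        outputs[key] = value  # already narrowed to str; no cast needed
    return outputs

print(collect({'s_train_out': 'train.txt'}))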
dataset.py (project: neuralmonkey, author: ufal)
def _preprocessed_datasets(
        dataset: Dataset,
        series_config: SeriesConfig) -> None:
    """Apply dataset-level preprocessing."""
    keys = [key for key in series_config.keys()
            if PREPROCESSED_SERIES.match(key)]

    for key in keys:
        name = PREPROCESSED_SERIES.match(key).group(1)
        preprocessor = cast(DatasetPreprocess, series_config[key])

        if isinstance(dataset, Dataset):
            new_series = list(preprocessor(dataset))
            dataset.add_series(name, new_series)
        elif isinstance(dataset, LazyDataset):
            dataset.preprocess_series[name] = (None, preprocessor)
representation_runner.py (project: neuralmonkey, author: ufal)
def __init__(self,
                 output_series: str,
                 encoder: Stateful,
                 used_session: int = 0) -> None:
        """Initialize the representation runner.

        Args:
            output_series: Name of the output series with vectors.
            encoder: Used encoder.
            used_session: Id of the TensorFlow session used in case of model
                ensembles.
        """
        check_argument_types()

        if not isinstance(encoder, ModelPart):
            raise TypeError("The encoder of the representation runner has to "
                            "be an instance of 'ModelPart'")

        BaseRunner.__init__(self, output_series, cast(ModelPart, encoder))

        self._used_session = used_session  # type: int
        self._encoded = encoder.output  # type: tf.Tensor

    # pylint: disable=unused-argument
peer_tcp_client.py (project: bit-torrent, author: borzunov)
def _handle_haves(self, message_id: MessageType, payload: memoryview):
        if message_id == MessageType.have:
            (index,) = struct.unpack('!I', cast(bytes, payload))
            self._mark_as_owner(index)
        elif message_id == MessageType.bitfield:
            piece_count = self._download_info.piece_count
            PeerTCPClient._check_payload_len(message_id, payload, int(ceil(piece_count / 8)))

            arr = bitarray(endian='big')
            arr.frombytes(payload.tobytes())
            for i in range(piece_count):
                if arr[i]:
                    self._mark_as_owner(i)
            for i in range(piece_count, len(arr)):
                if arr[i]:
                    raise ValueError('Spare bits in "bitfield" message must be zero')

        # if self._download_info.complete and self.is_seed():
        #     raise SeedError('A seed is disconnected because a download is complete')
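struct.unpack accepts any object supporting the buffer protocol at runtime, memoryview included; the cast to bytes presumably exists to satisfy stubs that declared a narrower parameter type. A runnable check:

import struct

payload = memoryview(struct.pack('!I', 1234))
(index,) = struct.unpack('!I', payload)  # works on the memoryview directly
print(index)  # 1234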
peer_tcp_client.py (project: bit-torrent, author: borzunov)
async def _handle_requests(self, message_id: MessageType, payload: memoryview):
        piece_index, begin, length = struct.unpack('!3I', cast(bytes, payload))
        request = BlockRequest(piece_index, begin, length)
        self._check_position_range(request)

        if message_id == MessageType.request:
            if length > PeerTCPClient.MAX_REQUEST_LENGTH:
                raise ValueError('Requested {} bytes, but the current policy allows to accept requests '
                                 'of not more than {} bytes'.format(length, PeerTCPClient.MAX_REQUEST_LENGTH))
            if (self._am_choking or not self._peer_interested or
                    not self._download_info.pieces[piece_index].downloaded):
                # If peer isn't interested but requesting, their peer_interested flag wasn't considered
                # when selecting who to unchoke, so we may be not ready to upload to them.
                # If requested piece is not downloaded yet, we shouldn't disconnect because our piece_downloaded flag
                # could be removed because of file corruption.
                return

            await self._send_block(request)
            await self.drain()
        elif message_id == MessageType.cancel:
            # Now we answer to a request immediately or reject and forget it,
            # so there's no need to handle cancel messages
            pass
collectors.py (project: aioprometheus, author: claws)
def add(self, labels: LabelsType, value: NumericValueType) -> None:
        ''' Add will add the given value to the counter.

        :raises: ValueError if the value is negative. Counters can only
          increase.
        '''
        value = cast(Union[float, int], value)  # typing check, no runtime behaviour.
        if value < 0:
            raise ValueError("Counters can't decrease")

        try:
            current = self.get_value(labels)
        except KeyError:
            current = 0

        current = cast(Union[float, int], current)  # typing check, no runtime behaviour.
        self.set_value(labels, current + value)
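The two casts here do nothing at runtime, as the inline comments say; they exist because NumericValueType is presumably a union wider than float/int, and the comparison and addition need the narrower type. A hypothetical usage sketch, assuming aioprometheus's Counter is constructed as Counter(name, doc) (check the library for the exact signature):

c = Counter("requests_total", "Total requests served.")
c.add({"path": "/"}, 1)
c.add({"path": "/"}, 2.5)
# c.add({"path": "/"}, -1) would raise ValueError("Counters can't decrease")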
collectors.py (project: aioprometheus, author: claws)
def add(self, labels: LabelsType, value: NumericValueType) -> None:
        ''' Add adds a single observation to the summary '''

        value = cast(Union[float, int], value)  # typing check, no runtime behaviour.
        if type(value) not in (float, int):
            raise TypeError("Summary only works with digits (int, float)")

        try:
            e = self.get_value(labels)
        except KeyError:
            # Initialize quantile estimator
            e = quantile.Estimator(*self.invariants)
            self.set_value(labels, e)

        e.observe(float(value))  # type: ignore

    # https://prometheus.io/docs/instrumenting/writing_clientlibs/#summary
    # A summary MUST have the ``observe`` methods

