python类Any()的实例源码

frame_manager.py 文件源码 项目:pyppeteer 作者: miyakogi 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def waitFor(self, selectorOrFunctionOrTimeout: Union[str, int, float],
                options: dict = None, **kwargs: Any) -> Awaitable:
        """Wait until `selectorOrFunctionOrTimeout`."""
        if options is None:
            options = dict()
        options.update(kwargs)
        if isinstance(selectorOrFunctionOrTimeout, (int, float)):
            fut: Awaitable[None] = asyncio.ensure_future(
                asyncio.sleep(selectorOrFunctionOrTimeout))
            return fut
        if not isinstance(selectorOrFunctionOrTimeout, str):
            fut = asyncio.get_event_loop().create_future()
            fut.set_exception(TypeError(
                'Unsupported target type: ' +
                str(type(selectorOrFunctionOrTimeout))
            ))
            return fut
        if ('=>' in selectorOrFunctionOrTimeout or
                selectorOrFunctionOrTimeout.strip().startswith('function')):
            return self.waitForFunction(selectorOrFunctionOrTimeout, options)
        return self.waitForSelector(selectorOrFunctionOrTimeout, options)
test_python_driver.py 文件源码 项目:python-driver 作者: bblfsh 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _send_receive(self, nummsgs: int, outformat: str='json',
                      dataupdate: Optional[Dict[AnyStr, Any]]=None,
                      restart_data: bool=True) -> List[Response]:
        if restart_data:
            self._restart_data(outformat)

        if dataupdate:
            self.data.update(dataupdate)

        self._add_to_buffer(nummsgs, outformat)
        self.sendbuffer.seek(0)

        processor, _ = get_processor_instance(
                outformat,
                custom_outbuffer=self.recvbuffer,
                custom_inbuffer=self.sendbuffer
        )
        processor.process_requests(self.sendbuffer)
        return self._loadResults(outformat)
cli.py 文件源码 项目:python-driver 作者: bblfsh 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_processor_instance(format_: str, custom_inbuffer: InBuffer=None,
               custom_outbuffer: OutBuffer=None) -> Tuple[Any, Any]:
    """
    Get a processor instance. The class and buffers will be selected based on the
    python_driver.ProcessorConfigs dictionary. The input and output buffers can
    be overriden using the custom_inbuffer and custom_outbuffer parameters. This
    is mainly useful for unittesting.
    """
    conf = ProcessorConfigs.get(format_)
    if not conf:
        raise RequestInstantiationException('No RequestProcessor found for format %s' % format_)

    inbuffer  = custom_inbuffer if custom_inbuffer else conf['inbuffer']
    outbuffer = custom_outbuffer if custom_outbuffer else conf['outbuffer']
    instance  = conf['class'](outbuffer) # type: ignore

    return instance, inbuffer
tracing.py 文件源码 项目:pyppeteer 作者: miyakogi 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def start(self, options: dict = None, **kwargs: Any) -> None:
        """Start."""
        options = options or dict()
        options.update(kwargs)
        categoriesArray = [
            '-*', 'devtools.timeline', 'v8.execute',
            'disabled-by-default-devtools.timeline',
            'disabled-by-default-devtools.timeline.frame', 'toplevel',
            'blink.console', 'blink.user_timing', 'latencyInfo',
            'disabled-by-default-devtools.timeline.stack',
            'disabled-by-default-v8.cpu_profiler',
        ]

        if 'screenshots' in options:
            categoriesArray.append('disabled-by-default-devtools.screenshot')

        self._path = options.get('path', '')
        self._recording = True
        await self._client.send('Tracing.start', {
            'transferMode': 'ReturnAsStream',
            'categories': ','.join(categoriesArray),
        })
navigator_watcher.py 文件源码 项目:pyppeteer 作者: miyakogi 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, client: Session, ignoreHTTPSErrors: Any,
                 options: dict = None, **kwargs: Any) -> None:
        """Make new navigator watcher."""
        if options is None:
            options = {}
        options.update(kwargs)
        self._client = client
        self._ignoreHTTPSErrors = ignoreHTTPSErrors
        self._timeout = options.get('timeout', 3000)
        self._idleTime = options.get('networkIdleTimeout', 1000)
        self._idleTimer: Optional[Union[asyncio.Future, asyncio.Handle]] = None
        self._idleInflight = options.get('networkIdleInflight', 2)
        self._waitUntil = options.get('waitUntil', 'load')
        if self._waitUntil not in ('load', 'networkidle'):
            raise ValueError(
                f'Unknown value for options.waitUntil: {self._waitUntil}')
page.py 文件源码 项目:pyppeteer 作者: miyakogi 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def waitForNavigation(self, options: dict = None, **kwargs: Any
                                ) -> Optional[Response]:
        """Wait navigation completes."""
        if options is None:
            options = dict()
        options.update(kwargs)
        watcher = NavigatorWatcher(self._client, self._ignoreHTTPSErrors,
                                   options)
        responses: Dict[str, Response] = dict()
        listener = helper.addEventListener(
            self._networkManager,
            NetworkManager.Events.Response,
            lambda response: responses.__setitem__(response.url, response)
        )
        await watcher.waitForNavigation()
        helper.removeEventListeners([listener])
        return responses.get(self.url)
page.py 文件源码 项目:pyppeteer 作者: miyakogi 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def screenshot(self, options: dict = None, **kwargs: Any) -> bytes:
        """Take screen shot."""
        options = options or dict()
        options.update(kwargs)
        screenshotType = None
        if 'path' in options:
            mimeType, _ = mimetypes.guess_type(options['path'])
            if mimeType == 'image/png':
                screenshotType = 'png'
            elif mimeType == 'image/jpeg':
                screenshotType = 'jpeg'
            else:
                raise PageError('Unsupported screenshot '
                                f'mime type: {mimeType}')
        if 'type' in options:
            screenshotType = options['type']
        if not screenshotType:
            screenshotType = 'png'
        return await self._screenshotTask(screenshotType, options)
launcher.py 文件源码 项目:pyppeteer 作者: miyakogi 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, options: Dict[str, Any] = None, **kwargs: Any) -> None:
        """Make new launcher."""
        self.options = options or dict()
        self.options.update(kwargs)
        self.chrome_args = DEFAULT_ARGS
        self._tmp_user_data_dir: Optional[str] = None
        self._parse_args()
        if 'headless' not in self.options or self.options.get('headless'):
            self.chrome_args = self.chrome_args + [
                '--headless',
                '--disable-gpu',
                '--hide-scrollbars',
                '--mute-audio',
            ]
        if 'executablePath' in self.options:
            self.exec = self.options['executablePath']
        else:
            if not check_chromium():
                download_chromium()
            self.exec = str(chromium_excutable())
        self.cmd = [self.exec] + self.chrome_args
input.py 文件源码 项目:pyppeteer 作者: miyakogi 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def move(self, x: int, y: int, options: dict = None, **kwargs: Any
                   ) -> None:
        """Move cursor."""
        options = options or dict()
        options.update(kwargs)
        fromX = self._x
        fromY = self._y
        self._x = x
        self._y = y
        steps = options.get('steps', 1)
        for i in range(1, steps + 1):
            x = round(fromX + (self._x - fromX) * (i / steps))
            y = round(fromY + (self._y - fromY) * (i / steps))
            await self._client.send('Input.dispatchMouseEvent', {
                'type': 'mouseMoved',
                'button': self._button,
                'x': x,
                'y': y,
                'modifiers': self._keyboard._modifiers,
            })
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def contains_all_options(optiongroup, parentstyle, matchvalues=False):
    # type: (Dict[Any, Any], Style, bool) -> bool
    """Returns true if all options in optiongroup are present in parentstyle.
    If matchvalues is True, the values in optiongroup must also match
    those in the parentstyle.
    """
    for optionname, value in optiongroup.items():
        if optionname not in parentstyle:
            return False
        if isinstance(value, dict):
            parent_suboptions = parentstyle[optionname]
            if not contains_all_options(value, parent_suboptions, matchvalues=matchvalues):
                return False
        elif matchvalues:
            pvalue = parentstyle.get(optionname)
            if type(pvalue) != type(value):
                return False
            if pvalue != value:
                return False
    return True
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def compute_unified_diff(filename, content2, **kwargs):
    # type: (str, bytes, **Any) -> Tuple[int, Iterable[str]]
    diff = ()  # type: Iterable[str]
    exit_code = ERROR
    kw = kwargs.copy()
    if 'n' not in kwargs:
        # zero context lines
        kw['n'] = 0
    try:
        content1 = get_cached_file(filename)
        if PY3:
            c1 = unistr(content1)
            c2 = unistr(content2)
        else:
            c1 = content1
            c2 = content2
        diff = difflib.unified_diff(c1.splitlines(True), c2.splitlines(True), **kw)
        exit_code = OK
    finally:
        return exit_code, diff

# ---------------------------------------------------------------------
# Spare the user from specifying a formatter by finding a suitable one.
configuration.py 文件源码 项目:pip-update-requirements 作者: alanhamlett 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def set_value(self, key, value):
        # type: (str, Any) -> None
        """Modify a value in the configuration.
        """
        self._ensure_have_load_only()

        fname, parser = self._get_parser_to_modify()

        if parser is not None:
            section, name = _disassemble_key(key)

            # Modify the parser and the configuration
            if not parser.has_section(section):
                parser.add_section(section)
            parser.set(section, name, value)

        self._config[self.load_only][key] = value
        self._mark_as_modified(fname, parser)
game.py 文件源码 项目:Hanabi-AI 作者: MeGotsThis 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def received(self, type: str, resp: Dict[str, Any]) -> None:
        if type == 'message':
            self.message_received(resp['text'])
        elif type == 'init':
            raise Exception()
        elif type == 'advanced':
            # Fast UI Animations
            pass
        elif type == 'connected':
            # Players connected to the game
            pass
        elif type == 'notify':
            self.handle_notify(resp)
        elif type == 'action':
            self.decide_action(resp['can_clue'], resp['can_discard'])
        else:
            print(type, resp)
            raise Exception()
configuration_space.py 文件源码 项目:ConfigSpace 作者: automl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __getitem__(self, item: str) -> Any:
        if self._query_values or item in self._values:
            return self._values.get(item)

        hyperparameter = self.configuration_space._hyperparameters[item]
        item_idx = self.configuration_space._hyperparameter_idx[item]

        if not np.isfinite(self._vector[item_idx]):
            raise KeyError()

        value = hyperparameter._transform(self._vector[item_idx])
        # Truncate the representation of the float to be of constant
        # length for a python version
        if isinstance(hyperparameter, FloatHyperparameter):
            value = float(repr(value))
        # TODO make everything faster, then it'll be possible to init all values
        # at the same time and use an OrderedDict instead of only a dict here to
        # support iterating that dict in the same order as the actual order of
        # hyperparameters
        self._values[item] = value
        return self._values[item]
corpus.py 文件源码 项目:speechless 作者: JuliusKunze 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def randomly_grouped_by(key_from_example: Callable[[LabeledExample], Any], training_share: float = .9) -> Callable[
        [List[LabeledExample]], Tuple[List[LabeledExample], List[LabeledExample]]]:
        def split(examples: List[LabeledExample]) -> Tuple[List[LabeledExample], List[LabeledExample]]:
            examples_by_directory = group(examples, key=key_from_example)
            directories = examples_by_directory.keys()

            # split must be the same every time:
            random.seed(42)
            keys = set(random.sample(directories, int(training_share * len(directories))))

            training_examples = [example for example in examples if key_from_example(example) in keys]
            test_examples = [example for example in examples if key_from_example(example) not in keys]

            return training_examples, test_examples

        return split
store.py 文件源码 项目:Lyra 作者: caterinaurban 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, variables: List[VariableIdentifier], lattices: Dict[Type, Type[Lattice]],
                 arguments: Dict[Type, Dict[str, Any]] = defaultdict(lambda: dict())):
        """Create a mapping Var -> L from each variable in Var to the corresponding element in L.

        :param variables: list of program variables
        :param lattices: dictionary from variable types to the corresponding lattice types
        :param arguments: dictionary from variable types to arguments of the corresponding lattices
        """
        super().__init__()
        self._variables = variables
        self._lattices = lattices
        self._arguments = arguments
        try:
            self._store = {v: lattices[type(v.typ)](**arguments[type(v.typ)]) for v in variables}
        except KeyError as key:
            error = f"Missing lattice for variable type {repr(key.args[0])}!"
            raise ValueError(error)
rendezvous.py 文件源码 项目:proxenos 作者: darvid 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def hashex(method,    # type: HashMethod
           key,       # type: KeyType
           **options  # type: typing.Any
           ):
    # type: (...) -> int
    if isinstance(key, six.text_type):
        key = key.encode('utf-8')
    if method.name.lower() in hashlib.algorithms_guaranteed:
        return int(hashlib.new(method.name.lower(), key).hexdigest(), 16)
    elif method == HashMethod.MMH3_32:
        return hash_murmur3(key, size=32, **options)
    elif method == HashMethod.MMH3_64:
        return hash_murmur3(key, size=64, **options)
    elif method == HashMethod.MMH3_128:
        return hash_murmur3(key, size=128, **options)
    elif method == HashMethod.SIPHASH:
        return hash_siphash(key, **options)
sources.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def dispatch(method: Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
        dispatcher = singledispatch(method)
        provides = set()

        def wrapper(self: Any, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Any:
            call = dispatcher.dispatch(type)
            try:
                return call(self, query, context=context)
            except TypeError:
                raise DataSource.unsupported(type)

        def register(type: Type[T]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
            provides.add(type)
            return dispatcher.register(type)

        wrapper.register = register
        wrapper._provides = provides
        update_wrapper(wrapper, method)
        return wrapper
sinks.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def dispatch(method: Callable[[Any, Type[T], Any, PipelineContext], None]) -> Callable[[Any, Type[T], Any, PipelineContext], None]:
        dispatcher = singledispatch(method)
        accepts = set()

        def wrapper(self: Any, type: Type[T], items: Any, context: PipelineContext = None) -> None:
            call = dispatcher.dispatch(type)
            try:
                return call(self, items, context=context)
            except TypeError:
                raise DataSink.unsupported(type)

        def register(type: Type[T]) -> Callable[[Any, Type[T], Any, PipelineContext], None]:
            accepts.add(type)
            return dispatcher.register(type)

        wrapper.register = register
        wrapper._accepts = accepts
        update_wrapper(wrapper, method)
        return wrapper
pipelines.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _best_transform_from(self, source_type: Type[S], target_types: Iterable[Type]) -> Tuple[Callable[[S], Any], Type, int]:
        best = None
        best_cost = _MAX_TRANSFORM_COST
        to_type = None
        for target_type in target_types:
            try:
                transform, cost = self._transform(source_type, target_type)
                if cost < best_cost:
                    best = transform
                    best_cost = cost
                    to_type = target_type
            except NoConversionError:
                pass
        if best is None:
            raise NoConversionError("Pipeline can't convert \"{source_type}\" to any of \"{target_types}\"".format(source_type=source_type, target_types=target_types))
        return best, to_type, best_cost
queries.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def evaluate(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> True:
        # This is a little weird. We have to handle the case of can_have("x").and_("y") here.
        # Since the only type of Node that can return False rather than just raising a
        # QueryValidationError is a KeyNode that isn't required, or an OrNode whose children
        # are all KeyNodes that aren't required, we can't short circuit on a False value. If
        # we have can_have("x").and_("y"), and only some are True, we want raise a
        # QueryValidationError. If all are True or all are False, everything's fine.
        all_true = True
        all_possible_false = True
        for child in self.children:
            is_true = child.evaluate(query, context)
            all_true = all_true and is_true
            if child.falsifiable:
                all_possible_false = all_possible_false and not is_true

        if all_possible_false or all_true:
            return True
        raise BoundKeyExistenceError("Query must have all or none of the elements joined with \"and\" in a \"can_have\" statement!")
transformers.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def dispatch(method: Callable[[Any, Type[T], F, PipelineContext], T]) -> Callable[[Any, Type[T], F, PipelineContext], T]:
        dispatcher = singledispatch(method)
        transforms = {}

        def wrapper(self: Any, target_type: Type[T], value: F, context: PipelineContext = None) -> T:
            call = dispatcher.dispatch(TypePair[value.__class__, target_type])
            try:
                return call(self, value, context=context)
            except TypeError:
                raise DataTransformer.unsupported(target_type, value)

        def register(from_type: Type[F], to_type: Type[T]) -> Callable[[Any, Type[T], F, PipelineContext], T]:
            try:
                target_types = transforms[from_type]
            except KeyError:
                target_types = set()
                transforms[from_type] = target_types
            target_types.add(to_type)

            return dispatcher.register(TypePair[from_type, to_type])

        wrapper.register = register
        wrapper._transforms = transforms
        update_wrapper(wrapper, method)
        return wrapper
client.py 文件源码 项目:grakn-python 作者: graknlabs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def execute(self, query: str, *, infer: bool = True, multi: bool = False) -> Any:
        """Execute a Graql query against the knowledge base

        :param query: the Graql query string to execute against the knowledge base
        :param infer: enable inference
        :param multi: treat this request as a list of queries
        :return: a list of query results

        :raises: GraknError, requests.exceptions.ConnectionError
        """
        response = self._post(query, infer=infer, multi=multi)

        if response.ok:
            return response.json()
        else:
            raise GraknError(response.json()['exception'])
body.py 文件源码 项目:botodesu 作者: futursolo 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _generate_form_data(**kwargs: Union[Any, BotoFairu]) -> aiohttp.FormData:
    form_fata = aiohttp.FormData()

    for name, value in kwargs.items():
        if isinstance(value, BotoFairu):
            form_fata.add_field(
                name, value._content,
                content_type=value._content_type,
                filename=value._filename,
                content_transfer_encoding=value._content_transfer_encoding)

        elif isinstance(value, (list, dict)):
            form_fata.add_field(name, json.dumps(value))

        elif isinstance(value, (int, float, str, bool)):
            form_fata.add_field(name, str(value))

        else:
            raise TypeError("Unknown Type: {}".format(type(value)))

    return form_fata
sizing.py 文件源码 项目:mugen 作者: scherroman 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def largest_dimensions_for_aspect_ratio(dimensions_list: List[Dimensions], desired_aspect_ratio: float,
                                        default: Any = _sentinel) -> Union[Dimensions, Any]:
    """
    Returns
    -------
    The largest dimensions after cropping each dimensions to reach desired aspect ratio
    """
    if not dimensions_list:
        if default is not _sentinel:
            return default
        raise ValueError(f"{dimensions_list} must not be empty.")

    largest_dimensions = crop_dimensions_to_aspect_ratio(dimensions_list[0], desired_aspect_ratio)
    for dimensions in dimensions_list[1:]:
        nearest_dimensions_to_aspect_ratio = crop_dimensions_to_aspect_ratio(dimensions, desired_aspect_ratio)

        if nearest_dimensions_to_aspect_ratio.resolution > largest_dimensions.resolution:
            largest_dimensions = nearest_dimensions_to_aspect_ratio

    return largest_dimensions
subtitles.py 文件源码 项目:mugen 作者: scherroman 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def create(cls, texts: List[Any], name: str, *, locations: Opt[List[float]] = None,
               durations: Opt[List[float]] = None, default: bool = False) -> 'SubtitleTrack':
        subtitles = []
        if locations:
            start_times, end_times = loc_util.start_end_locations_from_locations(locations)
        elif durations:
            start_times, end_times = loc_util.start_end_locations_from_intervals(durations)
        else:
            raise ex.ParameterError(f"Must provide either locations or durations for the subtitle track {name}")

        for index, (text, start_time, end_time) in enumerate(zip(texts, start_times, end_times)):
            subtitle = Subtitle(str(text), start_time, end_time)
            subtitles.append(subtitle)

        subtitle_track = cls(subtitles, name, default=default)

        return subtitle_track
slack.py 文件源码 项目:irisett 作者: beebyte 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def parse_settings(config: Any) -> Optional[Dict[str, Any]]:
    ret = {
        'webhook-url': config.get('slack-webhook-url'),
        'tmpl-msg': config.get('slack-tmpl-msg'),
        'tmpl-duration': config.get('slack-tmpl-duration', fallback=''),
        'tmpl-url': config.get('slack-tmpl-url', fallback='')
    }  # type: Any
    if not ret['webhook-url'] or not ret['tmpl-msg']:
        log.debug('Slack settings missing, no slack notifications will be sent', 'NOTIFICATIONS')
        ret = None
    else:
        log.debug('Valid slack notification settings found', 'NOTIFICATIONS')
        ret['tmpl-msg'] = jinja2.Template(ret['tmpl-msg'])
        if ret['tmpl-duration']:
            ret['tmpl-duration'] = jinja2.Template(ret['tmpl-duration'])
        if ret['tmpl-url']:
            ret['tmpl-url'] = jinja2.Template(ret['tmpl-url'])
    return ret
clicksend.py 文件源码 项目:irisett 作者: beebyte 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def parse_settings(config: Any) -> Optional[Dict[str, Any]]:
    """Parse clicksend sms settings.

    Should only be called from sms.parse_settings.
    """
    ret = {
        'provider': 'clicksend',
        'username': config.get('sms-clicksend-username'),
        'api-key': config.get('sms-clicksend-api-key'),
        'sender': config.get('sms-clicksend-sender'),
        'tmpl': config.get('sms-tmpl'),
    }  # type: Any
    if not ret['username'] or not ret['api-key'] or not ['sender'] or not ['tmpl']:
        log.msg('SMS settings missing, no sms notifications will be sent', 'NOTIFICATIONS')
        ret = None
    else:
        log.debug('Valid SMS notification settings found', 'NOTIFICATIONS')
        ret['tmpl'] = jinja2.Template(ret['tmpl'])
    return ret
cli.py 文件源码 项目:python-driver 作者: bblfsh 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def ctrlc_signal_handler(sgn: int, frame: Any) -> None:
    sys.exit(0)
ansi.py 文件源码 项目:python-devtools 作者: samuelcolvin 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __call__(self, input: Any, *styles, reset: bool=True, apply: bool=True):
        """
        Styles text with ANSI styles and returns the new string.

        By default the styling is cleared at the end of the string, this can be prevented with``reset=False``.

        Examples::

            print(sformat('Hello World!', sformat.green))
            print(sformat('ATTENTION!', sformat.bg_magenta))
            print(sformat('Some things', sformat.reverse, sformat.bold))

        :param input: the object to style with ansi codes.
        :param *styles: zero or more styles to apply to the text, should be either style instances or strings
                        matching style names.
        :param reset: if False the ansi reset code is not appended to the end of the string
        :param: apply: if False no ansi codes are applied
        """
        text = str(input)
        if not apply:
            return text
        codes = []
        for s in styles:
            # raw ints are allowed
            if not isinstance(s, self.__class__) and not isinstance(s, int):
                try:
                    s = self.styles[s]
                except KeyError:
                    raise ValueError('invalid style "{}"'.format(s))
            codes.append(str(s.value))

        if codes:
            r = _ansi_template.format(';'.join(codes)) + text
        else:
            r = text

        if reset:
            r += _ansi_template.format(self.reset)
        return r


问题


面经


文章

微信
公众号

扫码关注公众号