python类Tuple()的实例源码

main.py 文件源码 项目:flashcard 作者: sotetsuk 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_diff_with_color(expected: str, ans: str) -> Tuple[str, str]:
    """Return ``expected`` and ``ans`` with their differing characters colored.

    The two strings are compared character by character with
    ``difflib.Differ``. Characters present only in ``ans`` are passed through
    ``colored(..., "red")``, characters present only in ``expected`` through
    ``colored(..., "green")``; characters common to both are copied unchanged
    into both results.

    :param expected: the reference string
    :param ans: the string to compare against ``expected``
    :return: ``(expected_with_mistake, ans_with_mistake)``
    """
    expected_parts = []
    ans_parts = []
    for entry in difflib.Differ().compare(expected, ans):
        # Each entry is a two-character tag followed by the compared
        # character, so the character itself is always the last one.
        char = entry[-1]
        if entry.startswith("+"):
            ans_parts.append(colored(char, "red"))
        elif entry.startswith("-"):
            expected_parts.append(colored(char, "green"))
        else:
            expected_parts.append(char)
            ans_parts.append(char)

    return "".join(expected_parts), "".join(ans_parts)
cli.py 文件源码 项目:python-driver 作者: bblfsh 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_processor_instance(format_: str, custom_inbuffer: InBuffer=None,
               custom_outbuffer: OutBuffer=None) -> Tuple[Any, Any]:
    """
    Build the request processor registered for ``format_``.

    The processor class and its default buffers are looked up in
    ``ProcessorConfigs``. A truthy ``custom_inbuffer`` / ``custom_outbuffer``
    replaces the configured buffer; this is mainly useful for unit testing.

    :param format_: key used to look up the configuration
    :param custom_inbuffer: optional replacement for the configured input buffer
    :param custom_outbuffer: optional replacement for the configured output buffer
    :return: ``(processor instance, input buffer)``
    :raises RequestInstantiationException: if nothing is configured for ``format_``
    """
    conf = ProcessorConfigs.get(format_)
    if not conf:
        raise RequestInstantiationException('No RequestProcessor found for format %s' % format_)

    # Falsy custom buffers fall back to the configured defaults.
    inbuffer = custom_inbuffer or conf['inbuffer']
    outbuffer = custom_outbuffer or conf['outbuffer']

    # The processor is constructed with the output buffer only; the input
    # buffer is handed back to the caller next to the instance.
    return conf['class'](outbuffer), inbuffer  # type: ignore
methods.py 文件源码 项目:djaio 作者: Sberned 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def __init__(
            self,
            description: str = None,
            pre_hooks: (List, Tuple) = None,
            post_hooks: (List, Tuple) = None
    ):
        """Initialise the instance with empty state and the given options.

        :param description: stored as ``self.description``
        :param pre_hooks: stored as ``self.pre_hooks``
        :param post_hooks: stored as ``self.post_hooks``
        """
        # Attributes that start out unset; they are assigned in this order.
        unset_attributes = (
            'result', 'total', 'success', 'errors', 'params', 'output',
            'pagination', 'limit', 'offset', 'app', 'settings',
        )
        for attribute in unset_attributes:
            setattr(self, attribute, None)

        self.description = description
        self.pre_hooks = pre_hooks
        self.post_hooks = post_hooks
        # A fresh dict per instance, never shared between instances.
        self.meta = {}
acceptability.py 文件源码 项目:trf 作者: aistairc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _load_word_freq(self, threshold: int) -> Tuple[Dict[str, int], int]:
        """Read word frequencies from the file at ``self.rnnlm_model_path``.

        Each line is expected to hold ``<word> <frequency>`` separated by a
        single space. Words whose frequency exceeds ``threshold`` are stored
        with that frequency; every other line adds one to the ``'<unk/>'``
        entry.

        :param threshold: frequency a word must exceed to keep its own entry
        :return: ``(word_freq, n_total_words)`` where ``n_total_words`` is the
                 number of lines read
        """
        word_freq = {}
        n_total_words = 0
        with open(self.rnnlm_model_path, mode='r') as model_file:
            # Counting from 1 leaves the number of lines read in
            # n_total_words once the loop is done (0 for an empty file).
            for n_total_words, line in enumerate(model_file, start=1):
                word, raw_freq = line.split(' ')
                freq = int(raw_freq)
                if freq <= threshold:
                    word_freq['<unk/>'] = word_freq.get('<unk/>', 0) + 1
                    continue
                word_freq[word] = freq

        return (word_freq, n_total_words)
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def extra_penalty(self, style, complexity):
        # type: (Style, int) -> Tuple[int, int]
        """Return ``(complexity, penalty)`` adjusted for unusual style values.

        Trying longer and longer column limits without getting better results
        should be penalized to speed up the search: every option listed in
        ``standards`` whose value exceeds the standard adds the excess to the
        penalty. A ``'Custom'`` value for ``BreakBeforeBraces`` adds 1 to the
        penalty and 12 to the complexity.
        """
        standards = {'ColumnLimit': 80,
                     'MaxEmptyLinesToKeep': 2, }
        # An option missing from the style counts as its standard value, so
        # it adds nothing; an explicit None is skipped as well.
        configured = ((style.get(optionname, limit), limit)
                      for optionname, limit in standards.items())
        penalty = sum(actual - limit
                      for actual, limit in configured
                      if actual is not None and actual > limit)
        if style.get('BreakBeforeBraces') == 'Custom':
            # Rate a commonly known brace breaking style
            # better than an equally performing custom style.
            penalty += 1
            # We would prefer an equally performing style even if we had to
            # add another 12 options.
            complexity += 12
        return complexity, penalty
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def mget(self, keys):
        # type: (List[str]) -> List[Optional[bytes]]
        """Fetch the values for ``keys``, one result per key in the same order.

        The parent store's ``mget`` yields a content key (or None) for each
        requested key; the content keys that exist are then looked up in
        ``self.kvstore``. Keys without a content key yield None.
        """
        if not keys:
            return []
        hits = []
        misses = []  # type: List[Tuple[int, Optional[bytes]]]
        contentkeys = super(DedupKeyValueStore, self).mget(keys)
        for position, contentkey in enumerate(contentkeys):
            if contentkey is not None:
                hits.append((position, unistr(binary_type(contentkey))))
            else:
                misses.append((position, None))
        if not hits:
            return [None] * len(misses)
        positions, existing_keys = zip(*hits)
        existing_values = self.kvstore.mget(existing_keys)
        # Positions are unique, so sorting the (position, value) pairs
        # restores the order of the requested keys.
        ordered = sorted(misses + list(zip(positions, existing_values)))
        return [value for _, value in ordered]
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def split_reffiles(references, filenames):
    # type: (bool, List[str]) -> Tuple[List[str], List[str]]
    """Separate an interleaved file/reference list into two lists.

    When references is True, [file1, reffile1, file2, reffile2] becomes
    ([file1, file2], [reffile1, reffile2]).
    When references is False the pair (filenames, filenames) is returned.
    """
    if not references:
        return filenames, filenames
    assert len(filenames) % 2 == 0
    pairs = list(grouper(2, filenames))
    files = [filename for filename, _ in pairs]
    refs = [reffilename for _, reffilename in pairs]
    return files, refs

# ----------------------------------------------------------------------
# Functions to convert ANSI text into HTML.
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def update_evaluations(formatter,  # type: CodeFormatter
                       evaluations,  # type: List[AttemptResult]
                       finished_styles,  # type: List[AttemptResult]
                       bestdist  # type: Sequence[int]
                       ):
    # type: (...) -> Tuple[bool, bool, Sequence[int]]
    """Advance the search by one step using the best pending attempt.

    The smallest entry is popped from the ``evaluations`` heap. If it beats
    ``bestdist`` (or there is no best distance yet) it is pushed back and its
    distance becomes the new best. Otherwise it is moved to
    ``finished_styles`` and, if the formatter offers nested derivations of
    its style, ``evaluations`` is replaced in place by those derivations.

    Returns ``(done, nested_round, bestdist)``. ``done`` is True only when the
    popped attempt brought no improvement and has no nested derivations;
    ``nested_round`` is True only when the evaluations were reseeded, in which
    case the returned ``bestdist`` is None.
    """
    candidate = heapq.heappop(evaluations)
    improved = bestdist is None or (distquality(candidate.distance) < distquality(bestdist))
    if improved:
        heapq.heappush(evaluations, candidate)
        return False, False, candidate.distance

    # We found a style that could no longer be improved by adding a single option value.
    heapq.heappush(finished_styles, candidate)
    nested_styles = formatter.nested_derivations(candidate.formatstyle)
    if not nested_styles:
        # This formatstyle does not unlock more options.
        return True, False, bestdist

    # Restart the optimization from scratch with the candidate augmented with
    # every nested option as seed styles.
    seed_distance = (HUGE_DISTANCE, HUGE_DISTANCE, HUGE_DISTANCE, HUGE_DISTANCE)
    evaluations[:] = [AttemptResult(seed_distance, style) for style in nested_styles]
    return False, True, None
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def avg_linelength_diffs(diffargs):
    # type: (List[Tuple[str, bytes]]) -> Iterable[int]
    """Yield the nudged absolute average line length difference per pair.

    For each ``(filename1, content2)`` pair the average line length (total
    length divided by number of lines, 0.0 when there are no lines) of the
    cached file and of ``content2`` are compared; the absolute difference is
    scaled by 10000 and truncated to an int.
    """
    for filename1, content2 in diffargs:
        numlines1 = get_num_lines(filename1)
        size1 = len(get_cached_file(filename1))
        avg1 = float(size1) / numlines1 if numlines1 > 0 else 0.0

        numlines2 = count_content_lines(content2)
        size2 = len(content2)
        avg2 = float(size2) / numlines2 if numlines2 > 0 else 0.0

        yield int(abs(10000.0 * (avg1 - avg2)))
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def unified_diff(filename, content2=None):
    # type: (str, Optional[bytes]) -> Tuple[int, Iterable[str]]
    """Compute a unified diff between ``filename`` and ``content2``.

    When ``content2`` is None the second content is read from standard input
    and the diff is also written out, which is how this function is used
    from the command line:
        echo 123 > d.txt ; echo 456 | ./whatstyle.py --stdindiff d.txt
    We get this result:
    ---
    +++
    @@ -1 +1 @@
    -123
    +456

    Returns ``(exit_code, diff)`` as produced by ``compute_unified_diff``.
    In stdin mode ``diff`` is returned as a list so the caller still receives
    the lines that were written.
    """
    use_stdin = content2 is None
    if use_stdin:
        # Read binary input stream
        stdin = rawstream(sys.stdin)
        econtent2 = bytestr(stdin.read())
    else:
        econtent2 = content2
    exit_code, diff = compute_unified_diff(filename, econtent2, lineterm='')
    if use_stdin:
        # The diff may be a one-shot generator; materialize it before
        # writing so the returned value is not already exhausted.
        diff = list(diff)
        write('\n'.join(diff))
    return exit_code, diff
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def compute_unified_diff(filename, content2, **kwargs):
    # type: (str, bytes, **Any) -> Tuple[int, Iterable[str]]
    """Build a unified diff between the cached file and ``content2``.

    Extra keyword arguments are forwarded to ``difflib.unified_diff``; the
    number of context lines ``n`` defaults to 0 unless given explicitly.

    Returns ``(OK, diff)`` on success. If reading or decoding the inputs
    fails, ``(ERROR, ())`` is returned instead of raising. Interpreter-level
    exceptions such as KeyboardInterrupt and SystemExit are not swallowed.
    """
    diff = ()  # type: Iterable[str]
    exit_code = ERROR
    kw = kwargs.copy()
    # zero context lines unless the caller asked for a specific amount
    kw.setdefault('n', 0)
    try:
        content1 = get_cached_file(filename)
        if PY3:
            c1 = unistr(content1)
            c2 = unistr(content2)
        else:
            c1 = content1
            c2 = content2
        diff = difflib.unified_diff(c1.splitlines(True), c2.splitlines(True), **kw)
        exit_code = OK
    except Exception:
        # Best effort: keep the ERROR exit code and the empty diff set above.
        # (A `return` inside `finally` would also hide KeyboardInterrupt.)
        pass
    return exit_code, diff

# ---------------------------------------------------------------------
# Spare the user from specifying a formatter by finding a suitable one.
bin.py 文件源码 项目:cxflow-tensorflow 作者: Cognexa 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def bin_stats(predictions: tf.Tensor, labels: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
    """
    Calculate f1, precision and recall from binary classification expected and predicted values.

    Both inputs are cast to int32 and all counts are reduced along axis 1.

    NOTE(review): the denominators are not guarded against zero; confirm how
    callers handle rows without positive predictions or labels.

    :param predictions: 2-d tensor (batch, predictions) of predicted 0/1 classes
    :param labels: 2-d tensor (batch, labels) of expected 0/1 classes
    :return: a tuple of batched (f1, precision and recall) values
    """
    predicted = tf.cast(predictions, tf.int32)
    expected = tf.cast(labels, tf.int32)

    def count_per_row(mask):
        # Number of True entries along axis 1.
        return tf.reduce_sum(tf.cast(mask, tf.int32), axis=1)

    # With 0/1 inputs the product is 1 exactly where both are 1.
    true_positives = tf.reduce_sum((predicted * expected), axis=1)
    false_positives = count_per_row(tf.greater(predicted, expected))
    false_negatives = count_per_row(tf.greater(expected, predicted))

    recall = true_positives / (true_positives + false_negatives)
    precision = true_positives / (true_positives + false_positives)
    # Harmonic mean of precision and recall.
    f1_score = 2 / (1 / precision + 1 / recall)

    return f1_score, precision, recall
configuration_space.py 文件源码 项目:ConfigSpace 作者: automl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _check_edges(self, edges: List[Tuple[str, str]]) -> None:
        """Validate that ``edges`` can be added without breaking the graph.

        Every ``(parent, child)`` pair must refer to names already present in
        ``self._hyperparameters``, and adding all edges to a temporary copy of
        the graph must leave it acyclic.

        :param edges: ``(parent_node, child_node)`` name pairs to check
        :raises ValueError: if a node is unknown or the edges create a cycle
        """
        known = self._hyperparameters
        for parent_node, child_node in edges:
            # check if both nodes are already inserted into the graph
            if child_node not in known:
                raise ValueError("Child hyperparameter '%s' not in configuration "
                                 "space." % child_node)
            if parent_node not in known:
                raise ValueError("Parent hyperparameter '%s' not in configuration "
                                 "space." % parent_node)

        # TODO: recursively check everything which is inside the conditions,
        # this means we have to recursively traverse the condition

        tmp_dag = self._create_tmp_dag()
        for parent_node, child_node in edges:
            tmp_dag.add_edge(parent_node, child_node)

        if ConfigSpace.nx.is_directed_acyclic_graph(tmp_dag):
            return

        # Sort the nodes within each cycle and then the cycles themselves so
        # the error message is deterministic.
        cycles = sorted(
            sorted(cycle) for cycle in ConfigSpace.nx.simple_cycles(tmp_dag)
        )  # type: List[List[str]]
        raise ValueError("Hyperparameter configuration contains a "
                         "cycle %s" % str(cycles))
corpus.py 文件源码 项目:speechless 作者: JuliusKunze 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def randomly_grouped_by(key_from_example: Callable[[LabeledExample], Any], training_share: float = .9) -> Callable[
        [List[LabeledExample]], Tuple[List[LabeledExample], List[LabeledExample]]]:
        """Create a splitter that assigns whole groups to training or test.

        Examples are grouped by ``key_from_example``; a fixed-seed random
        sample of the group keys (``training_share`` of them, rounded down)
        forms the training set and all remaining examples form the test set.

        :param key_from_example: maps an example to its group key
        :param training_share: fraction of the groups used for training
        :return: a function mapping a list of examples to
                 ``(training_examples, test_examples)``
        """
        def split(examples: List[LabeledExample]) -> Tuple[List[LabeledExample], List[LabeledExample]]:
            examples_by_directory = group(examples, key=key_from_example)
            # random.sample() requires a sequence; a dict keys view is
            # rejected on Python 3.11+. Copying to a list keeps the
            # iteration order, so the sample is the same on older versions.
            directories = list(examples_by_directory.keys())

            # split must be the same every time:
            random.seed(42)
            keys = set(random.sample(directories, int(training_share * len(directories))))

            training_examples = [example for example in examples if key_from_example(example) in keys]
            test_examples = [example for example in examples if key_from_example(example) not in keys]

            return training_examples, test_examples

        return split
pipelines.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _transform(self, source_type: Type[S], target_type: Type[T]) -> Tuple[Callable[[S], T], int]:
        """Find the cheapest conversion from ``source_type`` to ``target_type``.

        The shortest path (by ``"cost"``) through ``self._type_graph`` is
        turned into a chain of ``(transformer, target)`` steps.

        :return: ``(callable, total cost)``; ``(_identity, 0)`` when the path
                 contains no steps
        :raises NoConversionError: if the type graph holds no such path
        """
        names = {"source_type": source_type.__name__, "target_type": target_type.__name__}
        try:
            LOGGER.info("Searching type graph for shortest path from \"{source_type}\" to \"{target_type}\"".format(**names))
            path = dijkstra_path(self._type_graph, source=source_type, target=target_type, weight="cost")
            LOGGER.info("Found a path from \"{source_type}\" to \"{target_type}\"".format(**names))
        except (KeyError, NetworkXNoPath):
            # The error message shows the types themselves, not their names.
            raise NoConversionError("Pipeline can't convert \"{source_type}\" to \"{target_type}\"".format(source_type=source_type, target_type=target_type))

        LOGGER.info("Building transformer chain from \"{source_type}\" to \"{target_type}\"".format(**names))
        chain = [
            (self._type_graph.adj[source][target][_TRANSFORMER], target)
            for source, target in _pairwise(path)
        ]
        cost = sum(transformer.cost for transformer, _ in chain)
        LOGGER.info("Built transformer chain from \"{source_type}\" to \"{target_type}\"".format(**names))

        if not chain:
            return _identity, 0

        return partial(_transform, transformer_chain=chain), cost
pipelines.py 文件源码 项目:datapipelines-python 作者: meraki-analytics 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _best_transform_from(self, source_type: Type[S], target_types: Iterable[Type]) -> Tuple[Callable[[S], Any], Type, int]:
        """Pick the cheapest conversion from ``source_type`` to any target type.

        Target types that cannot be reached are skipped. Only conversions
        costing less than ``_MAX_TRANSFORM_COST`` are considered, and on equal
        cost the earliest target type wins.

        :return: ``(transform, chosen target type, cost)``
        :raises NoConversionError: if no target type yields a usable conversion
        """
        best = None
        best_cost = _MAX_TRANSFORM_COST
        to_type = None
        for target_type in target_types:
            try:
                candidate, candidate_cost = self._transform(source_type, target_type)
            except NoConversionError:
                continue
            if candidate_cost < best_cost:
                best, best_cost, to_type = candidate, candidate_cost, target_type
        if best is None:
            raise NoConversionError("Pipeline can't convert \"{source_type}\" to any of \"{target_types}\"".format(source_type=source_type, target_types=target_types))
        return best, to_type, best_cost
records.py 文件源码 项目:python-netsgiro 作者: otovo 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _split_text_to_lines_and_columns(
            cls, text) -> Iterable[Tuple[int, int, str]]:
        """Yield ``(line_number, column, text)`` pieces for a specification text.

        Every line of ``text`` produces two pieces, columns 1 and 2, holding
        characters 0-39 and 40-79 of the line, each padded to 40 characters.
        Line numbers start at 1.

        :raises ValueError: if there are more than ``cls._MAX_LINES`` lines or
            a line is longer than ``cls._MAX_LINE_LENGTH``. As this is a
            generator, the error surfaces on iteration.
        """
        lines = text.splitlines()
        line_count = len(lines)

        if line_count > cls._MAX_LINES:
            raise ValueError(
                'Max {} specification lines allowed, got {}'
                .format(cls._MAX_LINES, line_count))

        for line_number, line_text in enumerate(lines, 1):
            length = len(line_text)
            if length > cls._MAX_LINE_LENGTH:
                raise ValueError(
                    'Specification lines must be max {} chars long, '
                    'got {}: {!r}'
                    .format(cls._MAX_LINE_LENGTH, length, line_text))

            # Column 1 starts at offset 0, column 2 at offset 40.
            for column, start in ((1, 0), (2, 40)):
                yield (line_number, column,
                       '{:40}'.format(line_text[start:start + 40]))
powers.py 文件源码 项目:nucypher-kms 作者: nucypher 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _encrypt_key(
            self,
            key: bytes,
            pubkey: bytes = None
    ) -> Tuple[bytes, bytes]:
        """
        Encrypts the `key` provided for the provided `pubkey` using the ECIES
        schema. If no `pubkey` is provided, it uses `self.pub_key`.

        :param key: Key to encrypt
        :param pubkey: Public Key to encrypt the `key` for

        :return (encrypted key, encapsulated ECIES key)
        """
        pubkey = pubkey or self.pub_key

        # The API function is spelled `ecies_encapsulate`, as used by the
        # other callers in this code base; the previous spelling
        # `ecies_encaspulate` was a typo.
        symm_key, enc_symm_key = API.ecies_encapsulate(pubkey)
        enc_key = API.symm_encrypt(symm_key, key)
        return (enc_key, enc_symm_key)
powers.py 文件源码 项目:nucypher-kms 作者: nucypher 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def gen_path_keys(
            self,
            path: bytes
    ) -> List[Tuple[bytes, bytes]]:
        """
        Derives one key pair for every subpath of `path`.

        The path is split with `self._split_path` and each subpath is passed
        to `self._derive_path_key`.

        :param path: Path to derive key(s) from

        :return: List of (private, public) path keys, in subpath order
        """
        subpaths = self._split_path(path)
        return [
            (path_priv, path_pub)
            for path_priv, path_pub in map(self._derive_path_key, subpaths)
        ]
powers.py 文件源码 项目:nucypher-kms 作者: nucypher 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def encrypt(
            self,
            data: bytes,
            pubkey: bytes = None
    ) -> Tuple[bytes, bytes]:
        """
        Encrypts data with public key encryption.

        A symmetric key is encapsulated for the public key and used to
        encrypt `data`. If no `pubkey` is provided, `self.pub_key` is used.

        :param data: Data to encrypt
        :param pubkey: Public key to encrypt for

        :return: (encrypted data, serialized `ekey` of the encapsulated key)
        """
        recipient = pubkey or self.pub_key

        symm_key, encapsulated = API.ecies_encapsulate(recipient)
        ciphertext = API.symm_encrypt(symm_key, data)
        serialized_ekey = API.elliptic_curve.serialize(encapsulated.ekey)

        return (ciphertext, serialized_ekey)
powers.py 文件源码 项目:nucypher-kms 作者: nucypher 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def decrypt(
            self,
            enc_data: Tuple[bytes, bytes],
            privkey: bytes = None
    ) -> bytes:
        """
        Decrypts data using ECIES PKE. If no `privkey` is provided, it uses
        `self.priv_key`.

        :param enc_data: Tuple: (encrypted data, serialized encapsulated key)
        :param privkey: Private key to decapsulate with

        :return: Decrypted data
        """
        privkey = privkey or self.priv_key
        ciphertext, serialized_ekey = enc_data

        # Rebuild the encapsulated key object from its serialized form.
        ekey = API.elliptic_curve.deserialize(API.PRE.ecgroup, serialized_ekey)
        encapsulated = API.umbral.EncryptedKey(ekey=ekey, re_id=None)

        symm_key = API.ecies_decapsulate(privkey, encapsulated)
        return API.symm_decrypt(symm_key, ciphertext)
_internal.py 文件源码 项目:nucypher-kms 作者: nucypher 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _ecies_gen_ephemeral_key(
        recp_pubkey: Union[bytes, elliptic_curve.ec_element]
) -> Tuple[bytes, Tuple[bytes, bytes]]:
    """
    Creates an ephemeral private key and encrypts it for `recp_pubkey`.

    A symmetric key is encapsulated for the recipient and used to encrypt a
    newly generated private key.

    :param recp_pubkey: Recipient's pubkey

    :return: Tuple of the ephemeral privkey, and a tuple of the encrypted
             symmetric key and the encrypted ephemeral privkey
    """
    symm_key, enc_symm_key = API.ecies_encapsulate(recp_pubkey)
    eph_privkey = API.ecies_gen_priv()

    encrypted_pair = (enc_symm_key, API.symm_encrypt(symm_key, eph_privkey))
    return (eph_privkey, encrypted_pair)
api.py 文件源码 项目:nucypher-kms 作者: nucypher 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def ecdsa_priv2pub(
        privkey: bytes,
        to_bytes: bool = True
) -> Union[bytes, Tuple[int]]:
    """
    Derives the public component of an ECDSA private key.

    :param privkey: Private key as an int or bytestring
    :param to_bytes: Serialize to bytes or not?

    :return: The pubkey as returned by `privtopub`, or, when `to_bytes` is
             set, its byte encoding prefixed with the keypair marker bytes
    """
    pubkey = privtopub(privkey)
    if not to_bytes:
        return pubkey
    return SIG_KEYPAIR_BYTE + PUB_KEY_BYTE + ecdsa_pub2bytes(pubkey)
api.py 文件源码 项目:nucypher-kms 作者: nucypher 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def ecdsa_verify(
        v: int,
        r: int,
        s: int,
        msghash: bytes,
        pubkey: Union[bytes, Tuple[int, int]]
) -> bool:
    """
    Takes a v, r, s, a pubkey, and a hash of a message to verify via ECDSA.

    The public key is recovered from the signature and compared with
    `pubkey`.

    :param v: V of sig
    :param r: R of sig
    :param s: S of sig
    :param msghash: The hashed message to verify
    :param pubkey: Pubkey to validate the signature for, either byte encoded
                   or already decoded

    :rtype: Boolean
    :return: Is the signature valid or not?
    """
    # isinstance() also accepts subclasses of bytes, which an exact type
    # comparison would pass on undecoded and so never match.
    if isinstance(pubkey, bytes):
        pubkey = ecdsa_bytes2pub(pubkey)

    verify_sig = ecdsa_raw_recover(msghash, (v, r, s))
    # TODO: Should this equality test be done better?
    return verify_sig == pubkey
api.py 文件源码 项目:nucypher-kms 作者: nucypher 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def ecies_ephemeral_split_rekey(
        privkey_a: Union[bytes, elliptic_curve.ec_element],
        pubkey_b: Union[bytes, elliptic_curve.ec_element],
        min_shares: int,
        total_shares: int
) -> Tuple[List[umbral.RekeyFrag], Tuple[bytes, bytes]]:
    """
    Performs a split-key re-encryption key generation where a minimum
    number of shares `min_shares` are required to reproduce a rekey.
    Will split a rekey into `total_shares`.
    This also generates an ephemeral key for the recipient `pubkey_b`.

    NOTE(review): the second element returned is a `PFrag` object, while the
    return annotation still declares a tuple of bytes.

    :param privkey_a: Privkey to re-encrypt from
    :param pubkey_b: Public key to re-encrypt for (w/ ephemeral key)
    :param min_shares: Minimum shares needed to reproduce a rekey
    :param total_shares: Total shares to generate from split-rekey gen

    :return: A tuple containing a list of rekey frags, and a `PFrag` built
             from the encrypted ephemeral key data
    """
    eph_privkey, encrypted_parts = _internal._ecies_gen_ephemeral_key(pubkey_b)
    enc_symm_key, enc_eph_privkey = encrypted_parts

    kfrags = ecies_split_rekey(privkey_a, eph_privkey, min_shares, total_shares)
    pfrag = PFrag(
        ephemeral_data_as_bytes=None,
        encrypted_key=enc_symm_key,
        encrypted_message=enc_eph_privkey,
    )

    return (kfrags, pfrag)
encrypting_keypair.py 文件源码 项目:nucypher-kms 作者: nucypher 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def encrypt(self,
                data: bytes,
                pubkey: bytes = None) -> Tuple[bytes, bytes]:
        """
        :data:      The data to encrypt. If derived per-subpath, it's a
                    symmetric key to use for block ciphers.
        :pubkey:    Optional serialized public key to encrypt for. If not
                    given, encrypt for ours

        :returns:   (ekey, edata) where ekey is needed for the recipient to
                    reconstruct a DH secret, edata is data encrypted with this
                    DH secret. ekey is itself a pair of the serialized
                    encapsulated key and None. The output should be treated as
                    a monolithic ciphertext outside of this class
        """
        if pubkey is None:
            recipient = self._pub_key
        else:
            recipient = ec.deserialize(self.pre.ecgroup, pubkey)

        key, encapsulated = self.pre.encapsulate(recipient)
        cipher = SecretBox(key)

        ekey = (ec.serialize(encapsulated.ekey), None)
        return (ekey, cipher.encrypt(data))
Segment.py 文件源码 项目:mugen 作者: scherroman 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def crop_scale(self, dimensions: Tuple[int, int]) -> 'Segment':
        """
        Parameters
        ----------
        dimensions
            Target dimensions, unpacked into a ``Dimensions`` object

        Returns
        -------
        A new Segment, cropped and/or scaled as necessary to reach specified dimensions
        """
        result = self.copy()
        target = Dimensions(*dimensions)

        # Crop first so the later resize keeps the aspect ratio.
        if result.aspect_ratio != target.aspect_ratio:
            result = result.crop_to_aspect_ratio(target.aspect_ratio)

        # Resize only if the segment is not already at the final dimensions.
        if result.dimensions != target:
            result = result.resize(target)

        return result
ExternalService.py 文件源码 项目:kudubot 作者: namboy94 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def store_message_in_file(self, message: Message) -> Tuple[str, str]:
        """
        Stores a message in a json file.
        The filename of the file will be the current time.
        Also generates a response file location in which the executable may
        write a response into

        :param message: The message to save
        :return: The location of the stored message json file,
                 the location of the response file
        """

        json_data = message.to_dict()

        while True:  # Retry with a new timestamp until the file is new
            message_file = os.path.join(self.message_dir, str(time.time()))
            json_path = message_file + ".json"
            try:
                # Mode 'x' creates the file exclusively. This checks the path
                # that is actually written (including the ".json" suffix) and
                # cannot overwrite a file that already exists.
                with open(json_path, 'x') as json_file:
                    json.dump(json_data, json_file)
            except FileExistsError:
                continue
            return json_path, message_file + "-response.json"

    # noinspection PyMethodMayBeStatic
nagios.py 文件源码 项目:irisett 作者: beebyte 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
async def run_plugin(executable: str, args: List[str], timeout: int) -> Tuple[str, List[str]]:
    """Run a plugin executable and return its parsed output.

    The executable is started with ``args``; stdout and stderr are captured,
    concatenated and passed to ``parse_plugin_output``.

    This must be a coroutine because the body awaits the subprocess calls.

    NOTE(review): ``timeout`` is accepted but not applied anywhere in this
    function; confirm whether the caller enforces it.

    :param executable: path of the plugin to run
    :param args: command line arguments for the plugin
    :param timeout: currently unused
    :return: ``(text, perf)`` as returned by ``parse_plugin_output``
    :raises NagiosError: if the executable cannot be found
    :raises MonitorFailedError: if the exit status is not OK or WARNING
    """
    run_args = [executable] + args
    try:
        proc = await asyncio.create_subprocess_exec(
            *run_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
    except FileNotFoundError:
        raise NagiosError('executable not found')
    stdout_data, stderr_data = await proc.communicate()
    std_data = stdout_data + stderr_data
    await proc.wait()
    # An unknown exit status reports the raw output; a CRITICAL status
    # reports the parsed text instead.
    if proc.returncode not in [STATUS_OK, STATUS_WARNING, STATUS_CRITICAL]:
        raise MonitorFailedError(std_data)
    text, perf = parse_plugin_output(std_data)
    if proc.returncode not in [STATUS_OK, STATUS_WARNING]:
        raise MonitorFailedError(text)
    return text, perf
test_ReportWrapper.py 文件源码 项目:AutoTriageBot 作者: salesforce 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def callAllMethods(obj: object) -> List[Tuple[str, Any]]:
    """Call every callable attribute of ``obj`` without arguments.

    Results are collected as ``(attribute name, result)`` for int (including
    bool) results and for str results that do not contain ``'0x'``. A call
    that raises is recorded as ``('except', attribute name)`` unless the
    name contains ``'0x'``. ``__hash__`` is skipped.

    Only ``Exception`` subclasses are recorded; SystemExit and
    KeyboardInterrupt propagate to the caller.

    :param obj: the object whose methods are called
    :return: the collected results, in ``dir(obj)`` order
    """
    results = []  # type: List[Tuple[str, Any]]
    for method in dir(obj):
        if method == '__hash__':
            continue
        attribute = getattr(obj, method)
        if not callable(attribute):
            continue
        try:
            res = attribute()
        except Exception:
            if '0x' not in method:
                results.append(('except', method))
            continue
        if isinstance(res, str):
            # Ignore anything with 0x in it since memory addresses change
            if '0x' not in res:
                results.append((method, res))
        elif isinstance(res, int):
            # bool is a subclass of int, so this covers both.
            results.append((method, res))
    return results


问题


面经


文章

微信
公众号

扫码关注公众号