python类StringIO()的实例源码

float_ew2.py 文件源码 项目:ngraph 作者: NervanaSystems 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, name, retain_file=False, gen_flex=False):
        self.num_kernels = 0
        self.module = None
        self.functions = dict()
        self.arg_descs = dict()
        self.cache = dict()

        self.compiled = False
        self.retain_file = retain_file

        self.buffer = StringIO()
        # Open file and add header
        if self.retain_file:
            self.f = tempfile.NamedTemporaryFile(mode='w', suffix='.c', prefix=name, delete=False)
            self.filename = self.f.name

        self.buffer.write(_includes_template)
        if gen_flex:
            self.buffer.write(_flex_includes_template)
test_parser.py 文件源码 项目:deb-python-lesscpy 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_parse_stream(self):
        """
        It can parse input from a file stream.
        """
        stream = StringIO("""
            @nice-blue: #5B83AD;
            """)

        self.parser.parse(file=stream)

        # A single object is parser which is the expected variable.
        self.assertEqual(1, len(self.parser.result))
        # This is a stream without a name so it sets default name.
        self.assertEqual('(stream)', self.parser.target)
        variable = self.parser.result[0]
        self.assertEqual('@nice-blue', variable.name)
        self.assertEqual(['#5b83ad'], variable.value)
test_ec_key.py 文件源码 项目:libtrust-py 作者: realityone 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_sign(self):
        message = six.StringIO(u'Hello, World!')
        sig_algs = (hash_.ES256, hash_.ES384, hash_.ES521)
        origin_sig = (
            [88, 34, 78, 248, 120, 105, 162, 172, 14, 179, 252, 201, 193, 230, 124, 105, 227, 145, 236, 104, 31, 153, 215, 117,
             193, 126, 229, 98, 143, 88, 81, 216, 19, 67, 86, 127, 212, 195, 149, 200, 63, 117, 65, 155, 70, 6, 219, 234, 151,
             120, 229, 214, 193, 210, 165, 158, 135, 160, 244, 210, 41, 49, 132, 71],
            [65, 50, 99, 81, 100, 105, 169, 194, 75, 54, 38, 34, 69, 117, 22, 1, 105, 176, 138, 47, 254, 233, 225, 132, 39, 40,
             160, 2, 9, 208, 78, 11, 76, 20, 163, 57, 222, 176, 108, 93, 155, 163, 102, 185, 211, 138, 83, 92, 67, 92, 133, 51,
             119, 58, 20, 212, 51, 37, 20, 175, 202, 134, 85, 14],
            [112, 133, 177, 239, 79, 252, 240, 252, 28, 96, 174, 40, 109, 34, 109, 141, 183, 16, 246, 249, 102, 213, 128, 132,
             206, 148, 124, 150, 254, 248, 164, 62, 109, 95, 178, 120, 53, 52, 216, 135, 213, 193, 97, 109, 255, 0, 220, 219, 41,
             224, 112, 236, 14, 131, 170, 191, 223, 234, 36, 90, 152, 133, 138, 247]
        )

        for i, sa in enumerate(sig_algs):
            message.seek(0)
            sig, alg = self.private_key.sign(message, sa.hash_id)

            message.seek(0)
            self.assertTrue(self.public_key.verify(message, alg, sig))

            message.seek(0)
            self.assertTrue(self.public_key.verify(message, alg, bytes(bytearray(origin_sig[i]))))
jsonsign.py 文件源码 项目:libtrust-py 作者: realityone 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def sign(self, private_key, timestamp=None):
        protected = self.protected_header(timestamp=timestamp)
        sign_bytes = six.StringIO(self.sign_bytes(protected).decode())

        sig_bytes, algorithm = private_key.sign(sign_bytes, hash_.HashID.SHA256)
        self.signatures.append(
            JsSignature(
                JsHeader(
                    private_key.public_key(),
                    algorithm
                ),
                util.jose_base64_url_encode(sig_bytes),
                protected
            )
        )
        return sig_bytes, algorithm
jsonsign.py 文件源码 项目:libtrust-py 作者: realityone 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def verify(self):
        keys = []
        for sign in self.signatures:
            sign_bytes = self.sign_bytes(sign.protected)
            if sign.header.chain:
                raise NotImplementedError()
            elif sign.header.jwk:
                public_key = sign.header.jwk
            else:
                raise JSONSignError("missing public key")

            sig_bytes = util.jose_base64_url_decode(sign.signature)

            try:
                public_key.verify(six.StringIO(sign_bytes.decode()), sign.header.algorithm, sig_bytes)
            except Exception as e:
                raise e

            keys.append(public_key)

        return keys
base.py 文件源码 项目:AutoML-Challenge 作者: postech-mlg-exbrain 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __repr__(self, verbosity=0):
        repr = StringIO()
        repr.write("Metafeatures for dataset %s\n" % self.name)
        for name in self.values:
            if verbosity == 0 and self.values[name].type_ != "METAFEATURE":
                continue
            if verbosity == 0:
                repr.write("  %s: %s\n" %
                           (str(name), str(self.values[name].value)))
            elif verbosity >= 1:
                repr.write("  %s: %10s  (%10fs)\n" %
                           (str(name), str(self.values[
                                               name].value)[:10],
                            self.values[name].time))

            # Add the reason for a crash if one happened!
            if verbosity > 1 and self.values[name].comment:
                repr.write("    %s\n" % self.values[name].comment)

        return repr.getvalue()
frozen_lake.py 文件源码 项目:cs234 作者: CalciferZh 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _render(self, mode='human', close=False):
        if close:
            return
        outfile = StringIO() if mode == 'ansi' else sys.stdout

        row, col = self.s // self.ncol, self.s % self.ncol
        desc = self.desc.tolist()
        desc = [[c.decode('utf-8') for c in line] for line in desc]
        desc[row][col] = utils.colorize(desc[row][col], "red", highlight=True)
        if self.lastaction is not None:
            outfile.write("  ({})\n".format(["Left","Down","Right","Up"][self.lastaction]))
        else:
            outfile.write("\n")
        outfile.write("\n".join(''.join(line) for line in desc)+"\n")

        return outfile
test_api.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_run_invalid(self, *args):
        m_fin = StringIO()
        m_fout = StringIO()
        code = self.runner.run(
            {'CNI_COMMAND': 'INVALID', 'CNI_ARGS': 'foo=bar'}, m_fin, m_fout)

        self.assertEqual(1, code)
test_api.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_run_write_version(self, *args):
        m_fin = StringIO()
        m_fout = StringIO()
        code = self.runner.run(
            {'CNI_COMMAND': 'VERSION', 'CNI_ARGS': 'foo=bar'}, m_fin, m_fout)
        result = jsonutils.loads(m_fout.getvalue())

        self.assertEqual(0, code)
        self.assertEqual(api.CNIRunner.SUPPORTED_VERSIONS,
                         result['supportedVersions'])
        self.assertEqual(api.CNIRunner.VERSION, result['cniVersion'])
test_api.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_run_del(self, m_k8s_delete):
        vif = fake._fake_vif()
        m_k8s_delete.return_value = vif
        m_fin = StringIO()
        m_fout = StringIO()
        env = {
            'CNI_COMMAND': 'DEL',
            'CNI_ARGS': 'foo=bar',
        }
        self.runner.run(env, m_fin, m_fout)
        self.assertTrue(m_k8s_delete.called)
        self.assertEqual('foo=bar', m_k8s_delete.call_args[0][0].CNI_ARGS)
test_api.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _test_run(self, cni_cmd, path, m_post):
        m_fin = StringIO()
        m_fout = StringIO()
        env = {
            'CNI_COMMAND': cni_cmd,
            'CNI_ARGS': 'foo=bar',
        }
        result = self.runner.run(env, m_fin, m_fout)
        m_post.assert_called_with(
            'http://127.0.0.1:%d/%s' % (self.port, path),
            json=mock.ANY, headers={'Connection': 'close'})
        return result
call.py 文件源码 项目:run_lambda 作者: ethantkoenig 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, context):
            self._context = context

            self._start_mem = memory_profiler.memory_usage()[0]

            self._log = StringIO()
            self._log.write("START RequestId: {r} Version: {v}\n".format(
                r=context.aws_request_id, v=context.function_version
            ))
            self._start_time = timeit.default_timer()
            self._previous_stdout = sys.stdout

            handler = logging.StreamHandler(stream=self._log)
            logging.getLogger().addHandler(handler)
            sys.stdout = self._log
test_cli.py 文件源码 项目:run_lambda 作者: ethantkoenig 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def call(arguments):
        output = six.StringIO()
        with mock.patch("sys.stdout", output):
            with mock.patch("sys.argv", arguments):
                main.main()
        return str(output.getvalue())
conftest.py 文件源码 项目:umapi-client.py 作者: adobe-apiplatform 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def log_stream():
    stream = StringIO()
    handler = logging.StreamHandler(stream)
    logger = logging.getLogger('test_logger')
    logger.setLevel(logging.WARNING)
    logger.addHandler(handler)
    yield stream, logger
    handler.close()
test_legacy.py 文件源码 项目:umapi-client.py 作者: adobe-apiplatform 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def log_stream():
    stream = StringIO()
    handler = logging.StreamHandler(stream)
    logger = logging.getLogger('test_logger')
    logger.setLevel(logging.WARNING)
    logger.addHandler(handler)
    prior_logger = v1.logger
    v1.logger = logger
    yield stream
    v1.logger = prior_logger
    handler.close()


# test the retry logic with a custom logger
connection.py 文件源码 项目:umapi-client.py 作者: adobe-apiplatform 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get_auth(self, ims_host, ims_endpoint_jwt,
                  tech_acct_id=None, api_key=None, client_secret=None,
                  private_key_file=None, private_key_data=None,
                  **kwargs):
        tech_acct_id = tech_acct_id or kwargs.get("tech_acct")
        private_key_file = private_key_file or kwargs.get("priv_key_path")
        if not (tech_acct_id and api_key and client_secret and (private_key_data or private_key_file)):
            raise ArgumentError("Connector create: not all required auth parameters were supplied; please see docs")
        if private_key_data:
            jwt = JWT(self.org_id, tech_acct_id, ims_host, api_key, six.StringIO(private_key_data))
        else:
            with open(private_key_file, 'r') as private_key_stream:
                jwt = JWT(self.org_id, tech_acct_id, ims_host, api_key, private_key_stream)
        token = AccessRequest("https://" + ims_host + ims_endpoint_jwt, api_key, client_secret, jwt())
        return Auth(api_key, token())
detect_type_test.py 文件源码 项目:gransk 作者: pcbje 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _run_test(self, filename):
    mock_pipeline = test_helper.get_mock_pipeline([
        helper.DOCUMENT, helper.PICTURE,
        helper.ARCHIVE, helper.DISKIMAGE])

    detector = detect_type.Subscriber(mock_pipeline)

    with open('config.yml') as inp:
      detector.setup(yaml.load(inp.read()))

    doc = document.get_document(filename)

    detector.consume(doc, StringIO('dummy'))

    return doc
_pure_python_crypt.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def from_string(cls, key, password='notasecret'):
        """Construct an RsaSigner instance from a string.

        Args:
            key: string, private key in PEM format.
            password: string, password for private key file. Unused for PEM
                      files.

        Returns:
            RsaSigner instance.

        Raises:
            ValueError if the key cannot be parsed as PKCS#1 or PKCS#8 in
            PEM format.
        """
        key = _from_bytes(key)  # pem expects str in Py3
        marker_id, key_bytes = pem.readPemBlocksFromFile(
            six.StringIO(key), _PKCS1_MARKER, _PKCS8_MARKER)

        if marker_id == 0:
            pkey = rsa.key.PrivateKey.load_pkcs1(key_bytes,
                                                 format='DER')
        elif marker_id == 1:
            key_info, remaining = decoder.decode(
                key_bytes, asn1Spec=_PKCS8_SPEC)
            if remaining != b'':
                raise ValueError('Unused bytes', remaining)
            pkey_info = key_info.getComponentByName('privateKey')
            pkey = rsa.key.PrivateKey.load_pkcs1(pkey_info.asOctets(),
                                                 format='DER')
        else:
            raise ValueError('No key could be detected.')

        return cls(pkey)
test_feature_conversion.py 文件源码 项目:goodbye-genbank 作者: biosustain 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_genbank_record_fixing(self):
        with open(os.path.join(FILES_PATH, 'sample.gb')) as f:
            first_record = next(SeqIO.parse(f, 'genbank'))

        first_record.features = [unconvert_feature(convert_feature(f)) for f in first_record.features]

        output = StringIO()
        SeqIO.write(first_record, output, "genbank")
        with open(os.path.join(FILES_PATH, 'sample.fixed.gb')) as f:
            self.assertEqual(output.getvalue(), f.read())
        output.close()


问题


面经


文章

微信
公众号

扫码关注公众号