python类StringIO()的实例源码

__init__.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _decompressContent(response, new_content):
    """Return `new_content` decoded according to the response's content-encoding.

    `response` is a mapping of header names to values.  When its
    'content-encoding' entry is 'gzip' or 'deflate', the body is decompressed
    and the mapping is updated in place: 'content-length' is set to the
    decompressed length and 'content-encoding' is moved to
    '-content-encoding'.  Any other (or missing) encoding returns
    `new_content` untouched and leaves `response` alone.

    Raises:
        FailedToDecompressContent: if the body cannot be decompressed.
    """
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                try:
                    content = zlib.decompress(content)
                except zlib.error:
                    # Not a zlib-wrapped stream; retry as a raw deflate
                    # stream (negative wbits = no zlib header/trailer).
                    content = zlib.decompress(content, -zlib.MAX_WBITS)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way that won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except (IOError, zlib.error):
        # zlib.error is not an IOError, so it has to be listed explicitly for
        # corrupt deflate bodies to be reported as a decompression failure.
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
stage_parser.py 文件源码 项目:DeepSea 作者: SUSE 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def render(file_name):
        """
        Render an sls file through the ``slsutil.renderer`` salt function.

        Output written to stdout/stderr while the renderer runs is captured
        in memory and logged afterwards (stdout at info level, stderr at
        debug level).

        Args:
            file_name (str): the sls file path

        Returns:
            tuple: ``(result, captured stdout, captured stderr)``

        Raises:
            StageRenderingException: when the renderer call raises a
                ``salt.exceptions.SaltException``.
        """
        captured_err = StringIO.StringIO()
        captured_out = StringIO.StringIO()
        failure = None
        with redirect_stderr(captured_err), redirect_stdout(captured_out):
            try:
                result = SLSRenderer.caller.cmd('slsutil.renderer', file_name)
            except salt.exceptions.SaltException as ex:
                # Only remember the failure here; it is raised after both
                # redirections have been left.
                failure = StageRenderingException(file_name, ex.strerror)

        if failure:
            # pylint: disable=E0702
            raise failure

        stdout_text = captured_out.getvalue()
        stderr_text = captured_err.getvalue()
        logger.info("Rendered SLS file %s, stdout\n%s", file_name, stdout_text)
        logger.debug("Rendered SLS file %s, stderr\n%s", file_name, stderr_text)
        return result, stdout_text, stderr_text
png.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def topngbytes(name, rows, x, y, **k):
    """Convenience function for creating a PNG file "in memory".

    Creates a :class:`Writer` instance from `x`, `y` and the keyword
    arguments, then passes `rows` to its :meth:`Writer.write` method.  The
    content of the in-memory buffer is returned.

    `name` is used to identify the file for debugging: it is printed, and
    when the ``PYPNG_TEST_TMP`` environment variable is set to a non-empty
    value the PNG data is also written to a file called `name`.
    """

    import os

    print (name)
    buf = BytesIO()
    Writer(x, y, **k).write(buf, rows)
    png = buf.getvalue()
    if os.environ.get('PYPNG_TEST_TMP'):
        # ``with`` closes the debug file even if the write fails.
        with open(name, 'wb') as out:
            out.write(png)
    return png
png.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def testPtrns(self):
        "Test colour type 3 and tRNS chunk (and 4-bit palette)."
        # Palette: `a` and `b` carry an explicit alpha value, the rest do not.
        a = (50,99,50,50)
        b = (200,120,120,80)
        c = (255,255,255)
        d = (200,120,120)
        e = (50,99,50)
        w = Writer(3, 3, bitdepth=4, palette=[a,b,c,d,e])
        f = BytesIO()
        # 3x3 image given as palette indexes, row by row.
        w.write_array(f, array('B', (4, 3, 2, 3, 2, 0, 2, 0, 1)))
        r = Reader(bytes=f.getvalue())
        x,y,pixels,meta = r.asRGBA8()
        self.assertEqual(x, 3)
        self.assertEqual(y, 3)
        # Entries without an alpha value are expected back fully opaque.
        c = c+(255,)
        d = d+(255,)
        e = e+(255,)
        boxed = [(e,d,c),(d,c,a),(c,a,b)]
        # Build real lists: on Python 3 ``map`` returns an iterator, and two
        # distinct iterator objects never compare equal.
        flat = [list(itertools.chain(*row)) for row in boxed]
        self.assertEqual([list(row) for row in pixels], flat)
png.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testPAMin(self):
        """Check that the command line tool accepts a PAM file as input."""
        # Pixel data in flat-row-flat-pixel layout: three RGBA pixels.
        flat =  [255,0,0,255, 0,255,0,120, 0,0,255,30]
        header = strtobytes('P7\nWIDTH 3\nHEIGHT 1\nDEPTH 4\nMAXVAL 255\n'
                'TUPLTYPE RGB_ALPHA\nENDHDR\n')

        # Assemble the PAM stream (header followed by pixels) and rewind it
        # so that it can be read from the start.
        pam_in = BytesIO()
        pam_in.write(header)
        pam_in.write(seqtobytes(flat))
        pam_in.flush()
        pam_in.seek(0)
        png_out = BytesIO()

        def run_tool():
            return _main(['testPAMin'])

        testWithIO(pam_in, png_out, run_tool)

        # Read back what the tool produced and compare with the input pixels.
        reader = Reader(bytes=png_out.getvalue())
        width, height, pixels, meta = reader.read()
        self.assertTrue(reader.alpha)
        self.assertTrue(not reader.greyscale)
        self.assertEqual(list(itertools.chain(*pixels)), flat)
recipe-573471.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def fetch_quote(symbols, timestamp, cached_file=None):
    """Return the quote data for `symbols` wrapped in a StringIO object.

    The symbols are joined with '+' and substituted into the module-level
    URL template.  When `cached_file` is given the data is read from that
    file instead of being fetched.  When LOG_DATA_FETCHED is true, freshly
    fetched data is also written to a file whose name is built from
    LOG_FILENAME and `timestamp` (with ':' replaced by '-').
    """
    url = URL % '+'.join(symbols)

    if not cached_file:
        # fetch
        log('Fetching %s' % url)
        fp = urllib.urlopen(url)
        try:
            data = fp.read()
        finally:
            fp.close()

        # log result
        if LOG_DATA_FETCHED:
            log_filename = LOG_FILENAME % timestamp.replace(':','-')
            with open(log_filename, 'wb') as out:
                log('Fetched %s bytes logged in %s' % (len(data), log_filename))
                out.write(data)
    else:
        # ``with`` closes the cache file right away instead of leaving the
        # handle open until it happens to be garbage collected.
        with open(cached_file, 'rb') as cached:
            data = cached.read()

    return StringIO(data)
InstagramRegistration.py 文件源码 项目:Instagram-API 作者: danleyb2 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def request(self, endpoint, post=None):
        """Send a request to ``Constants.API_URL + endpoint`` through pycurl.

        Args:
            endpoint: path appended to ``Constants.API_URL``.
            post: when not None, the request is sent as a POST with this
                value as the POST fields.

        Returns:
            list: ``[header, json_decode(body)]`` where ``header`` is the
            first ``HEADER_SIZE`` bytes of the response and ``body`` is the
            remainder.
        """
        response_buffer = BytesIO()
        cookie_file = os.path.join(self.IGDataPath, self.username, self.username + "-cookies.dat")

        ch = pycurl.Curl()
        try:
            ch.setopt(pycurl.URL, Constants.API_URL + endpoint)
            ch.setopt(pycurl.USERAGENT, self.userAgent)
            ch.setopt(pycurl.WRITEFUNCTION, response_buffer.write)
            ch.setopt(pycurl.FOLLOWLOCATION, True)
            # Headers are written into the same buffer as the body and are
            # split off again below using HEADER_SIZE.
            ch.setopt(pycurl.HEADER, True)
            ch.setopt(pycurl.VERBOSE, False)
            ch.setopt(pycurl.COOKIEFILE, cookie_file)
            ch.setopt(pycurl.COOKIEJAR, cookie_file)

            if post is not None:
                ch.setopt(pycurl.POST, True)
                ch.setopt(pycurl.POSTFIELDS, post)

            if self.proxy:
                ch.setopt(pycurl.PROXY, self.proxyHost)
                if self.proxyAuth:
                    ch.setopt(pycurl.PROXYUSERPWD, self.proxyAuth)

            ch.perform()
            # Must be read while the handle is still open.
            header_len = ch.getinfo(pycurl.HEADER_SIZE)
        finally:
            # Release the curl handle even when setup or the transfer fails.
            ch.close()

        resp = response_buffer.getvalue()
        header = resp[0: header_len]
        body = resp[header_len:]

        if self.debug:
            print("REQUEST: " + endpoint)
            if post is not None:
                if not isinstance(post, list):
                    print("DATA: " + str(post))
            # NOTE(review): if BytesIO is io.BytesIO on Python 3, `body` is
            # bytes and this concatenation raises TypeError - confirm the
            # target interpreter.
            print("RESPONSE: " + body)

        return [header, json_decode(body)]
test_treewalkers.py 文件源码 项目:v2ex-tornado-2 作者: coderyy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run_test(innerHTML, input, expected, errors, treeClass):
    """Parse `input`, walk the resulting tree and compare it with `expected`.

    The document is built with ``treeClass["builder"]`` (as a fragment when
    `innerHTML` is truthy), optionally converted by ``treeClass["adapter"]``
    and then walked with ``treeClass["walker"]``.  Both the walker output
    and the expected text go through ``attrlist.sub(sortattrs, ...)`` before
    being compared.  Inputs that raise ``constants.DataLossWarning`` while
    parsing, and walkers that raise ``NotImplementedError``, are skipped
    silently.  `errors` is not used.
    """
    try:
        parser = html5parser.HTMLParser(tree = treeClass["builder"])
        stream = StringIO.StringIO(input)
        if innerHTML:
            document = parser.parseFragment(stream, innerHTML)
        else:
            document = parser.parse(stream)
    except constants.DataLossWarning:
        # Known-unsupported test case: nothing to check.
        return

    # Without an "adapter" entry the document is used unchanged.
    adapt = treeClass.get("adapter", lambda x: x)
    document = adapt(document)
    try:
        walked = convertTokens(treeClass["walker"](document))
        output = attrlist.sub(sortattrs, walked)
        expected = attrlist.sub(sortattrs, convertExpected(expected))
        assert expected == output, "\n".join([
                "", "Input:", input,
                "", "Expected:", expected,
                "", "Received:", output
                ])
    except NotImplementedError:
        # Walkers that declare themselves unimplemented are let off.
        pass
ml.py 文件源码 项目:kaggle-spark-ml 作者: imgoodman 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def loadRecord(line):
    """
    Parse one line of CSV text and return its fields as a list.
    """
    reader = csv.reader(StringIO.StringIO(line))
    # The ``next()`` builtin works on Python 2.6+ and Python 3, whereas the
    # ``reader.next()`` method only exists on Python 2.
    return next(reader)

#raw_data=sc.textFile(train_file_path).map(loadRecord)
#print raw_data.take(10)
_lxml.py 文件源码 项目:TACTIC-Handler 作者: listyque 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def feed(self, markup):
        """Feed `markup` to the parser in chunks of ``self.CHUNK_SIZE``.

        `markup` may be a byte string, a unicode string, or any object with
        a ``read(size)`` method.  The parser is created by
        ``self.parser_for(self.soup.original_encoding)`` and stored on
        ``self.parser``.

        Raises:
            ParserRejectedMarkup: when feeding or closing the parser raises
                UnicodeDecodeError, LookupError or etree.ParserError.
        """
        if isinstance(markup, bytes):
            markup = BytesIO(markup)
        elif isinstance(markup, unicode):
            markup = StringIO(markup)

        # Call feed() at least once, even if the markup is empty,
        # or the parser won't be initialized.
        data = markup.read(self.CHUNK_SIZE)
        try:
            self.parser = self.parser_for(self.soup.original_encoding)
            self.parser.feed(data)
            while data:
                # Now call feed() on the rest of the data, chunk by chunk.
                data = markup.read(self.CHUNK_SIZE)
                if data:
                    self.parser.feed(data)
            self.parser.close()
        except (UnicodeDecodeError, LookupError, etree.ParserError) as e:
            # ``as`` is accepted by Python 2.6+ and Python 3; the older
            # ``except X, e`` spelling is a syntax error on Python 3.
            raise ParserRejectedMarkup(str(e))
tuling.py 文件源码 项目:QQBot 作者: springhack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def CurlPOST(url, data, cookie):
    """POST `data` to `url` with a JSON content type and return the response body.

    Args:
        url: target URL.
        data: value passed to pycurl as the POST fields.
        cookie: path used both as the cookie file to read and as the cookie
            jar to write.

    Returns:
        Everything pycurl wrote to the in-memory buffer.
    """
    c = pycurl.Curl()
    b = StringIO.StringIO()
    try:
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.POST, 1)
        c.setopt(pycurl.HTTPHEADER,['Content-Type: application/json'])
        # NOTE: no pycurl.TIMEOUT is configured for this request.
        c.setopt(pycurl.WRITEFUNCTION, b.write)
        c.setopt(pycurl.COOKIEFILE, cookie)
        c.setopt(pycurl.COOKIEJAR, cookie)
        c.setopt(pycurl.POSTFIELDS, data)
        c.perform()
        return b.getvalue()
    finally:
        # Release the buffer and the curl handle even when the transfer fails.
        b.close()
        c.close()
test_wheelfile.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_pop_zipfile():
    """Exercise VerifyingZipFile.pop() on an in-memory archive.

    pop() on a closed archive has to raise RuntimeError; after a single
    pop() on the two-entry archive reopened in append mode, exactly one
    entry must be left.
    """
    buf = StringIO()
    archive = wheel.install.VerifyingZipFile(buf, 'w')
    archive.writestr("one", b"first file")
    archive.writestr("two", b"second file")
    archive.close()

    # The archive is already closed, so pop() must be refused.
    rejected = False
    try:
        archive.pop()
    except RuntimeError:
        rejected = True
    if not rejected:
        raise Exception("expected RuntimeError")

    archive = wheel.install.VerifyingZipFile(buf, 'a')
    archive.pop()
    archive.close()

    archive = wheel.install.VerifyingZipFile(buf, 'r')
    remaining = archive.infolist()
    assert len(remaining) == 1
__init__.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _decompressContent(response, new_content):
    """Return `new_content` decoded according to the response's content-encoding.

    `response` is a mapping of header names to values.  When its
    'content-encoding' entry is 'gzip' or 'deflate', the body is decompressed
    and the mapping is updated in place: 'content-length' is set to the
    decompressed length and 'content-encoding' is moved to
    '-content-encoding'.  Any other (or missing) encoding returns
    `new_content` untouched and leaves `response` alone.

    Raises:
        FailedToDecompressContent: if the body cannot be decompressed.
    """
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                try:
                    content = zlib.decompress(content)
                except zlib.error:
                    # Not a zlib-wrapped stream; retry as a raw deflate
                    # stream (negative wbits = no zlib header/trailer).
                    content = zlib.decompress(content, -zlib.MAX_WBITS)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way that won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except (IOError, zlib.error):
        # zlib.error is not an IOError, so it has to be listed explicitly for
        # corrupt deflate bodies to be reported as a decompression failure.
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
http.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, body, mimetype='application/octet-stream',
               chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Build an upload whose content is held in memory.

    DEPRECATED: use MediaIoBaseUpload with either io.TextIOBase or StringIO
    for the stream.

    Args:
      body: string, bytes of the body content.
      mimetype: string, mime-type of the content; defaults to
        'application/octet-stream'.
      chunksize: int, size in bytes of each uploaded chunk. Only used when
        resumable=True.
      resumable: bool, True for a resumable upload, False to upload in a
        single request.
    """
    # Wrap the body in a file-like object and hand it to the base class.
    stream = StringIO.StringIO(body)
    base = super(MediaInMemoryUpload, self)
    base.__init__(stream, mimetype, chunksize=chunksize, resumable=resumable)
__init__.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def emit(events, stream=None, Dumper=Dumper,
        canonical=None, indent=None, width=None,
        allow_unicode=None, line_break=None):
    """
    Write a sequence of YAML parsing events to a stream.
    When no stream is supplied, the output is collected in memory and
    returned as a string; otherwise nothing is returned.
    """
    collect_output = stream is None
    if collect_output:
        from StringIO import StringIO
        stream = StringIO()
    emitter = Dumper(stream, canonical=canonical, indent=indent, width=width,
            allow_unicode=allow_unicode, line_break=line_break)
    try:
        for event in events:
            emitter.emit(event)
    finally:
        # Always dispose of the dumper, even when an event fails to emit.
        emitter.dispose()
    if collect_output:
        return stream.getvalue()
test_wheelfile.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_pop_zipfile():
    """Exercise VerifyingZipFile.pop() on an in-memory archive.

    pop() on a closed archive has to raise RuntimeError; after a single
    pop() on the two-entry archive reopened in append mode, exactly one
    entry must be left.
    """
    buf = StringIO()
    archive = wheel.install.VerifyingZipFile(buf, 'w')
    archive.writestr("one", b"first file")
    archive.writestr("two", b"second file")
    archive.close()

    # The archive is already closed, so pop() must be refused.
    rejected = False
    try:
        archive.pop()
    except RuntimeError:
        rejected = True
    if not rejected:
        raise Exception("expected RuntimeError")

    archive = wheel.install.VerifyingZipFile(buf, 'a')
    archive.pop()
    archive.close()

    archive = wheel.install.VerifyingZipFile(buf, 'r')
    remaining = archive.infolist()
    assert len(remaining) == 1
test_wheelfile.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_pop_zipfile():
    """Exercise VerifyingZipFile.pop() on an in-memory archive.

    pop() on a closed archive has to raise RuntimeError; after a single
    pop() on the two-entry archive reopened in append mode, exactly one
    entry must be left.
    """
    buf = StringIO()
    archive = wheel.install.VerifyingZipFile(buf, 'w')
    archive.writestr("one", b"first file")
    archive.writestr("two", b"second file")
    archive.close()

    # The archive is already closed, so pop() must be refused.
    rejected = False
    try:
        archive.pop()
    except RuntimeError:
        rejected = True
    if not rejected:
        raise Exception("expected RuntimeError")

    archive = wheel.install.VerifyingZipFile(buf, 'a')
    archive.pop()
    archive.close()

    archive = wheel.install.VerifyingZipFile(buf, 'r')
    remaining = archive.infolist()
    assert len(remaining) == 1
test_wheelfile.py 文件源码 项目:jira_worklog_scanner 作者: pgarneau 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_pop_zipfile():
    """Exercise VerifyingZipFile.pop() on an in-memory archive.

    pop() on a closed archive has to raise RuntimeError; after a single
    pop() on the two-entry archive reopened in append mode, exactly one
    entry must be left.
    """
    buf = StringIO()
    archive = wheel.install.VerifyingZipFile(buf, 'w')
    archive.writestr("one", b"first file")
    archive.writestr("two", b"second file")
    archive.close()

    # The archive is already closed, so pop() must be refused.
    rejected = False
    try:
        archive.pop()
    except RuntimeError:
        rejected = True
    if not rejected:
        raise Exception("expected RuntimeError")

    archive = wheel.install.VerifyingZipFile(buf, 'a')
    archive.pop()
    archive.close()

    archive = wheel.install.VerifyingZipFile(buf, 'r')
    remaining = archive.infolist()
    assert len(remaining) == 1
test_decode.py 文件源码 项目:pykit 作者: baishancloud 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_object_pairs_hook(self):
        """Check how ``object_pairs_hook`` is applied by loads() and load()."""
        text = '{"xkd":1, "kcw":2, "art":3, "hxm":4, "qrt":5, "pad":6, "hoy":7}'
        pairs = [("xkd", 1), ("kcw", 2), ("art", 3), ("hxm", 4),
                 ("qrt", 5), ("pad", 6), ("hoy", 7)]

        def keep_pairs(items):
            # Hand the decoded (key, value) list back unchanged.
            return items

        # The JSON text is also a valid dict literal, so eval() gives the
        # reference value for a plain decode.
        self.assertEqual(self.loads(text), eval(text))
        self.assertEqual(self.loads(text, object_pairs_hook=keep_pairs), pairs)
        stream = StringIO(text)
        self.assertEqual(self.json.load(stream, object_pairs_hook=keep_pairs),
                         pairs)

        ordered = self.loads(text, object_pairs_hook=OrderedDict)
        self.assertEqual(ordered, OrderedDict(pairs))
        self.assertEqual(type(ordered), OrderedDict)

        # the object_pairs_hook takes priority over the object_hook
        both_hooks = self.loads(text, object_pairs_hook=OrderedDict,
                                object_hook=lambda x: None)
        self.assertEqual(both_hooks, OrderedDict(pairs))

        # check that empty object literals work (see #17368)
        self.assertEqual(self.loads('{}', object_pairs_hook=OrderedDict),
                         OrderedDict())
        nested_empty = self.loads('{"empty": {}}',
                                  object_pairs_hook=OrderedDict)
        self.assertEqual(nested_empty,
                         OrderedDict([('empty', OrderedDict())]))


问题


面经


文章

微信
公众号

扫码关注公众号