python类StringIO()的实例源码

checker.py 文件源码 项目:ThreatPrep 作者: ThreatResponse 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_iam_credential_report(self):
        """Fetch the IAM credential report and return its rows.

        Polls ``self.iam_client.get_credential_report()`` until it returns a
        report.  A client error whose code is ``ReportNotPresent`` or
        ``ReportExpired`` triggers ``generate_credential_report()``; a
        ``ReportInProgress`` code only waits.  Any other client error is
        re-raised unchanged.  Every handled error is followed by a five
        second sleep before the next attempt.

        :return: a list with one mapping per CSV row of the report's
                 ``Content`` field, keyed by the CSV header names.
        :raises botocore.exceptions.ClientError: for unrecognised error codes.
        """
        report = None
        while report is None:
            try:
                report = self.iam_client.get_credential_report()
            except botocore.exceptions.ClientError as e:
                # Read the structured error code; ``e.message`` only exists
                # on Python 2 exceptions.
                code = e.response.get('Error', {}).get('Code', '')
                if code in ('ReportNotPresent', 'ReportExpired'):
                    self.iam_client.generate_credential_report()
                elif code != 'ReportInProgress':
                    # Bare raise keeps the original traceback.
                    raise
                time.sleep(5)
        document = StringIO.StringIO(report['Content'])
        return list(csv.DictReader(document))
virustotal_api.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def scan_file(self, this_file):
        """ Submit a file to be scanned by VirusTotal

        :param this_file: File to be scanned (32MB file size limit).  May be
            a path to an existing file, a ``StringIO.StringIO`` instance, or
            any other value, which is passed to ``requests`` unchanged.
        :return: JSON response that contains scan_id and permalink, or a
            dict with an ``error`` key when building or sending the request
            fails.
        """
        params = {'apikey': self.api_key}
        opened = None  # file handle this method opened itself, if any
        try:
            try:
                if isinstance(this_file, str) and os.path.isfile(this_file):
                    opened = open(this_file, 'rb')
                    files = {'file': (this_file, opened)}
                elif isinstance(this_file, StringIO.StringIO):
                    files = {'file': this_file.read()}
                else:
                    files = {'file': this_file}
            except TypeError as e:
                return dict(error=e.message)

            try:
                response = requests.post(self.base + 'file/scan', files=files, params=params, proxies=self.proxies)
            except requests.RequestException as e:
                return dict(error=e.message)
        finally:
            # The handle is only needed while the request is being sent;
            # close it on every exit path so it is not leaked.
            if opened is not None:
                opened.close()

        return _return_response_and_status_code(response)
NNTPCryptography.py 文件源码 项目:newsreap 作者: caronc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def encode_public_key(self):
        """
        Return the public key gzipped and then base64 encoded (the
        spotnab-style representation).

        Returns None when writing the value of ``self.public_pem()`` raises
        a TypeError, i.e. the key wasn't initialized yet.
        """
        buf = StringIO()
        gz = GzipFile(fileobj=buf, mode="wb")
        try:
            gz.write(self.public_pem())
        except TypeError:
            # It wasn't initialized yet
            return None
        finally:
            # Always finish the gzip stream, exactly as leaving a ``with``
            # block would.
            gz.close()

        return b64encode(buf.getvalue())
test_wheelfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_pop_zipfile():
    """Exercise ``VerifyingZipFile.pop`` on a closed and on a reopened archive.

    Two members are written; ``pop()`` on the closed archive must raise
    RuntimeError; after reopening in append mode and popping once, one
    member is left.
    """
    buf = StringIO()
    archive = wheel.install.VerifyingZipFile(buf, 'w')
    for member, payload in (("one", b"first file"), ("two", b"second file")):
        archive.writestr(member, payload)
    archive.close()

    raised = False
    try:
        archive.pop()
    except RuntimeError:
        raised = True  # already closed
    if not raised:
        raise Exception("expected RuntimeError")

    archive = wheel.install.VerifyingZipFile(buf, 'a')
    archive.pop()
    archive.close()

    archive = wheel.install.VerifyingZipFile(buf, 'r')
    assert len(archive.infolist()) == 1
test_logic.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_validation_fails_on_upload(self, mock_open):
        """``resource_create`` with an invalid CSV upload raises ValidationError.

        ``io.open`` is patched to return a buffered reader over
        ``INVALID_CSV`` while the action runs; the error must mention the
        missing value in row 2, column 4.
        """
        upload_buffer = StringIO.StringIO()
        upload_buffer.write(INVALID_CSV)
        mock_upload = MockFieldStorage(upload_buffer, 'invalid.csv')

        dataset = factories.Dataset()

        patched_stream = io.BufferedReader(io.BytesIO(INVALID_CSV))
        create_kwargs = dict(
            package_id=dataset['id'],
            format='CSV',
            upload=mock_upload,
        )

        with mock.patch('io.open', return_value=patched_stream), \
                assert_raises(t.ValidationError) as e:
            call_action('resource_create', **create_kwargs)

        assert 'validation' in e.exception.error_dict
        message = str(e.exception)
        assert 'missing-value' in message
        assert 'Row 2 has a missing value in column 4' in message
test_logic.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_validation_passes_on_upload(self, mock_open):
        """``resource_create`` with a valid CSV upload records a successful validation.

        ``io.open`` is patched to return a buffered reader over ``VALID_CSV``
        while the action runs.
        """
        upload_buffer = StringIO.StringIO()
        upload_buffer.write(VALID_CSV)
        mock_upload = MockFieldStorage(upload_buffer, 'invalid.csv')

        dataset = factories.Dataset()

        patched_stream = io.BufferedReader(io.BytesIO(VALID_CSV))
        create_kwargs = dict(
            package_id=dataset['id'],
            format='CSV',
            upload=mock_upload,
        )

        with mock.patch('io.open', return_value=patched_stream):
            resource = call_action('resource_create', **create_kwargs)

        assert_equals(resource['validation_status'], 'success')
        assert 'validation_timestamp' in resource
test_logic.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_schema_upload_field(self, mock_open):
        """A schema passed via ``schema_upload`` ends up in ``resource['schema']``.

        The created resource must expose the parsed schema and must not
        carry the ``schema_upload`` or ``schema_url`` keys.
        """
        mock_upload = MockFieldStorage(
            StringIO.StringIO('{"fields":[{"name":"category"}]}'),
            'schema.json',
        )

        dataset = factories.Dataset()

        create_kwargs = dict(
            package_id=dataset['id'],
            url='http://example.com/file.csv',
            schema_upload=mock_upload,
        )
        resource = call_action('resource_create', **create_kwargs)

        assert_equals(resource['schema'], {'fields': [{'name': 'category'}]})

        for absent_key in ('schema_upload', 'schema_url'):
            assert absent_key not in resource
test_13_RedhawkModule.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_apiHostCollocation(self):
        """Check that ``app.api()`` fills in the application's port dictionaries.

        Both private port dictionaries are empty before ``api()`` is called
        and describe the ``input`` / ``output`` ports afterwards.
        """
        app = self._rhDom.createApplication("/waveforms/through_w/through_w.sad.xml")

        def read_ports(attr_name):
            # Same access path as before: object.__getattribute__ rather
            # than normal attribute lookup on the application object.
            return object.__getattribute__(app, attr_name)

        self.assertEquals(read_ports('_providesPortDict'), {})
        self.assertEquals(read_ports('_usesPortDict'), {})

        app.api(destfile=StringIO.StringIO())

        provides = read_ports('_providesPortDict')
        self.assertEquals(len(provides), 1)
        self.assertEquals(provides.keys()[0], 'input')
        input_port = provides['input']
        self.assertEquals(input_port['Port Interface'], 'IDL:CF/LifeCycle:1.0')
        self.assertEquals(input_port['Port Name'], 'input')

        uses = read_ports('_usesPortDict')
        self.assertEquals(uses.keys()[0], 'output')
        output_port = uses['output']
        self.assertEquals(output_port['Port Interface'], 'IDL:CF/LifeCycle:1.0')
        self.assertEquals(output_port['Port Name'], 'output')
sdds_analyzer.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def dumpPackets(self, pkt_start=0, pkt_end=None, payload_start=0, payload_end=40, raw_payload=False, header_only=False, use_pager=True ):
        """Render a range of packets as text and show it.

        Packets come from ``self._gen_packet(self.raw_data_, pkt_start)`` and
        are numbered starting at ``pkt_start``.

        :param pkt_start: number of the first packet to dump.
        :param pkt_end: number of the last packet to dump (inclusive); when
            None, dumping stops at ``self.npkts_``.
        :param payload_start: forwarded to ``pkt.header_and_payload``.
        :param payload_end: forwarded to ``pkt.header_and_payload``.
        :param raw_payload: forwarded as ``raw``.
        :param header_only: forwarded as ``header_only``.
        :param use_pager: hand the text to ``_helpers.Pager`` when true,
            otherwise write it to standard output.
        """
        import sys

        genf = self._gen_packet(self.raw_data_, pkt_start)
        if pkt_end is None:
            pkt_end = self.npkts_
        else:
            # The caller's bound is inclusive; the loop test is exclusive.
            pkt_end = pkt_end + 1
        res = StringIO()
        for i, pkt in enumerate(genf, pkt_start):
            if i >= pkt_end:
                break
            # Explicit writes reproduce what the former ``print >>res``
            # statements emitted (including the double space after
            # "Packet:") and also parse on Python 3.
            res.write('Packet:  %s\n' % (i,))
            res.write('%s\n' % (pkt.header_and_payload(payload_start, payload_end, header_only=header_only, raw=raw_payload),))

        if use_pager:
            _helpers.Pager(res.getvalue())
        else:
            sys.stdout.write(res.getvalue() + '\n')
spd.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def parseString(inString):
    """Parse XML held in a string and build the corresponding object tree.

    The class for the root element is looked up with ``get_root_tag``;
    when none is found, ``softPkg`` is used.

    :param inString: the XML document as a string.
    :return: the object created by the root class's ``factory()`` after
             ``build()`` was called on it with the root node.
    """
    from StringIO import StringIO
    tree = parsexml_(StringIO(inString))
    root_element = tree.getroot()
    _tag, root_class = get_root_tag(root_element)
    if root_class is None:
        root_class = softPkg
    built = root_class.factory()
    built.build(root_element)
    # Enable Python to collect the space used by the DOM.
    tree = None
    return built
dpd.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def parseString(inString):
    """Parse XML held in a string and build the corresponding object tree.

    The class for the root element is looked up with ``get_root_tag``;
    when none is found, ``devicepkg`` is used.

    :param inString: the XML document as a string.
    :return: the object created by the root class's ``factory()`` after
             ``build()`` was called on it with the root node.
    """
    from StringIO import StringIO
    tree = parsexml_(StringIO(inString))
    root_element = tree.getroot()
    _tag, root_class = get_root_tag(root_element)
    if root_class is None:
        root_class = devicepkg
    built = root_class.factory()
    built.build(root_element)
    # Enable Python to collect the space used by the DOM.
    tree = None
    return built
dcd.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def parseString(inString):
    """Parse XML held in a string and build the corresponding object tree.

    The class for the root element is looked up with ``get_root_tag``;
    when none is found, ``deviceconfiguration`` is used.

    :param inString: the XML document as a string.
    :return: the object created by the root class's ``factory()`` after
             ``build()`` was called on it with the root node.
    """
    from StringIO import StringIO
    tree = parsexml_(StringIO(inString))
    root_element = tree.getroot()
    _tag, root_class = get_root_tag(root_element)
    if root_class is None:
        root_class = deviceconfiguration
    built = root_class.factory()
    built.build(root_element)
    # Enable Python to collect the space used by the DOM.
    tree = None
    return built
profile.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def parseString(inString):
    """Parse XML held in a string and build the corresponding object tree.

    The class for the root element is looked up with ``get_root_tag``;
    when none is found, ``profile`` is used.

    :param inString: the XML document as a string.
    :return: the object created by the root class's ``factory()`` after
             ``build()`` was called on it with the root node.
    """
    from StringIO import StringIO
    tree = parsexml_(StringIO(inString))
    root_element = tree.getroot()
    _tag, root_class = get_root_tag(root_element)
    if root_class is None:
        root_class = profile
    built = root_class.factory()
    built.build(root_element)
    # Enable Python to collect the space used by the DOM.
    tree = None
    return built
sad.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def parseString(inString):
    """Parse XML held in a string and build the corresponding object tree.

    The class for the root element is looked up with ``get_root_tag``;
    when none is found, ``softwareassembly`` is used.

    :param inString: the XML document as a string.
    :return: the object created by the root class's ``factory()`` after
             ``build()`` was called on it with the root node.
    """
    from StringIO import StringIO
    tree = parsexml_(StringIO(inString))
    root_element = tree.getroot()
    _tag, root_class = get_root_tag(root_element)
    if root_class is None:
        root_class = softwareassembly
    built = root_class.factory()
    built.build(root_element)
    # Enable Python to collect the space used by the DOM.
    tree = None
    return built
scd.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def parseString(inString):
    """Parse XML held in a string and build the corresponding object tree.

    The class for the root element is looked up with ``get_root_tag``;
    when none is found, ``softwarecomponent`` is used.

    :param inString: the XML document as a string.
    :return: the object created by the root class's ``factory()`` after
             ``build()`` was called on it with the root node.
    """
    from StringIO import StringIO
    tree = parsexml_(StringIO(inString))
    root_element = tree.getroot()
    _tag, root_class = get_root_tag(root_element)
    if root_class is None:
        root_class = softwarecomponent
    built = root_class.factory()
    built.build(root_element)
    # Enable Python to collect the space used by the DOM.
    tree = None
    return built
prf.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def parseString(inString):
    """Parse XML held in a string and build the corresponding object tree.

    The class for the root element is looked up with ``get_root_tag``;
    when none is found, ``properties`` is used.

    :param inString: the XML document as a string.
    :return: the object created by the root class's ``factory()`` after
             ``build()`` was called on it with the root node.
    """
    from StringIO import StringIO
    tree = parsexml_(StringIO(inString))
    root_element = tree.getroot()
    _tag, root_class = get_root_tag(root_element)
    if root_class is None:
        root_class = properties
    built = root_class.factory()
    built.build(root_element)
    # Enable Python to collect the space used by the DOM.
    tree = None
    return built
properties.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def toXML(self, level=0, version="2.2.2"):
        """Export this property as a ``simple`` XML element and return the text.

        :param level: forwarded to ``simp.export`` (presumably the
            indentation level of the generated XML; confirm against the
            generated parser).
        :param version: accepted for interface compatibility; not used by
            this method.
        :return: everything ``simp.export`` wrote, as a string.
        """
        value = None
        # Identity test: ``!= None`` would call the default value's own
        # comparison operator, which need not return a plain bool.
        if self.defvalue is not None:
            value = to_xmlvalue(self.defvalue, self.type_)

        simp = ossie.parsers.prf.simple(id_=self.id_,
                                        type_=self.type_,
                                        name=self.name,
                                        mode=self.mode,
                                        description=self.__doc__,
                                        value=value,
                                        units=self.units,
                                        action=ossie.parsers.prf.action(type_=self.action))

        for kind in self.kinds:
            simp.add_kind(ossie.parsers.prf.kind(kindtype=kind))

        xml = StringIO.StringIO()
        simp.export(xml, level, name_='simple')
        return xml.getvalue()
bottle.py 文件源码 项目:dabdabrevolution 作者: harryparkdotio 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _body(self):
        """Read the request body into a seekable file object and return it.

        When ``environ`` has no ``wsgi.input`` entry, an empty ``BytesIO`` is
        stored there and returned.  Otherwise the body is copied chunk by
        chunk into a ``BytesIO``; once more than ``MEMFILE_MAX`` bytes have
        been read the data moves to a ``TemporaryFile``.  The resulting
        object replaces ``environ['wsgi.input']`` and is rewound to offset 0.
        """
        try:
            reader = self.environ['wsgi.input'].read
        except KeyError:
            empty = BytesIO()
            self.environ['wsgi.input'] = empty
            return empty

        if self.chunked:
            chunk_source = self._iter_chunked
        else:
            chunk_source = self._iter_body

        target = BytesIO()
        total = 0
        on_disk = False
        for piece in chunk_source(reader, self.MEMFILE_MAX):
            target.write(piece)
            total += len(piece)
            if on_disk or total <= self.MEMFILE_MAX:
                continue
            # Too large for memory: move what was buffered so far to disk.
            in_memory = target
            target = TemporaryFile(mode='w+b')
            target.write(in_memory.getvalue())
            del in_memory
            on_disk = True

        self.environ['wsgi.input'] = target
        target.seek(0)
        return target
check.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _check_rst_data(self, data):
        """Returns warnings when the provided data doesn't compile."""
        source_path = StringIO()
        parser = Parser()

        settings = frontend.OptionParser().get_default_values()
        for option, setting in (('tab_width', 4),
                                ('pep_references', None),
                                ('rfc_references', None)):
            setattr(settings, option, setting)

        reporter_options = {
            'stream': settings.warning_stream,
            'debug': settings.debug,
            'encoding': settings.error_encoding,
            'error_handler': settings.error_encoding_error_handler,
        }
        reporter = SilentReporter(source_path,
                                  settings.report_level,
                                  settings.halt_level,
                                  **reporter_options)

        document = nodes.document(settings, reporter, source=source_path)
        document.note_source(source_path, -1)
        try:
            parser.parse(data, document)
        except AttributeError:
            # Record the aborted parse as one more message instead of failing.
            failure = (-1, 'Could not finish the parsing.', '', {})
            reporter.messages.append(failure)

        return reporter.messages
__init__.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def parseString(string, handler, errorHandler=ErrorHandler()):
    """Parse an XML document held in memory, reporting events to *handler*.

    :param string: the document; it is wrapped in an in-memory stream and
        given to the parser as a byte stream.  Where only ``io.BytesIO`` is
        available (neither ``cStringIO`` nor ``StringIO`` can be imported)
        it must be a bytes object.
    :param handler: content handler installed on the parser.
    :param errorHandler: error handler installed on the parser; ``None``
        selects a fresh ``ErrorHandler()``.
    """
    try:
        from cStringIO import StringIO
    except ImportError:
        try:
            from StringIO import StringIO
        except ImportError:
            # Neither Python 2 module exists; use the io byte buffer.
            from io import BytesIO as StringIO

    if errorHandler is None:
        errorHandler = ErrorHandler()
    parser = make_parser()
    parser.setContentHandler(handler)
    parser.setErrorHandler(errorHandler)

    inpsrc = InputSource()
    inpsrc.setByteStream(StringIO(string))
    parser.parse(inpsrc)

# this is the parser list used by the make_parser function if no
# alternatives are given as parameters to the function
xmlrpclib.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def gzip_encode(data):
    """data -> gzip encoded data

    Compress *data* with the gzip content encoding described in RFC 1952
    (compression level 1) and return the encoded string.  Raises
    NotImplementedError when the module-level ``gzip`` name is false.
    """
    if not gzip:
        raise NotImplementedError
    sink = StringIO.StringIO()
    writer = gzip.GzipFile(mode="wb", fileobj=sink, compresslevel=1)
    writer.write(data)
    # Closing the gzip writer finishes the stream before the buffer is read.
    writer.close()
    compressed = sink.getvalue()
    sink.close()
    return compressed

##
# Decode a string using the gzip content encoding such as specified by the
# Content-Encoding: gzip
# in the HTTP header, as described in RFC 1952
#
# @param data The encoded data
# @return the unencoded data
# @raises ValueError if data is not correctly coded.
xmlrpclib.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def gzip_decode(data):
    """gzip encoded data -> unencoded data

    Decompress *data*, which uses the gzip content encoding described in
    RFC 1952.  Raises NotImplementedError when the module-level ``gzip``
    name is false and ValueError when reading fails with an IOError.
    """
    if not gzip:
        raise NotImplementedError
    source = StringIO.StringIO(data)
    reader = gzip.GzipFile(mode="rb", fileobj=source)
    try:
        plain = reader.read()
    except IOError:
        raise ValueError("invalid data")
    # Same closing order as before: the buffer first, then the gzip reader.
    for stream in (source, reader):
        stream.close()
    return plain

##
# Return a decoded file-like object for the gzip encoding
# as described in RFC 1952.
#
# @param response A stream supporting a read() method
# @return a file-like object that the decoded data can be read() from
tempfile.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def rollover(self):
        """Move the buffered contents into a ``TemporaryFile``.

        Does nothing when ``self._rolled`` is already true.  Otherwise a
        temporary file is created from ``self._TemporaryFileArgs`` (which is
        then deleted), the buffer's contents are copied into it, the file
        position is set to the buffer's position and ``self._rolled``
        becomes True.
        """
        if self._rolled:
            return

        buffered = self._file
        on_disk = TemporaryFile(*self._TemporaryFileArgs)
        self._file = on_disk
        del self._TemporaryFileArgs

        contents = buffered.getvalue()
        on_disk.write(contents)
        # Continue at the same offset the in-memory buffer was at.
        on_disk.seek(buffered.tell(), 0)

        self._rolled = True

    # The method caching trick from NamedTemporaryFile
    # won't work here, because _file may change from a
    # _StringIO instance to a real file. So we list
    # all the methods directly.

    # Context management protocol
mhlib.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def getbodytext(self, decode = 1):
        """Return the message's body text as string.

        The Content-Transfer-Encoding is undone, but other MIME features
        (e.g. multipart messages) are not interpreted.  Pass 0 as an
        argument to suppress decoding; bodies whose encoding is empty,
        '7bit', '8bit' or 'binary' are returned as read.
        """
        self.fp.seek(self.startofbody)
        encoding = self.getencoding()
        needs_decoding = decode and encoding not in ('', '7bit', '8bit', 'binary')
        if needs_decoding:
            try:
                from cStringIO import StringIO
            except ImportError:
                from StringIO import StringIO
            decoded = StringIO()
            mimetools.decode(self.fp, decoded, encoding)
            return decoded.getvalue()
        return self.fp.read()
mailbox.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_message(self, key):
        """Return a Message representation or raise a KeyError.

        The original headers (up to the '*** EOOH ***' line) plus the body
        form the ``BabylMessage``; the headers between that marker and the
        first blank line become its visible headers.  Platform line
        separators are converted to '\\n'.
        """
        start, stop = self._lookup(key)
        source = self._file
        source.seek(start)
        source.readline()   # Skip '1,' line specifying labels.

        original_headers = StringIO.StringIO()
        for line in iter(source.readline, ''):
            if line == '*** EOOH ***' + os.linesep:
                break
            original_headers.write(line.replace(os.linesep, '\n'))

        visible_headers = StringIO.StringIO()
        for line in iter(source.readline, ''):
            if line == os.linesep:
                break
            visible_headers.write(line.replace(os.linesep, '\n'))

        remaining = stop - source.tell()
        body = source.read(remaining).replace(os.linesep, '\n')

        msg = BabylMessage(original_headers.getvalue() + body)
        msg.set_visible(visible_headers.getvalue())
        if key in self._labels:
            msg.set_labels(self._labels[key])
        return msg
mailbox.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_string(self, key):
        """Return a string representation or raise a KeyError.

        The result is the original headers (the lines before
        '*** EOOH ***') followed by the body; the visible headers in
        between are skipped.  Platform line separators are converted to
        '\\n'.
        """
        start, stop = self._lookup(key)
        source = self._file
        source.seek(start)
        source.readline()   # Skip '1,' line specifying labels.

        original_headers = StringIO.StringIO()
        for line in iter(source.readline, ''):
            if line == '*** EOOH ***' + os.linesep:
                break
            original_headers.write(line.replace(os.linesep, '\n'))

        # Skip the visible headers, up to and including the blank line.
        for line in iter(source.readline, ''):
            if line == os.linesep:
                break

        remaining = stop - source.tell()
        body = source.read(remaining).replace(os.linesep, '\n')
        return original_headers.getvalue() + body
client.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get_result(response, limit=None):
    """Read a bounded amount of the response body, un-gzipping it if needed.

    :param response: object with ``read(size)`` and, optionally,
        ``info().getheader(name)``.
    :param limit: ``'0'`` reads at most 224 KiB; any other truthy value is
        converted with ``int()`` and taken as KiB; otherwise at most
        5242880 bytes are read.
    :return: the data read, decompressed when the response's
        ``Content-Encoding`` header is ``gzip``.
    """
    if limit == '0':
        result = response.read(224 * 1024)
    elif limit:
        result = response.read(int(limit) * 1024)
    else:
        result = response.read(5242880)

    try:
        encoding = response.info().getheader('Content-Encoding')
    except Exception:
        # Best effort: a response without usable headers counts as not
        # encoded.  Unlike a bare ``except:`` this lets KeyboardInterrupt
        # and SystemExit propagate.
        encoding = None
    if encoding == 'gzip':
        result = gzip.GzipFile(fileobj=StringIO.StringIO(result)).read()

    return result
socket.py 文件源码 项目:PyJFuzz 作者: mseclab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, sock, mode='rb', bufsize=-1, close=False):
        """Set up the buffering state of a file-like wrapper around *sock*.

        :param sock: the socket object being wrapped.
        :param mode: stored on the instance; not actually used in this
            version.
        :param bufsize: a negative value selects ``self.default_bufsize``.
        :param close: stored as ``self._close``.
        """
        if bufsize < 0:
            bufsize = self.default_bufsize

        self._sock = sock
        self.mode = mode # Not actually used in this version
        self.bufsize = bufsize
        self.softspace = False
        self._close = close

        # _rbufsize is the suggested recv buffer size.  It is *strictly*
        # obeyed within readline() for recv calls.  If it is larger than
        # default_bufsize it will be used for recv calls within read().
        if bufsize == 0:
            read_size = 1
        else:
            read_size = self.default_bufsize if bufsize == 1 else bufsize
        self._rbufsize = read_size
        self._wbufsize = bufsize

        # We use StringIO for the read buffer to avoid holding a list
        # of variously sized string objects which have been known to
        # fragment the heap due to how they are malloc()ed and often
        # realloc()ed down much smaller than their original allocation.
        self._rbuf = StringIO()
        self._wbuf = [] # A list of strings
        self._wbuf_len = 0
convert.py 文件源码 项目:jx-sqlite 作者: mozilla 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def ini2value(ini_content):
    """
    INI FILE CONTENT TO Data

    Each section becomes a key whose value is a dict of that section's
    items; the resulting dict is passed through ``wrap`` before being
    returned.
    """
    from ConfigParser import ConfigParser

    parser = ConfigParser()
    # Feed the in-memory content to the parser; "dummy" stands in for a
    # file name.
    parser._read(StringIO.StringIO(ini_content), "dummy")

    sections = {}
    for name in parser.sections():
        sections[name] = dict(parser.items(name))
    return wrap(sections)
strings_test.py 文件源码 项目:gransk 作者: pcbje 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_base(self):
    """The strings subscriber produces two child documents from the sample data.

    The subscriber is set up with a minimum string length of 4 and at most
    2 lines, then fed three NUL-separated runs of four letters.
    """
    pipe = test_helper.get_mock_pipeline([helper.RUN_PIPELINE])
    subscriber = strings.Subscriber(pipe)
    subscriber.setup({'min_string_length': 4, 'max_lines': 2})

    doc = document.get_document('mock')
    doc.set_size(12345)

    payload = StringIO('AAAA\x00BBBB\x00CCCC')
    subscriber.consume(doc, payload)

    # Two child documents produced.
    self.assertEquals(2, len(pipe.consumer.produced))

    first_child_path = pipe.consumer.produced[0][0].path
    self.assertEquals('mock.00000.child', first_child_path)


问题


面经


文章

微信
公众号

扫码关注公众号