python类GzipFile()的实例源码

__init__.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _decompressContent(response, new_content):
    """Decode a gzip- or deflate-encoded HTTP body and fix up the headers.

    Args:
        response: Mapping of response headers. When the body is decoded,
            ``content-length`` is set to the decoded length and
            ``content-encoding`` is moved to ``-content-encoding``.
        new_content: The body bytes as received.

    Returns:
        The decoded body, or ``new_content`` unchanged when the
        ``content-encoding`` header is neither ``gzip`` nor ``deflate``.

    Raises:
        FailedToDecompressContent: If the body cannot be decompressed.
    """
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=io.BytesIO(new_content)).read()
            if encoding == 'deflate':
                # The HTTP "deflate" coding is a zlib-wrapped stream, but some
                # servers send raw deflate data instead; accept both.
                try:
                    content = zlib.decompress(content)
                except zlib.error:
                    content = zlib.decompress(content, -zlib.MAX_WBITS)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way that won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except (IOError, zlib.error):
        # zlib.error is not an IOError, so it must be listed explicitly for
        # corrupt deflate data to be reported through the same exception.
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
input_data.py 文件源码 项目:rbm-ae-tf 作者: Cospel 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def extract_images(filename):
  """Read a gzipped MNIST image file into a 4D uint8 numpy array.

  The returned array is indexed as [index, y, x, depth] with depth 1.
  Raises ValueError when the leading magic number is not 2051.
  """
  print('Extracting', filename)
  with tf.gfile.Open(filename, 'rb') as raw_file:
    with gzip.GzipFile(fileobj=raw_file) as bytestream:
      header_magic = _read32(bytestream)
      if header_magic != 2051:
        raise ValueError(
            'Invalid magic number %d in MNIST image file: %s' %
            (header_magic, filename))
      # The header is read in this order: image count, rows, columns.
      num_images = _read32(bytestream)
      rows = _read32(bytestream)
      cols = _read32(bytestream)
      pixel_bytes = bytestream.read(rows * cols * num_images)
      pixels = numpy.frombuffer(pixel_bytes, dtype=numpy.uint8)
      return pixels.reshape(num_images, rows, cols, 1)
check.py 文件源码 项目:wikilinks 作者: trovdimi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_title2id(self, dump_date):
        """Build a title -> page id mapping from the uncompressed page dump.

        Only lines starting with ``INSERT`` are scanned. Every tuple of the
        form ``(<digits>,0,'<title>','`` contributes one entry, keyed by the
        title after ``DataHandler.unescape_mysql`` and valued by the integer
        page id.

        Args:
            dump_date: Date string spliced into the dump file name.

        Returns:
            dict mapping unescaped titles to integer page ids.
        """
        print('get_title2id...')
        title2id = {}
        regex = re.compile(r"\((\d+),0,'(.+?)','")
        fname = '/home/ddimitrov/data/enwiki20150304_plus_clickstream/enwiki-' + dump_date + '-page.sql'
        # Binary mode: each line is decoded explicitly below, which only works
        # on bytes under Python 3 (and is equivalent under Python 2).
        with open(fname, 'rb') as f:
            # Stream the file line by line instead of materialising the whole
            # dump with readlines().
            for line in f:
                line = line.decode('utf-8')
                if not line.startswith('INSERT'):
                    continue
                for pid, title in regex.findall(line):
                    title2id[DataHandler.unescape_mysql(title)] = int(pid)

        return title2id
check.py 文件源码 项目:wikilinks 作者: trovdimi 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_rpid2pid(self, dump_date):
        """Map redirect page ids to the ids of the pages they point at.

        Reads the gzipped redirect dump for ``dump_date`` and resolves each
        redirect title through the mapping returned by ``get_title2id``.
        Redirects whose title is not in that mapping are printed and skipped.
        Keys of the returned dict are the id strings captured by the regex.
        """
        print('get_rpid2pid...')
        page_ids = self.get_title2id(dump_date)
        redirects = {}
        row_pattern = re.compile(r"\((\d+),0,'(.+?)','")
        dump_path = '/home/ddimitrov/data/enwiki20150304_plus_clickstream/enwiki-' + dump_date + '-redirect.sql.gz'
        with gzip.GzipFile(dump_path, 'rb') as dump:
            for raw_line in dump:
                text = raw_line.decode('utf-8')
                if text.startswith('INSERT'):
                    # Turn NULL columns into empty quoted strings before matching.
                    text = text.replace('NULL', "''")
                    for pid, title in row_pattern.findall(text):
                        try:
                            redirects[pid] = page_ids[DataHandler.unescape_mysql(title)]
                        except KeyError:
                            print(pid, title)
        return redirects
lett.py 文件源码 项目:wmt16-document-alignment-task 作者: christianbuck 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def read_lett_iter(f, decode=True):
    """Yield one ``Page`` per line of a .lett file.

    ``f`` is rewound first and wrapped in a gzip reader when its name ends
    in ``.gz``. Each line holds six tab-separated fields (lang, mime, enc,
    url, html, text); html and text are base64-decoded and, when ``decode``
    is true, additionally decoded as UTF-8.
    """
    f.seek(0)
    source = gzip.GzipFile(fileobj=f, mode='r') if f.name.endswith('.gz') else f
    for record in source:
        # Drop the final character of the line (the newline) before splitting.
        fields = record[:-1].split("\t")
        lang, mime, enc, url, html_b64, text_b64 = fields

        html, text = base64.b64decode(html_b64), base64.b64decode(text_b64)
        if decode:
            html, text = html.decode("utf-8"), text.decode("utf-8")

        yield Page(url, html, text, mime, enc, lang)
test_io.py 文件源码 项目:radar 作者: amoose136 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_gzip_loadtxt():
    # NamedTemporaryFile cannot be used here: on Windows a file created by
    # it cannot be reopened by a second open call. Instead the gzipped
    # reference data is built in memory, written to a temporary path, and
    # that path is handed to loadtxt.
    buf = BytesIO()
    with gzip.GzipFile(fileobj=buf, mode='w') as gz:
        gz.write(b'1 2 3\n')
    compressed = buf.getvalue()

    with temppath(suffix='.gz') as name:
        with open(name, 'wb') as out:
            out.write(compressed)
        res = np.loadtxt(name)
    buf.close()

    assert_array_equal(res, [1, 2, 3])
__init__.py 文件源码 项目:Texty 作者: sarthfrey 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _decompressContent(response, new_content):
    """Decode a gzip- or deflate-encoded HTTP body and fix up the headers.

    Args:
        response: Mapping of response headers. When the body is decoded,
            ``content-length`` is set to the decoded length and
            ``content-encoding`` is moved to ``-content-encoding``.
        new_content: The body as received.

    Returns:
        The decoded body, or ``new_content`` unchanged when the
        ``content-encoding`` header is neither ``gzip`` nor ``deflate``.

    Raises:
        FailedToDecompressContent: If the body cannot be decompressed.
    """
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                # The HTTP "deflate" coding is a zlib-wrapped stream, but some
                # servers send raw deflate data instead; accept both.
                try:
                    content = zlib.decompress(content)
                except zlib.error:
                    content = zlib.decompress(content, -zlib.MAX_WBITS)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way that won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except (IOError, zlib.error):
        # zlib.error is not an IOError, so it must be listed explicitly for
        # corrupt deflate data to be reported through the same exception.
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
mnist.py 文件源码 项目:taskcv-2017-public 作者: VisionLearningGroup 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _read_datafile(self, path, expected_dims):
        """Load a gzipped IDX file into a uint8 numpy array.

        The file starts with a big-endian 32-bit magic number that must
        equal ``2048 + expected_dims``, followed by ``expected_dims``
        big-endian 32-bit dimension sizes and then the raw bytes.
        Raises ValueError when the magic number does not match.
        """
        base_magic_num = 2048
        with gzip.GzipFile(path) as stream:
            (found_magic,) = struct.unpack('>I', stream.read(4))
            wanted_magic = base_magic_num + expected_dims
            if found_magic != wanted_magic:
                raise ValueError('Incorrect MNIST magic number (expected '
                                 '{}, got {})'
                                 .format(wanted_magic, found_magic))
            header_format = '>' + 'I' * expected_dims
            shape = struct.unpack(header_format,
                                  stream.read(4 * expected_dims))
            # One byte per element, so the payload size is the product of dims.
            payload = stream.read(reduce(operator.mul, shape))
            return np.frombuffer(payload, dtype=np.uint8).reshape(*shape)
amset.py 文件源码 项目:amset 作者: hackingmaterials 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def to_file(self, dir_path='.', fname='amsetrun', force_write=True):
        """Write ``self.kgrid`` and ``self.egrid`` to a gzipped JSON file.

        Args:
            dir_path: Directory in which the file is created.
            fname: Base name; the file is ``<fname>.json.gz``.
            force_write: When true, an existing file is overwritten. When
                false, ``_1``, ``_2``, ... is appended to ``fname`` until an
                unused name is found, warning for every name already taken.
        """
        if not force_write:
            n = 1
            fname0 = fname
            while os.path.exists(os.path.join(dir_path, '{}.json.gz'.format(fname))):
                warnings.warn('The file, {} exists. AMSET outputs will be '
                        'written in {}'.format(fname, fname0+'_'+str(n)))
                fname = fname0 + '_' + str(n)
                n += 1

        # make the output dict
        out_d = {'kgrid': self.kgrid, 'egrid': self.egrid}

        # Serialise before opening the file so that a serialisation failure
        # does not leave an empty or truncated archive behind.
        jsonstr = json.dumps(out_d, cls=MontyEncoder)

        # write the output dict to file
        with gzip.GzipFile(
                os.path.join(dir_path, '{}.json.gz'.format(fname)), 'w') as fp:
            # GzipFile is a binary stream: it needs bytes, not text.
            fp.write(jsonstr.encode('utf-8'))
__init__.py 文件源码 项目:office-interoperability-tools 作者: milossramek 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _decompressContent(response, new_content):
    """Decode a gzip- or deflate-encoded HTTP body and fix up the headers.

    Args:
        response: Mapping of response headers. When the body is decoded,
            ``content-length`` is set to the decoded length and
            ``content-encoding`` is moved to ``-content-encoding``.
        new_content: The body as received.

    Returns:
        The decoded body, or ``new_content`` unchanged when the
        ``content-encoding`` header is neither ``gzip`` nor ``deflate``.

    Raises:
        FailedToDecompressContent: If the body cannot be decompressed.
    """
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                # The HTTP "deflate" coding is a zlib-wrapped stream, but some
                # servers send raw deflate data instead; accept both.
                try:
                    content = zlib.decompress(content)
                except zlib.error:
                    content = zlib.decompress(content, -zlib.MAX_WBITS)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way that won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except (IOError, zlib.error):
        # zlib.error is not an IOError, so it must be listed explicitly for
        # corrupt deflate data to be reported through the same exception.
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
docs_resolv.py 文件源码 项目:sequana 作者: sequana 项目源码 文件源码 阅读 63 收藏 0 点赞 0 评论 0
def _get_data(url):
    """Return the content behind ``url``: an http:// URL or a local path.

    Anything not starting with ``http://`` is opened as a local text file.
    HTTP responses with a gzip content-encoding are decompressed; any
    encoding other than plain or gzip raises RuntimeError.
    """
    if not url.startswith('http://'):
        with open(url, 'r') as fid:
            return fid.read()

    # Try the Python 2 API first; its absence surfaces as AttributeError,
    # in which case the Python 3 API is used.
    try:
        resp = urllib.urlopen(url)
        encoding = resp.headers.dict.get('content-encoding', 'plain')
    except AttributeError:
        resp = urllib.request.urlopen(url)
        encoding = resp.headers.get('content-encoding', 'plain')

    payload = resp.read()
    if encoding == 'gzip':
        return gzip.GzipFile(fileobj=StringIO(payload)).read()
    if encoding != 'plain':
        raise RuntimeError('unknown encoding')
    return payload
exportxml.py 文件源码 项目:gprime 作者: GenealogyCollective 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def write_handle(self, handle):
        """
        Write the database to the specified file handle.

        When ``self.compress`` is set and gzip support is available, the
        output is gzip-compressed on its way into ``handle``; if the gzip
        stream cannot be created, the data is written uncompressed. Text is
        encoded as UTF-8 through ``self.g``. The output stream is closed
        before returning, including when writing fails.

        Returns 1 on success.
        """
        g = handle
        if self.compress and _gzip_ok:
            try:
                g = gzip.GzipFile(mode="wb", fileobj=handle)
            except Exception:
                # Best effort: fall back to uncompressed output, but no
                # longer swallow KeyboardInterrupt/SystemExit.
                g = handle

        try:
            self.g = codecs.getwriter("utf8")(g)
            self.write_xml_data()
        finally:
            # Always close so that a gzip stream is finalised and the handle
            # is released even if write_xml_data() raises.
            g.close()
        return 1
IonTorrent_summary_table.py 文件源码 项目:reportIT 作者: stevekm 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def py_gunzip(gz_file, outdir = "."):
    """Extract a .gz file into ``outdir``.

    The output file is named after ``gz_file`` with its directory and its
    last extension removed. Progress messages are printed along the way.

    Args:
        gz_file: Path of the gzip-compressed input file.
        outdir: Directory that receives the decompressed file.
    """
    # Single-argument print(...) calls behave the same on Python 2 and 3.
    print(gz_file)
    print(outdir)
    print("Reading file:\t" + gz_file)
    # Context managers make sure both files are closed even on errors.
    with gzip.GzipFile(gz_file, 'rb') as input_file:
        file_contents = input_file.read()
    print("Finished reading file")
    # get the output path from the outdir and filename
    output_file_base = os.path.basename(os.path.splitext(gz_file)[0])
    print(output_file_base)
    output_file_path = os.path.join(outdir, output_file_base)
    print(output_file_path)
    # write the contents
    print("Writing contents to file:\t" + output_file_path)
    print(type(output_file_path))
    with open(output_file_path, 'wb') as output_file:
        output_file.write(file_contents)
AnnotationLib.py 文件源码 项目:cancer 作者: yancz1989 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def saveIDL(filename, annotations):
    """Write ``annotations`` to ``filename`` in IDL format.

    The extension selects the container: ``.idl`` (plain file), ``.gz``
    (gzip) or ``.bz2`` (bzip2). Each annotation writes itself through its
    ``writeIDL`` method; entries are terminated by ``;`` except the last
    one, which is terminated by ``.``.

    Raises:
        ValueError: If the extension is not one of the supported three.
    """
    ext = os.path.splitext(filename)[1]

    if ext == ".idl":
        out = open(filename, 'w')
    elif ext == ".gz":
        out = gzip.GzipFile(filename, 'w')
    elif ext == ".bz2":
        out = bz2.BZ2File(filename, 'w')
    else:
        # Previously this fell through and failed on an unbound local.
        raise ValueError("unsupported IDL file extension: %r" % ext)

    try:
        for i, annotation in enumerate(annotations):
            annotation.writeIDL(out)
            out.write(";\n" if i + 1 < len(annotations) else ".\n")
    finally:
        # Close the file even if an annotation fails to serialise.
        out.close()
NNTPCryptography.py 文件源码 项目:newsreap 作者: caronc 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def encode_private_key(self):
        """
        Return the private key in its encoded form: the PEM data from
        self.private_pem() is gzipped and the result is base64 encoded
        (the representation used by spotnab).

        Returns None when the key data cannot be written, which is the
        case when the key has not been initialized yet.

        """
        buf = StringIO()
        with GzipFile(fileobj=buf, mode="wb") as gz:
            try:
                pem = self.private_pem()
                gz.write(pem)
            except TypeError:
                # It wasn't initialized yet
                return None
        compressed = buf.getvalue()
        return b64encode(compressed)
NNTPCryptography.py 文件源码 项目:newsreap 作者: caronc 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def decode_private_key(self, encoded):
        """
        Decode and load a private key that was encoded the spotnab way:
        gzipped, then base64 encoded.

        Args:
            encoded: The base64 text of the gzipped key.

        Returns:
            True when the key was decoded and self.load() accepted it,
            False when the input is not valid base64, is not a valid gzip
            stream, decodes to nothing, or is rejected by self.load().

        """
        import zlib

        fileobj = StringIO()
        try:
            fileobj.write(b64decode(encoded))
        except (TypeError, ValueError):
            # Invalid base64 raises TypeError on Python 2 and binascii.Error
            # (a ValueError subclass) on Python 3.
            return False

        # A plain 0 is accepted by both Python 2 and 3 (0L is Python 2 only).
        fileobj.seek(0, SEEK_SET)
        private_key = None
        try:
            with GzipFile(fileobj=fileobj, mode="rb") as f:
                private_key = f.read()
        except (IOError, EOFError, zlib.error):
            # Not a gzip stream, or a truncated/corrupt one
            return False

        if not private_key:
            return False

        # We were successful
        if not self.load(private_key=private_key):
            return False

        return True
NNTPCryptography.py 文件源码 项目:newsreap 作者: caronc 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def decode_public_key(self, encoded):
        """
        Decode and load a public key that was encoded the spotnab way:
        gzipped, then base64 encoded.

        On success the loaded key is stored in self.public_key.

        Args:
            encoded: The base64 text of the gzipped PEM public key.

        Returns:
            True when the key was decoded and loaded, False when the input
            is not valid base64, is not a valid gzip stream, or does not
            contain a loadable PEM public key.

        """
        import zlib

        fileobj = StringIO()
        try:
            fileobj.write(b64decode(encoded))
        except (TypeError, ValueError):
            # Invalid base64 raises TypeError on Python 2 and binascii.Error
            # (a ValueError subclass) on Python 3.
            return False

        # A plain 0 is accepted by both Python 2 and 3 (0L is Python 2 only).
        fileobj.seek(0, SEEK_SET)
        self.public_key = None
        try:
            with GzipFile(fileobj=fileobj, mode="rb") as f:
                pem = f.read()
        except (IOError, EOFError, zlib.error):
            # Not a gzip stream, or a truncated/corrupt one
            return False

        try:
            self.public_key = serialization.load_pem_public_key(
                pem,
                backend=default_backend()
            )
        except ValueError:
            # Could not decrypt content
            return False

        if not self.public_key:
            return False

        return True
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 66 收藏 0 点赞 0 评论 0
def _init_read_gz(self):
        """Initialize for reading a gzip compressed fileobj.

        Sets up the decompressor and its buffer, then validates and skips
        the gzip header so that subsequent reads start at the compressed
        data.

        Raises ReadError if the gzip magic bytes are missing and
        CompressionError if the compression method byte is not 8.
        """
        # Negative wbits selects a raw deflate stream: the gzip header is
        # consumed by hand below rather than by zlib.
        self.cmp = self.zlib.decompressobj(-self.zlib.MAX_WBITS)
        self.dbuf = b""

        # taken from gzip.GzipFile with some alterations
        # Two magic bytes identify a gzip stream.
        if self.__read(2) != b"\037\213":
            raise ReadError("not a gzip file")
        # The third byte is the compression method; only 8 is accepted.
        if self.__read(1) != b"\010":
            raise CompressionError("unsupported compression method")

        # Flag byte: selects which optional header fields follow.
        flag = ord(self.__read(1))
        # Skip the next six header bytes; their content is not used here.
        self.__read(6)

        if flag & 4:
            # Optional extra field (FEXTRA in RFC 1952): a two-byte
            # little-endian length followed by that many bytes.
            xlen = ord(self.__read(1)) + 256 * ord(self.__read(1))
            # NOTE(review): this calls self.read() while every other header
            # read uses self.__read() - confirm that this is intentional.
            self.read(xlen)
        if flag & 8:
            # Optional NUL-terminated string (FNAME in RFC 1952): skip it,
            # stopping early at end of input.
            while True:
                s = self.__read(1)
                if not s or s == NUL:
                    break
        if flag & 16:
            # Optional NUL-terminated string (FCOMMENT in RFC 1952): skip it,
            # stopping early at end of input.
            while True:
                s = self.__read(1)
                if not s or s == NUL:
                    break
        if flag & 2:
            # Optional two-byte field (FHCRC in RFC 1952): skipped unchecked.
            self.__read(2)
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def seekable(self):
        """Report whether the underlying file object supports seeking.

        File objects that do not expose a ``seekable`` method at all are
        reported as seekable.
        """
        if hasattr(self.fileobj, "seekable"):
            return self.fileobj.seekable()
        # XXX gzip.GzipFile and bz2.BZ2File
        return True
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def gzopen(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
        """Open gzip compressed tar archive name for reading or writing.
           Appending is not allowed.

           name and fileobj are handed to gzip.GzipFile together with
           compresslevel; the resulting stream and any extra keyword
           arguments are passed on to cls.taropen().

           Raises ValueError for a mode other than 'r' or 'w',
           CompressionError when the gzip module is unavailable, and
           ReadError when an IOError occurs while a file object is
           available (reported as "not a gzip file").
        """
        # NOTE(review): `mode not in "rw"` is a substring test, so an empty
        # mode string passes this check - confirm whether that is intended.
        if len(mode) > 1 or mode not in "rw":
            raise ValueError("mode must be 'r' or 'w'")

        # Probe for a usable gzip module before doing any work.
        try:
            import gzip
            gzip.GzipFile
        except (ImportError, AttributeError):
            raise CompressionError("gzip module is not available")

        # Remember whether the caller supplied the file object; fileobj is
        # rebound to the GzipFile wrapper below.
        extfileobj = fileobj is not None
        try:
            fileobj = gzip.GzipFile(name, mode + "b", compresslevel, fileobj)
            t = cls.taropen(name, mode, fileobj, **kwargs)
        except IOError:
            # Only close a stream that was opened here, never the caller's.
            if not extfileobj and fileobj is not None:
                fileobj.close()
            # fileobj is still None only if GzipFile() itself failed without
            # a caller-supplied file object: re-raise that original error.
            if fileobj is None:
                raise
            raise ReadError("not a gzip file")
        except:
            # Any other failure: release a stream opened here and re-raise.
            if not extfileobj and fileobj is not None:
                fileobj.close()
            raise
        # Record on the archive object whether the file object came from
        # the caller.
        t._extfileobj = extfileobj
        return t


问题


面经


文章

微信
公众号

扫码关注公众号