python类decompress()的实例源码

m.py 文件源码 项目:xuexi 作者: liwanlei 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def show():
    """Fetch the weather for the city typed into ``e1`` and display it in ``e2``.

    Reads the city name from the ``e1`` entry widget, queries the etouch
    weather API (which returns gzip-compressed UTF-8 XML), parses the XML with
    BeautifulSoup and writes a summary tuple into the read-only ``e2`` text
    widget. On any parse/lookup failure an error message is shown instead.
    """
    city = e1.get()
    url = 'http://wthrcdn.etouch.cn/WeatherApi?city=' + urllib.parse.quote(city)
    weather = urllib.request.urlopen(url).read()
    # The API always gzips its response body.
    weather_data = gzip.decompress(weather).decode('utf-8')
    try:
        soup = BeautifulSoup(weather_data)
        weather_nodes = soup.find_all('weather')  # fix: local was misspelled 'wheater'
        Text = (('??:%s' % soup.shidu.text), ('??:%s' % soup.fengli.text), weather_nodes[0].high.text, weather_nodes[0].low.text, ('??:%s' % weather_nodes[0].type.text))
        e2['state'] = 'normal'
        e2.delete(1.0, tk.END)
        e2.insert(tk.END, Text)
        e2['state'] = 'disabled'
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # are no longer swallowed; any API/parse failure shows the fallback text.
        Text = '??????????????????'
        e2['state'] = 'normal'
        e2.delete(1.0, tk.END)
        e2.insert(tk.END, Text)
        e2['state'] = 'disabled'
pdb.py 文件源码 项目:ssbio 作者: SBRG 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def download_sifts_xml(pdb_id, outdir='', outfile=''):
    """Download and decompress the SIFTS XML file for a PDB ID.

    Skips the download if the output file already exists on disk.

    Args:
        pdb_id (str): PDB identifier (e.g. ``'1abc'``).
        outdir (str): Directory to save the file into (default: current dir).
        outfile (str): Optional custom output filename; defaults to
            ``<pdb_id>.sifts.xml``.

    Returns:
        str: Path to the decompressed XML file.

    """
    base_url = 'ftp://ftp.ebi.ac.uk/pub/databases/msd/sifts/xml/'
    filename = '{}.xml.gz'.format(pdb_id)

    if outfile:
        outfile = op.join(outdir, outfile)
    else:
        outfile = op.join(outdir, filename.split('.')[0] + '.sifts.xml')

    if not op.exists(outfile):
        response = urlopen(base_url + filename)
        try:
            data = gzip.decompress(response.read())
        finally:
            # Fix: the FTP response handle was never closed.
            response.close()
        with open(outfile, 'wb') as f:
            f.write(data)

    return outfile
test_libbrotli.py 文件源码 项目:forget 作者: codl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_brotli_dynamic_cache(br_client):
    """A repeated dynamic request should be served from the brotli cache."""
    from brotli import decompress
    from time import sleep

    accept_br = [('accept-encoding', 'gzip, br')]

    # Prime the cache with an initial request.
    br_client.get('/', headers=accept_br)

    # Allow the background compression task time to populate the cache.
    sleep(0.5)

    resp = br_client.get('/', headers=accept_br)

    assert resp.headers.get('x-brotli-cache') == 'HIT'
    assert resp.headers.get('content-encoding') == 'br'
    assert b"Hello, world!" in decompress(resp.data)
reptile.py 文件源码 项目:n-gram 作者: sunshineclt 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def saveContentOfURL(target_url):
    # Skip URLs that were already processed (dedup via the module-level set).
    if target_url in searched_url:
        return
    try:
        # Issue a GET request for target_url.
        article_response = urllib.request.urlopen(target_url)
        raw_data = article_response.read()
        # Decompress the body if the server sent it gzip-encoded.
        if article_response.getheader("Content-Encoding") == "gzip":
            raw_data = gzip.decompress(raw_data)
        # Decode as gb2312, silently dropping undecodable bytes.
        article_data = raw_data.decode('gb2312', 'ignore')
        # Extract every <p>...</p> span, clean it, and append it to the output file.
        forEachMatch(pattern_str='<p>(.*?)</p>', to_match_str=article_data,
                     func=lambda match: file_operator.writeFile(cleanArticle(match.group(1))))
    except urllib.error.URLError:
        print(target_url, 'is a wrong url')
    except BaseException as message:
        # NOTE(review): very broad catch — also traps KeyboardInterrupt/SystemExit.
        print(message)
    # Mark the URL as processed, even if fetching/parsing failed.
    searched_url.add(target_url)


# ??<p></p>????,???????
narinfo.py 文件源码 项目:pynix 作者: adnelson 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def import_to_store(self, compressed_nar):
        """Given a compressed NAR, extract it and import it into the nix store.

        :param compressed_nar: The bytes of a NAR, compressed with
            ``self.compression`` (xz, bzip2, or gzip).
        :type  compressed_nar: ``bytes``

        :return: The result of the store import (the imported path).
        """
        # Normalize once instead of lower-casing on every comparison.
        compression = self.compression.lower()
        if compression in ("xz", "xzip"):
            data = lzma.decompress(compressed_nar)
        elif compression in ("bz2", "bzip2"):
            data = bz2.decompress(compressed_nar)
        else:
            # Anything else is assumed to be gzip.
            data = gzip.decompress(compressed_nar)

        # Once extracted, convert it into a nix export object and import.
        export = self.nar_to_export(data)
        # Fix: the import result was assigned to an unused local and dropped.
        return export.import_to_store()
checker3.py 文件源码 项目:repo-checker 作者: 1dot75cm 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_page(self, _url):
        ''' Fetch a page (old-style asyncio coroutine, throttled by the
        module-level semaphore) and return its body according to url_type.
        return str '''

        # Request gzip encoding and pick a random User-Agent from the pool;
        # an explicit opts['user_agent'] overrides the random choice.
        header = { 'Accept-Encoding': 'gzip' }
        header['User-Agent'] = self.ualist[random.randint(0, len(self.ualist)-1)]
        if opts['user_agent']: header['User-Agent'] = opts['user_agent']

        with (yield from semaphore):
            response = yield from aiohttp.request('GET', _url, headers = header)
            page = yield from response.read()

        try:
            # url_type "2": caller only needs the request to succeed, not the body.
            if self.url_type == "2": return "None Content"
            # url_type "4": gunzip, decode gb2312, re-encode as utf-8 bytes.
            if self.url_type == "4": return gzip.decompress(page).decode('gb2312').encode('utf-8')
            else:                    return gzip.decompress(page)
        except OSError:
            # Body was not actually gzipped; hand back the raw bytes.
            return page
deluge.py 文件源码 项目:Seedbox-Statistics-For-InfluxDB 作者: barrycarey 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _process_response(self, res):
        """
        Take the response object and return JSON
        :param res:
        :return:
        """
        # TODO Figure out exceptions here
        if res.headers['Content-Encoding'] == 'gzip':
            self.send_log('Detected gzipped response', 'debug')
            raw_output = gzip.decompress(res.read()).decode('utf-8')
        else:
            self.send_log('Detected other type of response encoding: {}'.format(res.headers['Content-Encoding']), 'debug')
            raw_output = res.read().decode('utf-8')

        json_output = json.loads(raw_output)

        return json_output
blueprint.py 文件源码 项目:tm-manifesting 作者: FabricAttachedMemory 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _load_data():
    # https://github.com/raumkraut/python-debian/blob/master/README.deb822

    global _data
    mirror = BP.config['L4TM_MIRROR']
    release = BP.config['L4TM_RELEASE']
    repo = '%s/dists/%s/%%s/%%s/Packages.gz' % (mirror, release)

    _data = {}
    for area in BP.config['L4TM_AREAS']:
        for arch in ('binary-all', 'binary-arm64'):
            BP.logger.info('Loading/processing %s/%s/Packages.gz...' % (
                area, arch))
            pkgresp = HTTP_REQUESTS.get(repo % (area, arch))
            if pkgresp.status_code != 200:
                BP.logger.error('%s not found' % arch)
                continue
            BP.logger.debug('Uncompressing %s bytes' % pkgresp.headers['content-length'])

            package_bytes = gzip.decompress(pkgresp.content) # bytes all around
            BP.logger.debug('Parsing %d bytes of package data' % len(package_bytes))
            # iter_paragraphs needs a file-like object with read().
            stream = BytesIO(package_bytes)
            _data.update({pkg['Package']: pkg
                          for pkg in Packages.iter_paragraphs(stream)})
train.py 文件源码 项目:hh-page-classifier 作者: TeamHG-Memex 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def extract_features(doc):
    """Return the extracted text and detected language for a document dict."""
    raw_html = doc['html'] or ''
    if not doc_is_extra_sampled(doc):
        try:
            # Stored HTML is normally base64-encoded gzip.
            raw_html = gzip.decompress(base64.b64decode(raw_html)).decode('utf8')
        except Exception:
            pass  # support not compressed html too
    page_text = html_text.extract_text(raw_html)
    try:
        language = langdetect.detect(page_text)
    except LangDetectException:
        language = None
    return {
        'text': page_text,
        'language': language,
    }
net.py 文件源码 项目:defuse_division 作者: lelandbatey 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def msg_recv(conn, sendfunc, closefunc):
    '''
    Function msg_recv reads null-delimited series of bytes from `conn`, which
    is a socket. Each series of bytes is then de-serialized into a json object,
    and `sendfunc` is called with that json object.
    `closefunc` is called if/when the socket `conn` is closed.
    '''
    buf = bytes()
    while True:
        try:
            data = conn.recv(8192)

            # An empty read means the peer closed the connection.
            if not data:
                closefunc()
                return

            inbuf = buf + data
            if SEP not in inbuf:
                # No complete message yet; keep accumulating.
                buf += data
                continue

            parts = inbuf.split(SEP)
            # Every element before the last separator is a complete message;
            # the tail (possibly empty) is carried over to the next read.
            complete, buf = parts[:-1], parts[-1]
            for raw_msg in complete:
                decoded = gzip.decompress(raw_msg).decode('utf-8')
                logging.debug("Msg: {}".format(decoded[:150]+'...' if len(decoded) > 150 else decoded))
                sendfunc(json.loads(decoded))
        except Exception as e:
            logging.exception(e)
mtproto_sender.py 文件源码 项目:BitBot 作者: crack00r 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _handle_gzip_packed(self, msg_id, sequence, reader, updates):
        """Unwrap a gzip-packed payload and re-dispatch its contents."""
        self._logger.debug('Handling gzip packed data')
        # Skip the constructor code that precedes the packed payload.
        reader.read_int(signed=False)  # code
        unpacked_data = gzip.decompress(reader.tgread_bytes())

        with BinaryReader(unpacked_data) as compressed_reader:
            return self._process_msg(
                msg_id, sequence, compressed_reader, updates)

    # endregion
mail.py 文件源码 项目:PyCIRCLeanMail 作者: CIRCL 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _lzma(self):
        '''LZMA processor: decompress the current attachment and feed the
        resulting payload back through processing; on any failure, mark the
        attachment as dangerous instead of crashing.'''
        try:
            archive = lzma.decompress(self.cur_attachment.file_obj.read())
            new_fn, ext = os.path.splitext(self.cur_attachment.orig_filename)
            cur_file = File(archive, new_fn)
            self.process_payload(cur_file)
        except Exception:
            # Fix: narrowed from a bare ``except:`` so KeyboardInterrupt and
            # SystemExit are no longer swallowed; corrupt/hostile archives
            # are still flagged as dangerous.
            self.cur_attachment.make_dangerous()
        return self.cur_attachment
mail.py 文件源码 项目:PyCIRCLeanMail 作者: CIRCL 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _bzip(self):
        '''BZip2 processor: decompress the current attachment and feed the
        resulting payload back through processing; on any failure, mark the
        attachment as dangerous instead of crashing.'''
        try:
            archive = bz2.decompress(self.cur_attachment.file_obj.read())
            new_fn, ext = os.path.splitext(self.cur_attachment.orig_filename)
            cur_file = File(archive, new_fn)
            self.process_payload(cur_file)
        except Exception:
            # Fix: narrowed from a bare ``except:`` so KeyboardInterrupt and
            # SystemExit are no longer swallowed; corrupt/hostile archives
            # are still flagged as dangerous.
            self.cur_attachment.make_dangerous()
        return self.cur_attachment
pdb.py 文件源码 项目:ssbio 作者: SBRG 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def download_biological_assemblies(pdb_id, outdir):
    """Downloads biological assembly file from:
    `ftp://ftp.wwpdb.org/pub/pdb/data/biounit/coordinates/divided/`

    Args:
        pdb_id (str): PDB identifier
        outdir (str): Output directory of the decompressed assembly

    Returns:
        str: Path to the downloaded file, or None if no matching entry was found.

    """

    # TODO: not tested yet
    if not op.exists(outdir):
        raise ValueError('{}: output directory does not exist'.format(outdir))

    # Assemblies are sharded into folders named by the middle two chars of the ID.
    folder = pdb_id[1:3]
    server = 'ftp://ftp.wwpdb.org/pub/pdb/data/biounit/coordinates/divided/{}/'.format(folder)
    html_folder = urlopen(server).readlines()
    for line in html_folder:
        if pdb_id in str(line).strip():
            file_name = '%s' % (pdb_id + str(line).strip().split(pdb_id)[1].split('\r\n')[0])
            outfile_name = file_name.replace('.', '_')
            outfile_name = outfile_name.replace('_gz', '.pdb')
            # Fix: the urlopen handle was previously bound to ``f`` and then
            # shadowed by the output file's ``with ... as f``, so it was never
            # closed; use a distinct name and close it explicitly.
            response = urlopen(op.join(server, file_name))
            try:
                # 16 + MAX_WBITS tells zlib to expect a gzip wrapper.
                decompressed_data = zlib.decompress(response.read(), 16 + zlib.MAX_WBITS)
            finally:
                response.close()
            with open(op.join(outdir, outfile_name), 'wb') as out:
                # Fix: removed the redundant f.close() inside the with-block.
                out.write(decompressed_data)
            # Fix: the log message had a '{}' placeholder but .format() was never called.
            log.debug('{}: downloaded biological assembly'.format(pdb_id))
            return op.join(outdir, outfile_name)
helpers.py 文件源码 项目:postmarker 作者: Stranger6667 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def decode(value, decompress=False):
    """
    Decodes response from Base64 encoded string.

    :param value: Base64-encoded (and optionally gzipped) JSON payload.
    :param decompress: when True, gunzip the payload after base64-decoding.
    :return: the parsed JSON object.
    """
    raw = base64.b64decode(value)
    if decompress:
        raw = gzip.decompress(raw)
    return json.loads(raw.decode())
gzip_packed.py 文件源码 项目:Telethon 作者: LonamiWebs 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def read(reader):
        """Verify the GzipPacked constructor ID, then return the gunzipped payload."""
        constructor_id = reader.read_int(signed=False)
        assert constructor_id == GzipPacked.CONSTRUCTOR_ID
        return gzip.decompress(reader.tgread_bytes())
vxstream.py 文件源码 项目:do-portal 作者: certeu 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_vxstream_report(sha256, envid, type_):
    """Fetch a VxStream report for a known sample; 404 if the sample is unknown."""
    Sample.query.filter_by(sha256=sha256).first_or_404()
    headers = {
        'User-Agent': 'VxStream Sandbox API Client',
        'Accept': 'text/html'}
    params = {'environmentId': envid, 'type': type_}
    vx = vxstream.api.get('result/{}'.format(sha256),
                          params=params, headers=headers)
    # XML, HTML, BIN and PCAP report types come back gzip-compressed.
    if type_ in ['xml', 'html', 'bin', 'pcap']:
        return gzip.decompress(vx)
    return vx
fireeye.py 文件源码 项目:do-portal 作者: certeu 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_fireeye_report(sha256, envid, type):
    # NOTE(review): not implemented — this endpoint unconditionally raises
    # HTTP 501. Everything below the raise is unreachable dead code that
    # mirrors the VxStream implementation, presumably kept as a template.
    raise ApiException({}, 501)
    # XML, HTML, BIN and PCAP are GZipped
    Sample.query.filter_by(sha256=sha256).first_or_404()
    headers = {
        'Accept': 'text/html',
        'User-Agent': 'FireEye Sandbox API Client'}
    params = {'type': type, 'environmentId': envid}
    vx = fireeye.api.get('result/{}'.format(sha256),
                         params=params, headers=headers)
    if type in ['xml', 'html', 'bin', 'pcap']:
        return gzip.decompress(vx)
    return vx
vxstream.py 文件源码 项目:do-portal 作者: certeu 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_cp_vxstream_report(sha256, envid, type_):
    """Fetch a VxStream report for a sample owned by the current user; 404 otherwise."""
    Sample.query.filter_by(sha256=sha256, user_id=g.user.id).first_or_404()
    headers = {
        'User-Agent': 'VxStream Sandbox API Client',
        'Accept': 'text/html'}
    params = {'environmentId': envid, 'type': type_}
    vx = vxstream.api.get('result/{}'.format(sha256),
                          params=params, headers=headers)
    # XML, HTML, BIN and PCAP report types come back gzip-compressed.
    if type_ in ['xml', 'html', 'bin', 'pcap']:
        return gzip.decompress(vx)
    return vx
crawler.py 文件源码 项目:google-crawler 作者: xibowang 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def fetch_page(query):
    """Fetch the search results page for `query`; the body arrives gzipped."""
    url = REQUEST_URL.format(BASE_URL, query)
    req = urllib.request.Request(url)
    # Browser-like headers to avoid trivial bot detection.
    for name, value in (
            ('User-agent', _random_user_agent()),
            ('connection', 'keep-alive'),
            ('Accept-Encoding', 'gzip, deflate, sdch, br'),
            ('referer', REQUEST_URL.format(BASE_URL, ""))):
        req.add_header(name, value)
    print(url)
    response = urllib.request.urlopen(req)

    data = response.read()
    print(type(data))
    return gzip.decompress(data)
parser.py 文件源码 项目:google-crawler 作者: xibowang 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def read_bytes_data():
    """Read the local ``sample.bytes`` file and return its gunzipped contents.

    Returns:
        bytes: the decompressed file contents.
    """
    with open('sample.bytes', 'rb') as file:
        # Fix: the decompressed data was computed but never returned (the
        # function silently returned None); dead commented-out code removed.
        return gzip.decompress(file.read())
utils.py 文件源码 项目:mugen 作者: PeterDing 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def decode_gzip(content):
    """Decompress a gzip-encoded bytes payload."""
    assert isinstance(content, bytes)
    decompressed = gzip.decompress(content)
    return decompressed
utils.py 文件源码 项目:mugen 作者: PeterDing 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def decode_deflate(content):
    """Inflate `content`; retry as headerless raw deflate on failure."""
    assert isinstance(content, bytes)
    try:
        result = zlib.decompress(content)
    except Exception:
        # Some servers send raw deflate without the zlib header.
        result = zlib.decompress(content, -zlib.MAX_WBITS)
    return result
test_libbrotli.py 文件源码 项目:forget 作者: codl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_brotli_static_gzip(br_client):
    """Static file is gzip-encoded when the client only accepts gzip."""
    from gzip import decompress

    resp = br_client.get(
            '/static/bee.txt',
            headers=[('accept-encoding', 'gzip')])

    assert resp.headers.get('content-encoding') == 'gzip'
    assert BEE_SCRIPT in decompress(resp.data)
test_libbrotli.py 文件源码 项目:forget 作者: codl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_brotli_static_br(br_client):
    """Static file is brotli-encoded when the client accepts br."""
    from brotli import decompress

    resp = br_client.get(
            '/static/bee.txt',
            headers=[('accept-encoding', 'gzip, br')])

    assert resp.headers.get('content-encoding') == 'br'
    assert BEE_SCRIPT in decompress(resp.data)
test_libbrotli.py 文件源码 项目:forget 作者: codl 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_brotli_dynamic(br_client):
    """First dynamic request is a brotli cache MISS but still br-encoded."""
    from brotli import decompress

    response = br_client.get(
            '/',
            headers=[('accept-encoding', 'gzip, br')])

    assert response.headers.get('x-brotli-cache') == 'MISS'
    assert response.headers.get('content-encoding') == 'br'
    assert b"Hello, world!" in decompress(response.data)
test_frontend_s3.py 文件源码 项目:miracle 作者: mozilla 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_main(bucket, crypto, processor):
    """End-to-end: encrypted session payloads land in S3 as gzipped JSON."""
    _p = functools.partial(_payload, crypto)
    today = date.today()
    user = 'foo'
    # Objects are keyed by year/month/user.
    prefix = 'v2/sessions/%s/%s/%s/' % (today.year, today.month, user)

    records = [
        make_record('2', _p({'user': user, 'extra': 1})),
    ]
    # main() returns the last processed record id and an error count of 0.
    assert main(processor, records) == ('2', 0)

    objs = list(bucket.filter(Prefix=prefix))
    assert len(objs) == 1
    assert objs[0].key.endswith('.json.gz')

    obj = bucket.get(objs[0].key)
    assert obj['ContentEncoding'] == 'gzip'
    assert obj['ContentType'] == 'application/json'
    body = obj['Body'].read()
    obj['Body'].close()

    # Stored body round-trips: gunzip + JSON-decode yields the original payload.
    body = json.loads(gzip.decompress(body).decode('utf-8'))
    assert body == {'user': user, 'extra': 1}

    # Upload a second time
    records = [
        make_record('3', _p({'user': user, 'extra': 2})),
    ]
    assert main(processor, records) == ('3', 0)

    # A second upload creates a new object rather than overwriting the first.
    objs = list(bucket.filter(Prefix=prefix))
    assert len(objs) == 2
decision_builder.py 文件源码 项目:floto 作者: babbel 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _decompress_result(self, compressed_result):
        result_bytes = bytes([int(c, 16) for c in compressed_result.split('x')])
        result = gzip.decompress(result_bytes).decode()
        result = json.loads(result)
        return result
gzip.py 文件源码 项目:tasker 作者: wavenator 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def decompress(
        data,
    ):
        """Return the gunzipped form of ``data``."""
        # Thin wrapper around the stdlib gzip module.
        return gzip.decompress(data)
cloudtrail_to_es.py 文件源码 项目:analytics-platform-ops 作者: ministryofjustice 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def log_file_s3_object(record):
    """Fetch the S3 object referenced by an S3 event record and return it gunzipped."""
    s3_info = record['s3']
    obj = s3.get_object(
        Bucket=s3_info['bucket']['name'],
        Key=s3_info['object']['key'])
    return gzip.decompress(obj['Body'].read())


问题


面经


文章

微信
公众号

扫码关注公众号