python类decompress()的实例源码

net.py 文件源码 项目:bcloud 作者: wangYanJava 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def urlopen(url, headers={}, data=None, retries=RETRIES, timeout=TIMEOUT):
    '''????http??, ???Request.

    headers ???dict. ?????????, ??User-Agent, Referer?, ?
    ????????.

    ????????http??, ??????????.
    ???????gzip????, ????gzip???????, ???????
    ??.
    req.data ?????????http????, ????UTF-8?????.
    '''
    headers_merged = default_headers.copy()
    for key in headers.keys():
        headers_merged[key] = headers[key]
    opener = urllib.request.build_opener(ForbiddenHandler)
    opener.addheaders = [(k, v) for k,v in headers_merged.items()]

    for i in range(retries):
        try:
            req = opener.open(url, data=data, timeout=timeout)
            encoding = req.headers.get('Content-encoding')
            req.data = req.read()
            if encoding == 'gzip':
                req.data = gzip.decompress(req.data)
            elif encoding == 'deflate':
                req.data = zlib.decompress(req.data, -zlib.MAX_WBITS)
            return req
        except OSError:
            logger.error(traceback.format_exc())

        except:
            logger.error(traceback.format_exc())

    return None
net.py 文件源码 项目:bcloud 作者: wangYanJava 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def post_multipart(url, headers, fields, files, retries=RETRIES):
    content_type, body = encode_multipart_formdata(fields, files)
    schema = urllib.parse.urlparse(url)

    headers_merged = default_headers.copy()
    for key in headers.keys():
        headers_merged[key] = headers[key]
    headers_merged['Content-Type'] = content_type
    headers_merged['Content-length'] = str(len(body))

    for i in range(retries):
        try:
            h = http.client.HTTPConnection(schema.netloc)
            h.request('POST', url, body=body, headers=headers_merged)
            req = h.getresponse()
            encoding = req.getheader('Content-encoding')
            req.data = req.read()
            if encoding == 'gzip':
                req.data = gzip.decompress(req.data)
            elif encoding == 'deflate':
                req.data = zlib.decompress(req.data, -zlib.MAX_WBITS)
            return req
        except OSError:
            logger.error(traceback.format_exc())
        except:
            logger.error(traceback.format_exc())
            #return None
    return None
release.py 文件源码 项目:amprolla 作者: parazyd 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def rehash_release(_filelist, fdesc, rmstr):
    """
    Calculates checksums of a given filelist and writes them to the given
    file descriptor. Takes rmstr as the third argument, which is a string to
    remove from the path of the hashed file when writing it to a file.
    """
    info('Hashing checksums')
    for csum in checksums:
        fdesc.write('%s:\n' % csum['name'])
        for i in _filelist:
            if isfile(i):
                cont = open(i, 'rb').read()
                fdesc.write(' %s %8s %s\n' % (csum['f'](cont).hexdigest(),
                                              getsize(i),
                                              i.replace(rmstr+'/', '')))
            elif i.endswith('.xz') and isfile(i.replace('.xz', '.gz')):
                xzstr = lzma_comp(open(i.replace('.xz', '.gz'), 'rb').read())
                fdesc.write(' %s %8s %s\n' % (csum['f'](xzstr).hexdigest(),
                                              len(xzstr),
                                              i.replace(rmstr+'/', '')))
            elif not i.endswith('.gz') and isfile(i+'.gz'):
                uncomp = gzip_decomp(open(i+'.gz', 'rb').read())
                fdesc.write(' %s %8s %s\n' % (csum['f'](uncomp).hexdigest(),
                                              len(uncomp),
                                              i.replace(rmstr+'/', '')))
    return
spider_zh2.py 文件源码 项目:PythonBasicDemo 作者: actanble 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def ungzip(data):
    try:        # ????
        print('????.....')
        data = gzip.decompress(data)
        print('????!')
    except:
        print('????, ????')
    return data
spider1.py 文件源码 项目:PythonBasicDemo 作者: actanble 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def ungzip(data):
    try:        # ????
        print('????.....')
        data = gzip.decompress(data)
        print('????!')
    except:
        print('????, ????')
    return data
LixinStaffInfoSpider.py 文件源码 项目:office_helper 作者: mikumikulch 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __ungzip(self, data):
        try:  # ????
            # print('????.....')
            data = gzip.decompress(data)
            # print('????!')
        except:
            logger.error('?????????????????')
        # print('????, ????')
        return data
??.py 文件源码 项目:xuexi 作者: liwanlei 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def Get_weather(city):
    url = 'http://wthrcdn.etouch.cn/WeatherApi?city=' + urllib.parse.quote(city)
    weather = urllib.request.urlopen(url).read()
    weather_data = gzip.decompress(weather).decode('utf-8')
    soup = BeautifulSoup(weather_data)
    return soup
??.py 文件源码 项目:xuexi 作者: liwanlei 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_huochepiao(url1):
    headers={'Accept':'text/javascript, application/javascript, application/ecmascript, application/x-ecmascript, */*; q=0.01',
    'Accept-Encoding':'gzip, deflate, sdch',
    'Accept-Language':'zh-CN,zh;q=0.8',
    'Connection':'keep-alive',
    'Cookie':'QN99=3733; QN1=eIQiP1dEAASbOq51LyeNAg==; QunarGlobal=192.168.31.105_-4c70ffc_154e15d8dc8_2de|1464074245311; ag_fid=AsWRT9vZYYLJ23qF; __ag_cm_=1464074247005; QN269=906FD473217F11E6B393C4346BAC1530; PHPSESSID=epq85mhbfeg12b3t6q8rkic702; QN25=5cfd26dc-8670-44ec-aafc-94923235a6fc-9f992f90; QN42=zbua0851; _q=U.ryzxozi0081; _t=24542281; csrfToken=QxdjaQNPcDnkhaMMMwxbGbpwWeKXNtET; _s=s_2QHWQF6G6AI3QWPVO6UBTX2LZE; _v=-8JqPkXGW-Vsgcr1koBOn0mWlXDIk6gdgRyueLvJJO3C0Ru2ALnLJw7DFu6Y6FUrAWf8tU-PZtj1Dc2l_o50sSp6YyMnlDQ4dVpPmDi0QMz_XOGK0loLwpTeCoe0wvE0aHJKPGHtArx4jlrdtgWSX9O2IfI8qnNi3-wHXEY6rVEN; QN44=ryzxozi0081; _i=RBTjeomvkDExEx-xsOrmQxSvMXex; _vi=7AZYnlCS385W7Z8-IQdjp5sbVR1PFm8kL0-Qi39HR1-wvJEvexvDP9L5vcTyfiBM9AUeWbCi1osGa2UEs6aMSu-IrejFGqde7L7Y04s8z115RVvdF0h-VmYrWg5Ni-nNZVw8xz3rFA7Jcv-ASn9aff2fhGbtS_0JFDKWQkwggWMx; Hm_lvt_2e7c38479e4def08a8ea1c0ebdb3e0d6=1472535537; Hm_lpvt_2e7c38479e4def08a8ea1c0ebdb3e0d6=1472541016; QN268=|1472541016285_e1523dd1fcbd8c01',
    'Host':'train.qunar.com',
    'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.80 Safari/537.36',
    'X-Requested-With':'XMLHttpRequest'
    }
    req=urllib.request.Request(url1,headers=headers)
    html=urllib.request.urlopen(req).read()
    decompressed_data = zlib.decompress(html ,16+zlib.MAX_WBITS)
    text = decompressed_data.decode("utf-8")
    soup=BeautifulSoup(text)
    m=str(soup)[46:-2]
    htm=json.loads(m)
    try:
        lines=htm['data']['s2sBeanList']
        print('??????%s??'%len(lines))
        i =1
        for item in lines:
            print("-------------?%s?????-------------"%i)
            print('--------------??????--------------')
            print('?????%s'%item['dptStationName']),\
            print('?????%s'%item['arrStationName']),\
            print('??:%s'%item['trainNo']),\
            print('????:%s'%item['dptTime']),\
            print('??%s'%item['arrTime']),\
            print('???%s'%item['extraBeanMap']['interval']),\
            print('?????%s'%item['extraBeanMap']['ticketType']),print('?????%s'%item['extraBeanMap']['stationType'])
            b=item['seats']
            for key ,value in b.items():
                    print('?????%s,???%s,??:%s'%(key,value['price'],value['count']))
            i+=1
    except:
        print('????????????????????????')
#--------??????
test_middleware.py 文件源码 项目:django-brotli 作者: illagrenan 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_middleware_compress_response(self):
        fake_request = FakeRequestAcceptsBrotli()
        response_content = UTF8_LOREM_IPSUM_IN_CZECH
        fake_response = FakeResponse(content=response_content)

        brotli_middleware = BrotliMiddleware()
        brotli_response = brotli_middleware.process_response(fake_request, fake_response)

        decompressed_response = brotli.decompress(data=brotli_response.content)  # type: bytes
        self.assertEqual(response_content, decompressed_response.decode(encoding='utf-8'))
test_middleware.py 文件源码 项目:django-brotli 作者: illagrenan 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_etag_is_updated_if_present(self):
        fake_request = FakeRequestAcceptsBrotli()
        response_content = UTF8_LOREM_IPSUM_IN_CZECH * 5
        fake_etag_content = "\"foo\""
        fake_response = FakeResponse(content=response_content, headers={"ETag": fake_etag_content})

        self.assertEqual(fake_response['ETag'], fake_etag_content)

        brotli_middleware = BrotliMiddleware()
        brotli_response = brotli_middleware.process_response(fake_request, fake_response)

        decompressed_response = brotli.decompress(data=brotli_response.content)  # type: bytes
        self.assertEqual(response_content, decompressed_response.decode(encoding='utf-8'))

        self.assertEqual(brotli_response['ETag'], '"foo;br\\"')
test_middleware.py 文件源码 项目:django-brotli 作者: illagrenan 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_middleware_wont_compress_if_response_is_already_compressed(self):
        fake_request = FakeRequestAcceptsBrotli()
        response_content = UTF8_LOREM_IPSUM_IN_CZECH
        fake_response = FakeResponse(content=response_content)

        brotli_middleware = BrotliMiddleware()
        django_gzip_middleware = GZipMiddleware()

        gzip_response = django_gzip_middleware.process_response(fake_request, fake_response)
        brotli_response = brotli_middleware.process_response(fake_request, gzip_response)

        self.assertEqual(response_content, gzip.decompress(brotli_response.content).decode(encoding='utf-8'))
stackoverflow.py 文件源码 项目:GLaDOS2 作者: TheComet 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_json_response(url):
    with urllib.request.urlopen(url) as response:
        result = gzip.decompress(response.read())
        return json.loads(result.decode('utf-8'))
zachcrwallib.py 文件源码 项目:automatic-repo 作者: WZQ1397 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def ungzip(data,url):
    try:
        print(url,"?????...")
        data = gzip.decompress(data)
        print(url,"????...")
    except:
        print(url,"?????????...")
    return data
webspider-movieinfo-download.py 文件源码 项目:automatic-repo 作者: WZQ1397 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def ungzip(data,url):
    try:
        #print(url,"?????...")
        data = gzip.decompress(data)
        #print(url,"????...")
    except:
        #print(url,"?????????...")
        pass
    return data
webspider-byg-getlist.py 文件源码 项目:automatic-repo 作者: WZQ1397 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def ungzip(data,url):
    try:
        print(url,"?????...")
        data = gzip.decompress(data)
        print(url,"????...")
    except:
        print(url,"?????????...")
    return data
webspider-cnblog-content.py 文件源码 项目:automatic-repo 作者: WZQ1397 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def ungzip(data,url):
    try:
        data = gzip.decompress(data)
    except:
        pass
    return data
webspider-csdnblog-getlist.py 文件源码 项目:automatic-repo 作者: WZQ1397 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def ungzip(data):
    try:
        #print("?????...")
        data = gzip.decompress(data)
        #print("????...")
    except:
        print("?????????...")
    return data

#CSDN???
test_dcos_diagnostics.py 文件源码 项目:dcos 作者: dcos 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def verify_unit_response(zip_ext_file, min_lines):
    assert isinstance(zip_ext_file, zipfile.ZipExtFile)
    unit_output = gzip.decompress(zip_ext_file.read())
    assert len(unit_output.decode().split('\n')) >= min_lines, 'Expect at least {} lines. Full unit output {}'.format(
        min_lines, unit_output)
iddp.py 文件源码 项目:IoT-Client 作者: suquark 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def resolve_msg(msg_bytes):
    try:
        msg = simplejson.loads(gzip.decompress(crypto.decrypt(msg_bytes)))
        return msg if msg['proto'] == 'iddp' else None
    except Exception as e:
        logging.error("Received an invalid message: %s" % str(e))
        return None


# iot_force_download_msg = {'proto': 'iddp', 'role': 'force_download', 'routine': ':19001/your_file'}
spider.py 文件源码 项目:etlpy 作者: ferventdesert 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def ungzip(data):
    data = gzip.decompress(data)
    return data;


问题


面经


文章

微信
公众号

扫码关注公众号