def urlopen(url, headers=None, data=None, retries=RETRIES, timeout=TIMEOUT):
    '''Open an HTTP request and return the response object, or None on failure.

    headers  -- optional dict merged over ``default_headers`` (e.g. to set
                User-Agent or Referer); entries here win on key collision.
    data     -- optional request body, passed straight to ``opener.open``.
    retries  -- number of attempts before giving up.
    timeout  -- per-attempt socket timeout in seconds.

    The full body is read eagerly and stored on ``req.data``; gzip- and
    deflate-encoded bodies are transparently decompressed first.
    Decoding bytes to text is left to the caller.
    '''
    # Fix: the original used a mutable default argument (headers={}),
    # which is shared across calls and can be mutated by accident.
    headers_merged = default_headers.copy()
    if headers:
        headers_merged.update(headers)
    opener = urllib.request.build_opener(ForbiddenHandler)
    opener.addheaders = list(headers_merged.items())
    for _ in range(retries):
        try:
            req = opener.open(url, data=data, timeout=timeout)
            encoding = req.headers.get('Content-encoding')
            req.data = req.read()
            if encoding == 'gzip':
                req.data = gzip.decompress(req.data)
            elif encoding == 'deflate':
                # negative wbits = raw deflate stream, no zlib header
                req.data = zlib.decompress(req.data, -zlib.MAX_WBITS)
            return req
        except Exception:
            # Fix: was an ``except OSError`` clause plus a bare ``except:``
            # doing the same thing; a bare except also traps SystemExit and
            # KeyboardInterrupt. Log and retry.
            logger.error(traceback.format_exc())
    return None
# python gzip/zlib decompress() example snippets (scraped index header;
# original bare text line was a syntax error)
def post_multipart(url, headers, fields, files, retries=RETRIES):
    '''POST *fields* and *files* to *url* as multipart/form-data.

    headers  -- dict merged over ``default_headers``; Content-Type and
                Content-length are then set from the encoded body.
    retries  -- number of attempts before giving up.

    Returns the ``http.client`` response with the (possibly gunzipped or
    inflated) body stored on ``.data``, or None when every attempt failed.

    NOTE(review): only plain HTTP is supported -- an https:// URL would
    still go through HTTPConnection; confirm callers never pass https.
    '''
    content_type, body = encode_multipart_formdata(fields, files)
    schema = urllib.parse.urlparse(url)
    headers_merged = default_headers.copy()
    headers_merged.update(headers)
    headers_merged['Content-Type'] = content_type
    headers_merged['Content-length'] = str(len(body))
    for _ in range(retries):
        conn = http.client.HTTPConnection(schema.netloc)
        try:
            conn.request('POST', url, body=body, headers=headers_merged)
            req = conn.getresponse()
            encoding = req.getheader('Content-encoding')
            req.data = req.read()
            if encoding == 'gzip':
                req.data = gzip.decompress(req.data)
            elif encoding == 'deflate':
                # negative wbits = raw deflate stream, no zlib header
                req.data = zlib.decompress(req.data, -zlib.MAX_WBITS)
            return req
        except Exception:
            # Fix: was an ``except OSError`` plus an identical bare
            # ``except:``, and the connection was leaked on every failed
            # attempt. Close it before retrying.
            logger.error(traceback.format_exc())
            conn.close()
    return None
def rehash_release(_filelist, fdesc, rmstr):
    """Calculates checksums of a given filelist and writes them to the given
    file descriptor. Takes rmstr as the third argument, which is a string to
    remove from the path of the hashed file when writing it to a file.

    For a missing file, falls back to deriving it from a sibling: an absent
    .xz is recompressed from the .gz, and an absent uncompressed file is
    gunzipped from its .gz.
    """
    info('Hashing checksums')
    for csum in checksums:
        fdesc.write('%s:\n' % csum['name'])
        for path in _filelist:
            if isfile(path):
                # Fix: the original leaked all three file handles via
                # ``open(...).read()``; use context managers instead.
                with open(path, 'rb') as fobj:
                    cont = fobj.read()
                fdesc.write(' %s %8s %s\n' % (csum['f'](cont).hexdigest(),
                                              getsize(path),
                                              path.replace(rmstr + '/', '')))
            elif path.endswith('.xz') and isfile(path.replace('.xz', '.gz')):
                with open(path.replace('.xz', '.gz'), 'rb') as fobj:
                    xzstr = lzma_comp(fobj.read())
                fdesc.write(' %s %8s %s\n' % (csum['f'](xzstr).hexdigest(),
                                              len(xzstr),
                                              path.replace(rmstr + '/', '')))
            elif not path.endswith('.gz') and isfile(path + '.gz'):
                with open(path + '.gz', 'rb') as fobj:
                    uncomp = gzip_decomp(fobj.read())
                fdesc.write(' %s %8s %s\n' % (csum['f'](uncomp).hexdigest(),
                                              len(uncomp),
                                              path.replace(rmstr + '/', '')))
    return
def ungzip(data):
    """Try to gunzip *data*; return it unchanged when it is not gzip.

    Best-effort decompression: callers may pass either gzip-compressed
    or plain payloads and always get bytes back.
    """
    try:
        print('????.....')
        data = gzip.decompress(data)
        print('????!')
    except (OSError, EOFError, zlib.error):
        # Fix: narrowed from a bare ``except:`` (which also swallowed
        # KeyboardInterrupt/SystemExit) to the errors gzip actually
        # raises on bad/truncated input.
        print('????, ????')
    return data
def ungzip(data):
    """Try to gunzip *data*; return it unchanged when it is not gzip.

    Best-effort decompression: callers may pass either gzip-compressed
    or plain payloads and always get bytes back.
    """
    try:
        print('????.....')
        data = gzip.decompress(data)
        print('????!')
    except (OSError, EOFError, zlib.error):
        # Fix: narrowed from a bare ``except:`` (which also swallowed
        # KeyboardInterrupt/SystemExit) to the errors gzip actually
        # raises on bad/truncated input.
        print('????, ????')
    return data
def __ungzip(self, data):
    """Gunzip *data*; on failure log the error and return the input unchanged.

    Best-effort: non-gzip payloads pass through untouched.
    """
    try:
        data = gzip.decompress(data)
    except (OSError, EOFError, zlib.error):
        # Fix: narrowed from a bare ``except:`` to the errors gzip
        # actually raises on bad/truncated input.
        logger.error('?????????????????')
    return data
def Get_weather(city):
    """Fetch the gzip-encoded weather XML for *city* from etouch.cn and
    return it parsed as a BeautifulSoup document.

    city -- city name; URL-quoted before being put in the query string.
    """
    url = 'http://wthrcdn.etouch.cn/WeatherApi?city=' + urllib.parse.quote(city)
    # Fix: the original never closed the HTTP response.
    with urllib.request.urlopen(url) as resp:
        weather = resp.read()
    # NOTE(review): assumes the endpoint always gzips its body -- confirm.
    weather_data = gzip.decompress(weather).decode('utf-8')
    # No explicit parser given; BeautifulSoup will pick a default (and warn).
    soup = BeautifulSoup(weather_data)
    return soup
def get_huochepiao(url1):
    # Query Qunar's train-ticket search API at *url1* and pretty-print the
    # resulting departure/arrival/seat information to stdout.
    # NOTE(review): the Cookie below is a captured session from 2016 and has
    # almost certainly expired -- confirm before relying on this function.
    # (Several print-message literals below are mojibake from the scrape;
    # they are runtime strings and are left byte-for-byte as found.)
    headers={'Accept':'text/javascript, application/javascript, application/ecmascript, application/x-ecmascript, */*; q=0.01',
        'Accept-Encoding':'gzip, deflate, sdch',
        'Accept-Language':'zh-CN,zh;q=0.8',
        'Connection':'keep-alive',
        'Cookie':'QN99=3733; QN1=eIQiP1dEAASbOq51LyeNAg==; QunarGlobal=192.168.31.105_-4c70ffc_154e15d8dc8_2de|1464074245311; ag_fid=AsWRT9vZYYLJ23qF; __ag_cm_=1464074247005; QN269=906FD473217F11E6B393C4346BAC1530; PHPSESSID=epq85mhbfeg12b3t6q8rkic702; QN25=5cfd26dc-8670-44ec-aafc-94923235a6fc-9f992f90; QN42=zbua0851; _q=U.ryzxozi0081; _t=24542281; csrfToken=QxdjaQNPcDnkhaMMMwxbGbpwWeKXNtET; _s=s_2QHWQF6G6AI3QWPVO6UBTX2LZE; _v=-8JqPkXGW-Vsgcr1koBOn0mWlXDIk6gdgRyueLvJJO3C0Ru2ALnLJw7DFu6Y6FUrAWf8tU-PZtj1Dc2l_o50sSp6YyMnlDQ4dVpPmDi0QMz_XOGK0loLwpTeCoe0wvE0aHJKPGHtArx4jlrdtgWSX9O2IfI8qnNi3-wHXEY6rVEN; QN44=ryzxozi0081; _i=RBTjeomvkDExEx-xsOrmQxSvMXex; _vi=7AZYnlCS385W7Z8-IQdjp5sbVR1PFm8kL0-Qi39HR1-wvJEvexvDP9L5vcTyfiBM9AUeWbCi1osGa2UEs6aMSu-IrejFGqde7L7Y04s8z115RVvdF0h-VmYrWg5Ni-nNZVw8xz3rFA7Jcv-ASn9aff2fhGbtS_0JFDKWQkwggWMx; Hm_lvt_2e7c38479e4def08a8ea1c0ebdb3e0d6=1472535537; Hm_lpvt_2e7c38479e4def08a8ea1c0ebdb3e0d6=1472541016; QN268=|1472541016285_e1523dd1fcbd8c01',
        'Host':'train.qunar.com',
        'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.80 Safari/537.36',
        'X-Requested-With':'XMLHttpRequest'
        }
    req=urllib.request.Request(url1,headers=headers)
    html=urllib.request.urlopen(req).read()
    # 16+MAX_WBITS tells zlib to expect a gzip wrapper around the stream.
    decompressed_data = zlib.decompress(html ,16+zlib.MAX_WBITS)
    text = decompressed_data.decode("utf-8")
    soup=BeautifulSoup(text)
    # Strip a fixed-width JSONP-style wrapper: drop the first 46 chars and
    # the trailing 2 to isolate the JSON payload. Fragile -- tied to the
    # exact response framing observed at scrape time.
    m=str(soup)[46:-2]
    htm=json.loads(m)
    try:
        lines=htm['data']['s2sBeanList']
        print('??????%s??'%len(lines))
        i =1
        for item in lines:
            print("-------------?%s?????-------------"%i)
            print('--------------??????--------------')
            # The trailing ``,\`` continuations chain these prints into one
            # tuple expression of None values -- odd but valid Python 3.
            print('?????%s'%item['dptStationName']),\
            print('?????%s'%item['arrStationName']),\
            print('??:%s'%item['trainNo']),\
            print('????:%s'%item['dptTime']),\
            print('??%s'%item['arrTime']),\
            print('???%s'%item['extraBeanMap']['interval']),\
            print('?????%s'%item['extraBeanMap']['ticketType']),print('?????%s'%item['extraBeanMap']['stationType'])
            # per-seat-class price and remaining-ticket counts
            b=item['seats']
            for key ,value in b.items():
                print('?????%s,???%s,??:%s'%(key,value['price'],value['count']))
            i+=1
    except:
        # Bare except: any missing key / schema change falls through to a
        # generic "no results" style message.
        print('????????????????????????')
#--------??????
def test_middleware_compress_response(self):
    """A brotli-capable client gets a response that round-trips through
    brotli.decompress back to the original content."""
    request = FakeRequestAcceptsBrotli()
    original_content = UTF8_LOREM_IPSUM_IN_CZECH
    compressed = BrotliMiddleware().process_response(
        request, FakeResponse(content=original_content))
    restored = brotli.decompress(data=compressed.content)
    self.assertEqual(original_content, restored.decode(encoding='utf-8'))
def test_etag_is_updated_if_present(self):
    """Compressing a response that carries an ETag rewrites the ETag header
    while still round-tripping the body through brotli."""
    request = FakeRequestAcceptsBrotli()
    original_content = UTF8_LOREM_IPSUM_IN_CZECH * 5
    original_etag = "\"foo\""
    response = FakeResponse(content=original_content,
                            headers={"ETag": original_etag})
    self.assertEqual(response['ETag'], original_etag)
    compressed = BrotliMiddleware().process_response(request, response)
    restored = brotli.decompress(data=compressed.content)
    self.assertEqual(original_content, restored.decode(encoding='utf-8'))
    self.assertEqual(compressed['ETag'], '"foo;br\\"')
def test_middleware_wont_compress_if_response_is_already_compressed(self):
    """A response already gzipped by Django's GZipMiddleware passes through
    the brotli middleware untouched: the body still gunzips cleanly."""
    request = FakeRequestAcceptsBrotli()
    original_content = UTF8_LOREM_IPSUM_IN_CZECH
    gzipped = GZipMiddleware().process_response(
        request, FakeResponse(content=original_content))
    result = BrotliMiddleware().process_response(request, gzipped)
    self.assertEqual(
        original_content,
        gzip.decompress(result.content).decode(encoding='utf-8'))
def get_json_response(url):
    """Fetch *url* and return its body parsed as JSON.

    A gzip-compressed body (detected via the two gzip magic bytes) is
    transparently decompressed first; plain bodies are parsed directly.
    """
    with urllib.request.urlopen(url) as response:
        payload = response.read()
    # Fix: the original unconditionally gunzipped and crashed on plain
    # responses; sniffing the magic keeps the gzip case working unchanged.
    if payload[:2] == b'\x1f\x8b':
        payload = gzip.decompress(payload)
    return json.loads(payload.decode('utf-8'))
def ungzip(data, url):
    """Try to gunzip *data*; return it unchanged when it is not gzip.

    url -- only used to label the progress messages.
    """
    try:
        print(url,"?????...")
        data = gzip.decompress(data)
        print(url,"????...")
    except (OSError, EOFError, zlib.error):
        # Fix: narrowed from a bare ``except:`` to the errors gzip
        # actually raises on bad/truncated input.
        print(url,"?????????...")
    return data
def ungzip(data, url):
    """Try to gunzip *data*; return it unchanged when it is not gzip.

    url -- accepted for interface parity with sibling helpers; unused here.
    """
    try:
        data = gzip.decompress(data)
    except (OSError, EOFError, zlib.error):
        # Fix: narrowed from a bare ``except:``; non-gzip payloads pass
        # through untouched (best-effort decompression).
        pass
    return data
def ungzip(data, url):
    """Try to gunzip *data*; return it unchanged when it is not gzip.

    url -- only used to label the progress messages.
    """
    try:
        print(url,"?????...")
        data = gzip.decompress(data)
        print(url,"????...")
    except (OSError, EOFError, zlib.error):
        # Fix: narrowed from a bare ``except:`` to the errors gzip
        # actually raises on bad/truncated input.
        print(url,"?????????...")
    return data
def ungzip(data, url):
    """Try to gunzip *data*; return it unchanged when it is not gzip.

    url -- accepted for interface parity with sibling helpers; unused here.
    """
    try:
        data = gzip.decompress(data)
    except (OSError, EOFError, zlib.error):
        # Fix: narrowed from a bare ``except: pass`` that also swallowed
        # KeyboardInterrupt/SystemExit.
        pass
    return data
def ungzip(data):
    """Try to gunzip *data*; return it unchanged when it is not gzip."""
    try:
        data = gzip.decompress(data)
    except (OSError, EOFError, zlib.error):
        # Fix: narrowed from a bare ``except:`` to the errors gzip
        # actually raises on bad/truncated input.
        print("?????????...")
    return data
# CSDN example (original Chinese comment garbled during scraping)
def verify_unit_response(zip_ext_file, min_lines):
assert isinstance(zip_ext_file, zipfile.ZipExtFile)
unit_output = gzip.decompress(zip_ext_file.read())
assert len(unit_output.decode().split('\n')) >= min_lines, 'Expect at least {} lines. Full unit output {}'.format(
min_lines, unit_output)
def resolve_msg(msg_bytes):
    """Decrypt, gunzip and JSON-decode an incoming message.

    Returns the decoded dict when its 'proto' field is 'iddp'; returns
    None for any other protocol or when decoding fails (logged).
    """
    try:
        decoded = simplejson.loads(gzip.decompress(crypto.decrypt(msg_bytes)))
        if decoded['proto'] == 'iddp':
            return decoded
        return None
    except Exception as exc:
        logging.error("Received an invalid message: %s" % str(exc))
        return None
# iot_force_download_msg = {'proto': 'iddp', 'role': 'force_download', 'routine': ':19001/your_file'}
def ungzip(data):
    """Decompress gzip-compressed *data* and return the raw bytes.

    Unlike the best-effort ``ungzip`` helpers above, this one propagates
    gzip errors (OSError/EOFError) to the caller.
    """
    # Fix: dropped the stray trailing semicolon and the needless
    # reassignment; return the result directly.
    return gzip.decompress(data)