# Requires a Tkinter UI with Entry e1 and Text e2 already defined
import gzip
import urllib.parse
import urllib.request
import tkinter as tk
from bs4 import BeautifulSoup

def show():
    # Look up the weather for the city typed into entry e1
    city = e1.get()
    url = 'http://wthrcdn.etouch.cn/WeatherApi?city=' + urllib.parse.quote(city)
    weather = urllib.request.urlopen(url).read()
    # The API returns gzip-compressed XML
    weather_data = gzip.decompress(weather).decode('utf-8')
    try:
        soup = BeautifulSoup(weather_data, 'html.parser')
        weather_tags = soup.find_all('weather')
        Text = (('Humidity: %s' % soup.shidu.text),
                ('Wind: %s' % soup.fengli.text),
                weather_tags[0].high.text,
                weather_tags[0].low.text,
                ('Conditions: %s' % weather_tags[0].type.text))
        e2['state'] = 'normal'
        e2.delete(1.0, tk.END)
        e2.insert(tk.END, Text)
        e2['state'] = 'disabled'
    except Exception:
        Text = 'Lookup failed; please check the city name and try again'
        e2['state'] = 'normal'
        e2.delete(1.0, tk.END)
        e2.insert(tk.END, Text)
        e2['state'] = 'disabled'
Python decompress() example source code
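For reference, every snippet on this page leans on the same bytes-in, bytes-out contract; a minimal self-contained round-trip:

import gzip

payload = 'hello, world'.encode('utf-8')
blob = gzip.compress(payload)             # bytes -> gzip-framed bytes
assert gzip.decompress(blob) == payload   # decompress is the exact inverse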
def download_sifts_xml(pdb_id, outdir='', outfile=''):
    """Download the SIFTS file for a PDB ID.

    Args:
        pdb_id (str): PDB ID to fetch the SIFTS XML for
        outdir (str): Directory to save the file to
        outfile (str): Optional custom name for the downloaded file

    Returns:
        str: Path to the decompressed SIFTS XML file

    """
    baseURL = 'ftp://ftp.ebi.ac.uk/pub/databases/msd/sifts/xml/'
    filename = '{}.xml.gz'.format(pdb_id)

    if outfile:
        outfile = op.join(outdir, outfile)
    else:
        outfile = op.join(outdir, filename.split('.')[0] + '.sifts.xml')

    if not op.exists(outfile):
        response = urlopen(baseURL + filename)
        with open(outfile, 'wb') as f:
            # SIFTS files are served gzipped; store them decompressed
            f.write(gzip.decompress(response.read()))

    return outfile
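A hypothetical call, with an illustrative PDB ID and output directory:

sifts_path = download_sifts_xml('1kf6', outdir='/tmp')
# -> '/tmp/1kf6.sifts.xml', decompressed and ready to parse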
def test_brotli_dynamic_cache(br_client):
    from brotli import decompress
    from time import sleep
    # First request populates the brotli cache in the background
    br_client.get(
        '/',
        headers=[('accept-encoding', 'gzip, br')])
    sleep(0.5)
    # Second request should be served from the cache
    resp = br_client.get(
        '/',
        headers=[('accept-encoding', 'gzip, br')])
    assert resp.headers.get('x-brotli-cache') == 'HIT'
    assert resp.headers.get('content-encoding') == 'br'
    assert b"Hello, world!" in decompress(resp.data)
def saveContentOfURL(target_url):
    # Skip URLs that have already been processed
    if target_url in searched_url:
        return
    try:
        # Issue a GET request for target_url
        article_response = urllib.request.urlopen(target_url)
        raw_data = article_response.read()
        # Decompress first if the response is gzip-encoded
        if article_response.getheader("Content-Encoding") == "gzip":
            raw_data = gzip.decompress(raw_data)
        # Pages are gb2312-encoded; ignore characters that fail to decode
        article_data = raw_data.decode('gb2312', 'ignore')
        # Pull the text out of each <p></p> pair, clean it and write it to file
        forEachMatch(pattern_str='<p>(.*?)</p>', to_match_str=article_data,
                     func=lambda match: file_operator.writeFile(cleanArticle(match.group(1))))
    except urllib.error.URLError:
        print(target_url, 'is a wrong url')
    except BaseException as message:
        print(message)
    # Record the URL as processed
    searched_url.add(target_url)

# Match the content of each <p></p> pair and process every match
def import_to_store(self, compressed_nar):
    """Given a compressed NAR, extract it and import it into the nix store.

    :param compressed_nar: The bytes of a NAR, compressed.
    :type compressed_nar: ``str``
    """
    # Figure out how to extract the content.
    if self.compression.lower() in ("xz", "xzip"):
        data = lzma.decompress(compressed_nar)
    elif self.compression.lower() in ("bz2", "bzip2"):
        data = bz2.decompress(compressed_nar)
    else:
        data = gzip.decompress(compressed_nar)

    # Once extracted, convert it into a nix export object and import.
    export = self.nar_to_export(data)
    imported_path = export.import_to_store()
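A table-driven alternative keeps the format-to-decompressor mapping in one place; a sketch, assuming the same three compression families are the only ones supported:

import bz2
import gzip
import lzma

DECOMPRESSORS = {
    'xz': lzma.decompress, 'xzip': lzma.decompress,
    'bz2': bz2.decompress, 'bzip2': bz2.decompress,
    'gzip': gzip.decompress,
}

def decompress_nar(compressed_nar, compression):
    # Unknown schemes fail loudly with a KeyError instead of being
    # silently treated as gzip
    return DECOMPRESSORS[compression.lower()](compressed_nar)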
def get_page(self, _url):
    '''Fetch the content of a page.
    return str'''
    header = {'Accept-Encoding': 'gzip'}
    header['User-Agent'] = self.ualist[random.randint(0, len(self.ualist) - 1)]
    if opts['user_agent']:
        header['User-Agent'] = opts['user_agent']

    with (yield from semaphore):
        response = yield from aiohttp.request('GET', _url, headers=header)
        page = yield from response.read()

    try:
        if self.url_type == "2":
            return "None Content"
        # url_type 4: gb2312-encoded pages, re-encoded to UTF-8 (returns bytes)
        if self.url_type == "4":
            return gzip.decompress(page).decode('gb2312').encode('utf-8')
        else:
            return gzip.decompress(page)
    except OSError:
        # Response was not actually gzipped; return it as-is
        return page
def _process_response(self, res):
    """
    Take the response object and return JSON
    :param res:
    :return:
    """
    # TODO Figure out exceptions here
    # headers.get() avoids a KeyError when no Content-Encoding is set
    if res.headers.get('Content-Encoding') == 'gzip':
        self.send_log('Detected gzipped response', 'debug')
        raw_output = gzip.decompress(res.read()).decode('utf-8')
    else:
        self.send_log('Detected other type of response encoding: {}'.format(
            res.headers.get('Content-Encoding')), 'debug')
        raw_output = res.read().decode('utf-8')
    json_output = json.loads(raw_output)
    return json_output
def _load_data():
    # https://github.com/raumkraut/python-debian/blob/master/README.deb822
    global _data
    mirror = BP.config['L4TM_MIRROR']
    release = BP.config['L4TM_RELEASE']
    repo = '%s/dists/%s/%%s/%%s/Packages.gz' % (mirror, release)
    _data = {}
    for area in BP.config['L4TM_AREAS']:
        for arch in ('binary-all', 'binary-arm64'):
            BP.logger.info('Loading/processing %s/%s/Packages.gz...' % (
                area, arch))
            pkgarea = repo % (area, arch)
            pkgresp = HTTP_REQUESTS.get(pkgarea)
            if pkgresp.status_code != 200:
                BP.logger.error('%s not found' % arch)
                continue

            BP.logger.debug('Uncompressing %s bytes' % pkgresp.headers['content-length'])
            unzipped = gzip.decompress(pkgresp.content)  # bytes all around

            BP.logger.debug('Parsing %d bytes of package data' % len(unzipped))
            unzipped = BytesIO(unzipped)                 # the next step needs read()
            tmp = [src for src in Packages.iter_paragraphs(unzipped)]
            _data.update(dict((pkg['Package'], pkg) for pkg in tmp))
def extract_features(doc):
    html = doc['html'] or ''
    if not doc_is_extra_sampled(doc):
        try:
            html = gzip.decompress(base64.b64decode(html)).decode('utf8')
        except Exception:
            pass  # support not compressed html too
    text = html_text.extract_text(html)
    try:
        lang = langdetect.detect(text)
    except LangDetectException:
        lang = None
    return {
        'text': text,
        'language': lang,
    }
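The storage format implied by extract_features is gzip wrapped in base64; a hypothetical writer whose output the function above can read back:

import base64
import gzip

def pack_html(html):
    # str -> base64(gzip(utf8 bytes)), matching the decode path above
    return base64.b64encode(gzip.compress(html.encode('utf8')))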
def msg_recv(conn, sendfunc, closefunc):
    '''
    Function msg_recv reads null-delimited series of bytes from `conn`, which
    is a socket. Each series of bytes is then de-serialized into a json object,
    and `sendfunc` is called with that json object.
    `closefunc` is called if/when the socket `conn` is closed.
    '''
    buf = bytes()
    while True:
        try:
            data = conn.recv(8192)
            # No data means the connection is closed
            if not data:
                closefunc()
                return

            inbuf = buf + data
            if SEP in inbuf:
                parts = inbuf.split(SEP)
                # Everything before the last separator is complete messages;
                # whatever follows it is kept as the start of the next one
                tosend = parts[:-1]
                buf = parts[-1]
                for msg in tosend:
                    m = gzip.decompress(msg)
                    m = m.decode('utf-8')
                    logging.debug("Msg: {}".format(m[:150] + '...' if len(m) > 150 else m))
                    obj = json.loads(m)
                    sendfunc(obj)
            else:
                buf += data
        except Exception as e:
            logging.exception(e)
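msg_recv implies a framing convention: gzip-compressed JSON documents separated by SEP. A hypothetical sending side under that assumption (SEP must match the receiver's delimiter and must not occur inside a compressed payload):

import gzip
import json

def msg_send(conn, obj, sep=SEP):
    # dict -> JSON -> gzip bytes, terminated by the shared separator
    payload = gzip.compress(json.dumps(obj).encode('utf-8'))
    conn.sendall(payload + sep)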
def _handle_gzip_packed(self, msg_id, sequence, reader, updates):
    self._logger.debug('Handling gzip packed data')
    reader.read_int(signed=False)  # code
    packed_data = reader.tgread_bytes()
    unpacked_data = gzip.decompress(packed_data)

    with BinaryReader(unpacked_data) as compressed_reader:
        return self._process_msg(
            msg_id, sequence, compressed_reader, updates)

# endregion
def _lzma(self):
    '''LZMA processor'''
    try:
        archive = lzma.decompress(self.cur_attachment.file_obj.read())
        new_fn, ext = os.path.splitext(self.cur_attachment.orig_filename)
        cur_file = File(archive, new_fn)
        self.process_payload(cur_file)
    except Exception:
        # Anything that fails to decompress is treated as dangerous
        self.cur_attachment.make_dangerous()
    return self.cur_attachment

def _bzip(self):
    '''BZip2 processor'''
    try:
        archive = bz2.decompress(self.cur_attachment.file_obj.read())
        new_fn, ext = os.path.splitext(self.cur_attachment.orig_filename)
        cur_file = File(archive, new_fn)
        self.process_payload(cur_file)
    except Exception:
        self.cur_attachment.make_dangerous()
    return self.cur_attachment
def download_biological_assemblies(pdb_id, outdir):
    """Downloads biological assembly file from:
    `ftp://ftp.wwpdb.org/pub/pdb/data/biounit/coordinates/divided/`

    Args:
        pdb_id (str): PDB ID to download the assemblies for
        outdir (str): Output directory of the decompressed assembly
    """
    # TODO: not tested yet
    if not op.exists(outdir):
        raise ValueError('{}: output directory does not exist'.format(outdir))

    folder = pdb_id[1:3]
    server = 'ftp://ftp.wwpdb.org/pub/pdb/data/biounit/coordinates/divided/{}/'.format(folder)
    html_folder = urlopen(server).readlines()
    for line in html_folder:
        if pdb_id in str(line).strip():
            file_name = '%s' % (pdb_id + str(line).strip().split(pdb_id)[1].split('\r\n')[0])
            outfile_name = file_name.replace('.', '_')
            outfile_name = outfile_name.replace('_gz', '.pdb')
            response = urlopen(op.join(server, file_name))
            # 16 + MAX_WBITS makes zlib accept the gzip header
            decompressed_data = zlib.decompress(response.read(), 16 + zlib.MAX_WBITS)
            with open(op.join(outdir, outfile_name), 'wb') as f:
                f.write(decompressed_data)
            response.close()
            log.debug('{}: downloaded biological assembly'.format(pdb_id))
            return op.join(outdir, outfile_name)
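The 16 + zlib.MAX_WBITS argument tells zlib to expect a gzip wrapper, so this call is interchangeable with gzip.decompress; a minimal check:

import gzip
import zlib

blob = gzip.compress(b'ATOM      1  N   MET A   1')
assert zlib.decompress(blob, 16 + zlib.MAX_WBITS) == gzip.decompress(blob)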
def decode(value, decompress=False):
    """
    Decodes response from Base64 encoded string.
    """
    decoded = base64.b64decode(value)
    if decompress:
        decoded = gzip.decompress(decoded)
    return json.loads(decoded.decode())
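A hypothetical inverse, mirroring the decompress flag, that decode would round-trip:

import base64
import gzip
import json

def encode(value, compress=False):
    raw = json.dumps(value).encode()
    if compress:
        raw = gzip.compress(raw)
    return base64.b64encode(raw)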
def read(reader):
    assert reader.read_int(signed=False) == GzipPacked.CONSTRUCTOR_ID
    return gzip.decompress(reader.tgread_bytes())
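A hypothetical serializer mirroring read(), assuming the writer object exposes write_int and tgwrite_bytes counterparts to the reader's methods:

def write(writer, data):
    # Mirror of read(): constructor ID first, then the gzipped payload
    writer.write_int(GzipPacked.CONSTRUCTOR_ID, signed=False)
    writer.tgwrite_bytes(gzip.compress(data))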
def get_vxstream_report(sha256, envid, type_):
    # XML, HTML, BIN and PCAP are GZipped
    Sample.query.filter_by(sha256=sha256).first_or_404()
    headers = {
        'Accept': 'text/html',
        'User-Agent': 'VxStream Sandbox API Client'}
    params = {'type': type_, 'environmentId': envid}
    vx = vxstream.api.get('result/{}'.format(sha256),
                          params=params, headers=headers)
    if type_ in ['xml', 'html', 'bin', 'pcap']:
        return gzip.decompress(vx)
    return vx
def get_fireeye_report(sha256, envid, type):
    # Not implemented: the raise makes everything below unreachable for now
    raise ApiException({}, 501)
    # XML, HTML, BIN and PCAP are GZipped
    Sample.query.filter_by(sha256=sha256).first_or_404()
    headers = {
        'Accept': 'text/html',
        'User-Agent': 'FireEye Sandbox API Client'}
    params = {'type': type, 'environmentId': envid}
    vx = fireeye.api.get('result/{}'.format(sha256),
                         params=params, headers=headers)
    if type in ['xml', 'html', 'bin', 'pcap']:
        return gzip.decompress(vx)
    return vx
def get_cp_vxstream_report(sha256, envid, type_):
    # XML, HTML, BIN and PCAP are GZipped
    Sample.query.filter_by(sha256=sha256, user_id=g.user.id).first_or_404()
    headers = {
        'Accept': 'text/html',
        'User-Agent': 'VxStream Sandbox API Client'}
    params = {'type': type_, 'environmentId': envid}
    vx = vxstream.api.get('result/{}'.format(sha256),
                          params=params, headers=headers)
    if type_ in ['xml', 'html', 'bin', 'pcap']:
        return gzip.decompress(vx)
    return vx
def fetch_page(query):
    url = REQUEST_URL.format(BASE_URL, query)
    request = urllib.request.Request(url)
    request.add_header('User-agent', _random_user_agent())
    request.add_header('connection', 'keep-alive')
    # Only advertise gzip: the unconditional gzip.decompress below
    # cannot handle deflate, sdch or br responses
    request.add_header('Accept-Encoding', 'gzip')
    request.add_header('referer', REQUEST_URL.format(BASE_URL, ""))
    print(url)
    response = urllib.request.urlopen(request)
    data = response.read()
    print(type(data))
    return gzip.decompress(data)
def read_bytes_data():
    with open('sample.bytes', 'rb') as file:
        content = file.read()
    # dom = BeautifulSoup(gzip.decompress(content))
    data = gzip.decompress(content)
    # soup = BeautifulSoup(data, 'html.parser')
    # links = soup.find_all('a', {'class': 'l _HId'})
    # for link in links:
    #     print(link.get('href'))
    # print(soup.prettify())
    return data
def decode_gzip(content):
    assert isinstance(content, bytes)
    return gzip.decompress(content)
def decode_deflate(content):
    assert isinstance(content, bytes)
    try:
        # Standard deflate with a zlib header
        return zlib.decompress(content)
    except Exception:
        # Some servers send raw deflate without the zlib header;
        # a negative wbits value tells zlib not to expect one
        return zlib.decompress(content, -zlib.MAX_WBITS)
def test_brotli_static_gzip(br_client):
    from gzip import decompress
    gzip_resp = br_client.get(
        '/static/bee.txt',
        headers=[('accept-encoding', 'gzip')])
    assert gzip_resp.headers.get('content-encoding') == 'gzip'
    assert BEE_SCRIPT in decompress(gzip_resp.data)

def test_brotli_static_br(br_client):
    from brotli import decompress
    br_resp = br_client.get(
        '/static/bee.txt',
        headers=[('accept-encoding', 'gzip, br')])
    assert br_resp.headers.get('content-encoding') == 'br'
    assert BEE_SCRIPT in decompress(br_resp.data)

def test_brotli_dynamic(br_client):
    from brotli import decompress
    resp = br_client.get(
        '/',
        headers=[('accept-encoding', 'gzip, br')])
    assert resp.headers.get('x-brotli-cache') == 'MISS'
    assert resp.headers.get('content-encoding') == 'br'
    assert b"Hello, world!" in decompress(resp.data)
def test_main(bucket, crypto, processor):
    _p = functools.partial(_payload, crypto)
    today = date.today()
    user = 'foo'
    prefix = 'v2/sessions/%s/%s/%s/' % (today.year, today.month, user)
    records = [
        make_record('2', _p({'user': user, 'extra': 1})),
    ]
    assert main(processor, records) == ('2', 0)

    objs = list(bucket.filter(Prefix=prefix))
    assert len(objs) == 1
    assert objs[0].key.endswith('.json.gz')

    obj = bucket.get(objs[0].key)
    assert obj['ContentEncoding'] == 'gzip'
    assert obj['ContentType'] == 'application/json'
    body = obj['Body'].read()
    obj['Body'].close()
    body = json.loads(gzip.decompress(body).decode('utf-8'))
    assert body == {'user': user, 'extra': 1}

    # Upload a second time
    records = [
        make_record('3', _p({'user': user, 'extra': 2})),
    ]
    assert main(processor, records) == ('3', 0)
    objs = list(bucket.filter(Prefix=prefix))
    assert len(objs) == 2
def _decompress_result(self, compressed_result):
    # The payload arrives as hex byte values joined with 'x'
    result_bytes = bytes([int(c, 16) for c in compressed_result.split('x')])
    result = gzip.decompress(result_bytes).decode()
    result = json.loads(result)
    return result
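A hypothetical encoder producing the 'x'-separated hex framing that _decompress_result parses back:

import gzip
import json

def _compress_result(result):
    blob = gzip.compress(json.dumps(result).encode())
    # hex digits never contain 'x', so the join is unambiguous to split
    return 'x'.join(format(b, 'x') for b in blob)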
def decompress(
    data,
):
    decompressed_object = gzip.decompress(data)

    return decompressed_object
cloudtrail_to_es.py (project: analytics-platform-ops, author: ministryofjustice)
def log_file_s3_object(record):
    # Fetch and decompress the gzipped log referenced by the S3 event record
    return gzip.decompress(s3.get_object(
        Bucket=record['s3']['bucket']['name'],
        Key=record['s3']['object']['key'])['Body'].read())
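Here record is assumed to be one entry of an S3 event notification; a hypothetical Lambda entry point wiring the helper up (the indexing step is left abstract):

def handler(event, context):
    for record in event['Records']:
        log_bytes = log_file_s3_object(record)
        # ... parse log_bytes as JSON and ship it to Elasticsearch ...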