def _decompressContent(response, new_content):
content = new_content
try:
encoding = response.get('content-encoding', None)
if encoding in ['gzip', 'deflate']:
if encoding == 'gzip':
content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
if encoding == 'deflate':
content = zlib.decompress(content, -zlib.MAX_WBITS)
response['content-length'] = str(len(content))
# Record the historical presence of the encoding in a way the won't interfere.
response['-content-encoding'] = response['content-encoding']
del response['content-encoding']
except IOError:
content = ""
raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
return content
python类MAX_WBITS的实例源码
def _decompressContent(response, new_content):
content = new_content
try:
encoding = response.get('content-encoding', None)
if encoding in ['gzip', 'deflate']:
if encoding == 'gzip':
content = gzip.GzipFile(fileobj=io.BytesIO(new_content)).read()
if encoding == 'deflate':
content = zlib.decompress(content, -zlib.MAX_WBITS)
response['content-length'] = str(len(content))
# Record the historical presence of the encoding in a way the won't interfere.
response['-content-encoding'] = response['content-encoding']
del response['content-encoding']
except IOError:
content = ""
raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
return content
def decompress(self, data):
if not data:
return data
if not self._first_try:
return self._obj.decompress(data)
self._data += data
try:
return self._obj.decompress(data)
except zlib.error:
self._first_try = False
self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
try:
return self.decompress(self._data)
finally:
self._data = None
def getDanmu(cid):
if not cid:
return "?????"
try:
cid_url = "http://comment.bilibili.com/%s.xml" % cid
danmu_xml = urllib.request.urlopen(cid_url).read()
xml = zlib.decompressobj(-zlib.MAX_WBITS).decompress(danmu_xml).decode() # ????????
return xml # ?????
except Exception:
pass
# ??xml??????????????
def decompress(self, data):
if not data:
return data
if not self._first_try:
return self._obj.decompress(data)
self._data += data
try:
return self._obj.decompress(data)
except zlib.error:
self._first_try = False
self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
try:
return self.decompress(self._data)
finally:
self._data = None
def decompress(self, data):
if not data:
return data
if not self._first_try:
return self._obj.decompress(data)
self._data += data
try:
return self._obj.decompress(data)
except zlib.error:
self._first_try = False
self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
try:
return self.decompress(self._data)
finally:
self._data = None
def decompress(self, data):
if not data:
return data
if not self._first_try:
return self._obj.decompress(data)
self._data += data
try:
return self._obj.decompress(data)
except zlib.error:
self._first_try = False
self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
try:
return self.decompress(self._data)
finally:
self._data = None
def decompress(self, data):
if not data:
return data
if not self._first_try:
return self._obj.decompress(data)
self._data += data
try:
return self._obj.decompress(data)
except zlib.error:
self._first_try = False
self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
try:
return self.decompress(self._data)
finally:
self._data = None
def content(self):
"""Raw content of response (i.e. bytes).
:returns: Body of HTTP response
:rtype: :class:`str`
"""
if not self._content:
# Decompress gzipped content
if self._gzipped:
decoder = zlib.decompressobj(16 + zlib.MAX_WBITS)
self._content = decoder.decompress(self.raw.read())
else:
self._content = self.raw.read()
self._content_loaded = True
return self._content
def content(self):
"""Raw content of response (i.e. bytes).
:returns: Body of HTTP response
:rtype: :class:`str`
"""
if not self._content:
# Decompress gzipped content
if self._gzipped:
decoder = zlib.decompressobj(16 + zlib.MAX_WBITS)
self._content = decoder.decompress(self.raw.read())
else:
self._content = self.raw.read()
self._content_loaded = True
return self._content
def decompress(self, data):
if not data:
return data
if not self._first_try:
return self._obj.decompress(data)
self._data += data
try:
return self._obj.decompress(data)
except zlib.error:
self._first_try = False
self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
try:
return self.decompress(self._data)
finally:
self._data = None
def decompress(self, data):
if not data:
return data
if not self._first_try:
return self._obj.decompress(data)
self._data += data
try:
return self._obj.decompress(data)
except zlib.error:
self._first_try = False
self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
try:
return self.decompress(self._data)
finally:
self._data = None
def content(self):
"""Raw content of response (i.e. bytes).
:returns: Body of HTTP response
:rtype: str
"""
if not self._content:
# Decompress gzipped content
if self._gzipped:
decoder = zlib.decompressobj(16 + zlib.MAX_WBITS)
self._content = decoder.decompress(self.raw.read())
else:
self._content = self.raw.read()
self._content_loaded = True
return self._content
def decompress(self, data):
if not data:
return data
if not self._first_try:
return self._obj.decompress(data)
self._data += data
try:
return self._obj.decompress(data)
except zlib.error:
self._first_try = False
self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
try:
return self.decompress(self._data)
finally:
self._data = None
def content(self):
"""Raw content of response (i.e. bytes).
:returns: Body of HTTP response
:rtype: :class:`str`
"""
if not self._content:
# Decompress gzipped content
if self._gzipped:
decoder = zlib.decompressobj(16 + zlib.MAX_WBITS)
self._content = decoder.decompress(self.raw.read())
else:
self._content = self.raw.read()
self._content_loaded = True
return self._content
def decompress(self, data):
if not data:
return data
if not self._first_try:
return self._obj.decompress(data)
self._data += data
try:
return self._obj.decompress(data)
except zlib.error:
self._first_try = False
self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
try:
return self.decompress(self._data)
finally:
self._data = None
def decompress(self, data):
if not data:
return data
if not self._first_try:
return self._obj.decompress(data)
self._data += data
try:
return self._obj.decompress(data)
except zlib.error:
self._first_try = False
self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
try:
return self.decompress(self._data)
finally:
self._data = None
def content(self):
"""Raw content of response (i.e. bytes).
:returns: Body of HTTP response
:rtype: :class:`str`
"""
if not self._content:
# Decompress gzipped content
if self._gzipped:
decoder = zlib.decompressobj(16 + zlib.MAX_WBITS)
self._content = decoder.decompress(self.raw.read())
else:
self._content = self.raw.read()
self._content_loaded = True
return self._content
def decompress(self, data):
if not data:
return data
if not self._first_try:
return self._obj.decompress(data)
self._data += data
try:
return self._obj.decompress(data)
except zlib.error:
self._first_try = False
self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
try:
return self.decompress(self._data)
finally:
self._data = None
def decompress(self, data):
if not data:
return data
if not self._first_try:
return self._obj.decompress(data)
self._data += data
try:
return self._obj.decompress(data)
except zlib.error:
self._first_try = False
self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
try:
return self.decompress(self._data)
finally:
self._data = None