def write(self, data):
"""Takes any input data provided, decodes it as quoted-printable, and
passes it on to the underlying object.
:param data: quoted-printable data to decode
"""
# Prepend any cache info to our data.
if len(self.cache) > 0:
data = self.cache + data
# Since the longest possible escape is 3 characters long, either in
# the form '=XX' or '=\r\n', we encode up to 3 characters before the
# end of the string.
enc, rest = data[:-3], data[-3:]
# Encode and write, if we have data.
if len(enc) > 0:
self.underlying.write(binascii.a2b_qp(enc))
# Save remaining in cache.
self.cache = rest
return len(data)
python类a2b_qp()的实例源码
def finalize(self):
"""Finalize this object. This should be called when no more data
should be written to the stream. This function will not raise any
exceptions, but it may write more data to the underlying object if
there is data remaining in the cache.
If the underlying object has a `finalize()` method, this function will
call it.
"""
# If we have a cache, write and then remove it.
if len(self.cache) > 0:
self.underlying.write(binascii.a2b_qp(self.cache))
self.cache = b''
# Finalize our underlying stream.
if hasattr(self.underlying, 'finalize'):
self.underlying.finalize()
def decodestring(s, header = 0):
if a2b_qp is not None:
return a2b_qp(s, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
decode(infp, outfp, header = header)
return outfp.getvalue()
# Other helper functions
def decodestring(s, header = 0):
if a2b_qp is not None:
return a2b_qp(s, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
decode(infp, outfp, header = header)
return outfp.getvalue()
# Other helper functions
def decodestring(s, header = 0):
if a2b_qp is not None:
return a2b_qp(s, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
decode(infp, outfp, header = header)
return outfp.getvalue()
# Other helper functions
def from_fieldstorage(cls, fs):
"""
Create a dict from a cgi.FieldStorage instance
"""
obj = cls()
# fs.list can be None when there's nothing to parse
for field in fs.list or ():
charset = field.type_options.get('charset', 'utf8')
transfer_encoding = field.headers.get('Content-Transfer-Encoding', None)
supported_tranfer_encoding = {
'base64' : binascii.a2b_base64,
'quoted-printable' : binascii.a2b_qp
}
if PY3: # pragma: no cover
if charset == 'utf8':
decode = lambda b: b
else:
decode = lambda b: b.encode('utf8').decode(charset)
else:
decode = lambda b: b.decode(charset)
if field.filename:
field.filename = decode(field.filename)
obj.add(field.name, field)
else:
value = field.value
if transfer_encoding in supported_tranfer_encoding:
if PY3: # pragma: no cover
# binascii accepts bytes
value = value.encode('utf8')
value = supported_tranfer_encoding[transfer_encoding](value)
if PY3: # pragma: no cover
# binascii returns bytes
value = value.decode('utf8')
obj.add(field.name, decode(value))
return obj
def decodestring(s, header = 0):
if a2b_qp is not None:
return a2b_qp(s, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
decode(infp, outfp, header = header)
return outfp.getvalue()
# Other helper functions
def from_fieldstorage(cls, fs):
"""
Create a dict from a cgi.FieldStorage instance
"""
obj = cls()
# fs.list can be None when there's nothing to parse
for field in fs.list or ():
charset = field.type_options.get('charset', 'utf8')
transfer_encoding = field.headers.get('Content-Transfer-Encoding', None)
supported_tranfer_encoding = {
'base64' : binascii.a2b_base64,
'quoted-printable' : binascii.a2b_qp
}
if PY3: # pragma: no cover
if charset == 'utf8':
decode = lambda b: b
else:
decode = lambda b: b.encode('utf8').decode(charset)
else:
decode = lambda b: b.decode(charset)
if field.filename:
field.filename = decode(field.filename)
obj.add(field.name, field)
else:
value = field.value
if transfer_encoding in supported_tranfer_encoding:
if PY3: # pragma: no cover
# binascii accepts bytes
value = value.encode('utf8')
value = supported_tranfer_encoding[transfer_encoding](value)
if PY3: # pragma: no cover
# binascii returns bytes
value = value.decode('utf8')
obj.add(field.name, decode(value))
return obj
def test_qp(self):
# A test for SF bug 534347 (segfaults without the proper fix)
try:
binascii.a2b_qp(b"", **{1:1})
except TypeError:
pass
else:
self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
self.assertEqual(binascii.a2b_qp(b"= "), b"= ")
self.assertEqual(binascii.a2b_qp(b"=="), b"=")
self.assertEqual(binascii.a2b_qp(b"=AX"), b"=AX")
self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
self.assertEqual(binascii.a2b_qp(b"=00\r\n=00"), b"\x00\r\n\x00")
self.assertEqual(
binascii.b2a_qp(b"\xff\r\n\xff\n\xff"),
b"=FF\r\n=FF\r\n=FF")
self.assertEqual(
binascii.b2a_qp(b"0"*75+b"\xff\r\n\xff\r\n\xff"),
b"0"*75+b"=\r\n=FF\r\n=FF\r\n=FF")
self.assertEqual(binascii.b2a_qp(b'\0\n'), b'=00\n')
self.assertEqual(binascii.b2a_qp(b'\0\n', quotetabs=True), b'=00\n')
self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n'), b'foo\tbar=09\n')
self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n', quotetabs=True),
b'foo=09bar=09\n')
self.assertEqual(binascii.b2a_qp(b'.'), b'=2E')
self.assertEqual(binascii.b2a_qp(b'.\n'), b'=2E\n')
self.assertEqual(binascii.b2a_qp(b'a.\n'), b'a.\n')
def decodestring(s, header=False):
if a2b_qp is not None:
return a2b_qp(s, header=header)
from io import BytesIO
infp = BytesIO(s)
outfp = BytesIO()
decode(infp, outfp, header=header)
return outfp.getvalue()
# Other helper functions
def decodestring(s, header = 0):
if a2b_qp is not None:
return a2b_qp(s, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
decode(infp, outfp, header = header)
return outfp.getvalue()
# Other helper functions
def decodestring(s, header = 0):
if a2b_qp is not None:
return a2b_qp(s, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
decode(infp, outfp, header = header)
return outfp.getvalue()
# Other helper functions
def decodestring(s, header = 0):
if a2b_qp is not None:
return a2b_qp(s, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
decode(infp, outfp, header = header)
return outfp.getvalue()
# Other helper functions
def test_qp(self):
# A test for SF bug 534347 (segfaults without the proper fix)
try:
binascii.a2b_qp(b"", **{1:1})
except TypeError:
pass
else:
self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
self.assertEqual(binascii.a2b_qp(b"= "), b"= ")
self.assertEqual(binascii.a2b_qp(b"=="), b"=")
self.assertEqual(binascii.a2b_qp(b"=AX"), b"=AX")
self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
self.assertEqual(binascii.a2b_qp(b"=00\r\n=00"), b"\x00\r\n\x00")
self.assertEqual(
binascii.b2a_qp(b"\xff\r\n\xff\n\xff"),
b"=FF\r\n=FF\r\n=FF")
self.assertEqual(
binascii.b2a_qp(b"0"*75+b"\xff\r\n\xff\r\n\xff"),
b"0"*75+b"=\r\n=FF\r\n=FF\r\n=FF")
self.assertEqual(binascii.b2a_qp(b'\0\n'), b'=00\n')
self.assertEqual(binascii.b2a_qp(b'\0\n', quotetabs=True), b'=00\n')
self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n'), b'foo\tbar=09\n')
self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n', quotetabs=True),
b'foo=09bar=09\n')
self.assertEqual(binascii.b2a_qp(b'.'), b'=2E')
self.assertEqual(binascii.b2a_qp(b'.\n'), b'=2E\n')
self.assertEqual(binascii.b2a_qp(b'a.\n'), b'a.\n')
def decodestring(s, header = 0):
if a2b_qp is not None:
return a2b_qp(s, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
decode(infp, outfp, header = header)
return outfp.getvalue()
# Other helper functions
def from_fieldstorage(cls, fs):
"""
Create a dict from a cgi.FieldStorage instance
"""
obj = cls()
# fs.list can be None when there's nothing to parse
for field in fs.list or ():
charset = field.type_options.get('charset', 'utf8')
transfer_encoding = field.headers.get('Content-Transfer-Encoding', None)
supported_tranfer_encoding = {
'base64' : binascii.a2b_base64,
'quoted-printable' : binascii.a2b_qp
}
if PY3: # pragma: no cover
if charset == 'utf8':
decode = lambda b: b
else:
decode = lambda b: b.encode('utf8').decode(charset)
else:
decode = lambda b: b.decode(charset)
if field.filename:
field.filename = decode(field.filename)
obj.add(field.name, field)
else:
value = field.value
if transfer_encoding in supported_tranfer_encoding:
if PY3: # pragma: no cover
# binascii accepts bytes
value = value.encode('utf8')
value = supported_tranfer_encoding[transfer_encoding](value)
if PY3: # pragma: no cover
# binascii returns bytes
value = value.decode('utf8')
obj.add(field.name, decode(value))
return obj
def test_qp(self):
# A test for SF bug 534347 (segfaults without the proper fix)
try:
binascii.a2b_qp("", **{1:1})
except TypeError:
pass
else:
self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
self.assertEqual(binascii.a2b_qp("= "), "= ")
self.assertEqual(binascii.a2b_qp("=="), "=")
self.assertEqual(binascii.a2b_qp("=AX"), "=AX")
self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
self.assertEqual(binascii.a2b_qp("=00\r\n=00"), "\x00\r\n\x00")
self.assertEqual(
binascii.b2a_qp("\xff\r\n\xff\n\xff"),
"=FF\r\n=FF\r\n=FF"
)
self.assertEqual(
binascii.b2a_qp("0"*75+"\xff\r\n\xff\r\n\xff"),
"0"*75+"=\r\n=FF\r\n=FF\r\n=FF"
)
self.assertEqual(binascii.b2a_qp('\0\n'), '=00\n')
self.assertEqual(binascii.b2a_qp('\0\n', quotetabs=True), '=00\n')
self.assertEqual(binascii.b2a_qp('foo\tbar\t\n'), 'foo\tbar=09\n')
self.assertEqual(binascii.b2a_qp('foo\tbar\t\n', quotetabs=True), 'foo=09bar=09\n')
self.assertEqual(binascii.b2a_qp('.'), '=2E')
self.assertEqual(binascii.b2a_qp('.\n'), '=2E\n')
self.assertEqual(binascii.b2a_qp('a.\n'), 'a.\n')
def decodestring(s, header = 0):
if a2b_qp is not None:
return a2b_qp(s, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
decode(infp, outfp, header = header)
return outfp.getvalue()
# Other helper functions
def test_qp(self):
binascii.a2b_qp(data=b"", header=False) # Keyword arguments allowed
# A test for SF bug 534347 (segfaults without the proper fix)
try:
binascii.a2b_qp(b"", **{1:1})
except TypeError:
pass
else:
self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
self.assertEqual(binascii.a2b_qp(b"= "), b"= ")
self.assertEqual(binascii.a2b_qp(b"=="), b"=")
self.assertEqual(binascii.a2b_qp(b"=AX"), b"=AX")
self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
self.assertEqual(binascii.a2b_qp(b"=00\r\n=00"), b"\x00\r\n\x00")
self.assertEqual(
binascii.b2a_qp(b"\xff\r\n\xff\n\xff"),
b"=FF\r\n=FF\r\n=FF")
self.assertEqual(
binascii.b2a_qp(b"0"*75+b"\xff\r\n\xff\r\n\xff"),
b"0"*75+b"=\r\n=FF\r\n=FF\r\n=FF")
self.assertEqual(binascii.b2a_qp(b'\0\n'), b'=00\n')
self.assertEqual(binascii.b2a_qp(b'\0\n', quotetabs=True), b'=00\n')
self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n'), b'foo\tbar=09\n')
self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n', quotetabs=True),
b'foo=09bar=09\n')
self.assertEqual(binascii.b2a_qp(b'.'), b'=2E')
self.assertEqual(binascii.b2a_qp(b'.\n'), b'=2E\n')
self.assertEqual(binascii.b2a_qp(b'a.\n'), b'a.\n')
def decodestring(s, header=False):
if a2b_qp is not None:
return a2b_qp(s, header=header)
from io import BytesIO
infp = BytesIO(s)
outfp = BytesIO()
decode(infp, outfp, header=header)
return outfp.getvalue()
# Other helper functions
def test_qp(self):
# A test for SF bug 534347 (segfaults without the proper fix)
try:
binascii.a2b_qp("", **{1:1})
except TypeError:
pass
else:
self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
self.assertEqual(binascii.a2b_qp("= "), "= ")
self.assertEqual(binascii.a2b_qp("=="), "=")
self.assertEqual(binascii.a2b_qp("=AX"), "=AX")
self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
self.assertEqual(binascii.a2b_qp("=00\r\n=00"), "\x00\r\n\x00")
self.assertEqual(
binascii.b2a_qp("\xff\r\n\xff\n\xff"),
"=FF\r\n=FF\r\n=FF"
)
self.assertEqual(
binascii.b2a_qp("0"*75+"\xff\r\n\xff\r\n\xff"),
"0"*75+"=\r\n=FF\r\n=FF\r\n=FF"
)
self.assertEqual(binascii.b2a_qp('\0\n'), '=00\n')
self.assertEqual(binascii.b2a_qp('\0\n', quotetabs=True), '=00\n')
self.assertEqual(binascii.b2a_qp('foo\tbar\t\n'), 'foo\tbar=09\n')
self.assertEqual(binascii.b2a_qp('foo\tbar\t\n', quotetabs=True), 'foo=09bar=09\n')
self.assertEqual(binascii.b2a_qp('.'), '=2E')
self.assertEqual(binascii.b2a_qp('.\n'), '=2E\n')
self.assertEqual(binascii.b2a_qp('a.\n'), 'a.\n')
def decodestring(s, header = 0):
if a2b_qp is not None:
return a2b_qp(s, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
decode(infp, outfp, header = header)
return outfp.getvalue()
# Other helper functions
def from_fieldstorage(cls, fs):
"""
Create a dict from a cgi.FieldStorage instance
"""
obj = cls()
# fs.list can be None when there's nothing to parse
for field in fs.list or ():
charset = field.type_options.get('charset', 'utf8')
transfer_encoding = field.headers.get('Content-Transfer-Encoding', None)
supported_tranfer_encoding = {
'base64' : binascii.a2b_base64,
'quoted-printable' : binascii.a2b_qp
}
if PY3: # pragma: no cover
if charset == 'utf8':
decode = lambda b: b
else:
decode = lambda b: b.encode('utf8').decode(charset)
else:
decode = lambda b: b.decode(charset)
if field.filename:
field.filename = decode(field.filename)
obj.add(field.name, field)
else:
value = field.value
if transfer_encoding in supported_tranfer_encoding:
if PY3: # pragma: no cover
# binascii accepts bytes
value = value.encode('utf8')
value = supported_tranfer_encoding[transfer_encoding](value)
if PY3: # pragma: no cover
# binascii returns bytes
value = value.decode('utf8')
obj.add(field.name, decode(value))
return obj
def decodestring(s, header = 0):
if a2b_qp is not None:
return a2b_qp(s, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
decode(infp, outfp, header = header)
return outfp.getvalue()
# Other helper functions
def _decode_content_transfer(self, data):
encoding = self.headers[CONTENT_TRANSFER_ENCODING].lower()
if encoding == 'base64':
return base64.b64decode(data)
elif encoding == 'quoted-printable':
return binascii.a2b_qp(data)
elif encoding == 'binary':
return data
else:
raise RuntimeError('unknown content transfer encoding: {}'
''.format(encoding))
def decodestring(s, header = 0):
if a2b_qp is not None:
return a2b_qp(s, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
decode(infp, outfp, header = header)
return outfp.getvalue()
# Other helper functions
def test_qp(self):
# A test for SF bug 534347 (segfaults without the proper fix)
try:
binascii.a2b_qp(b"", **{1:1})
except TypeError:
pass
else:
self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
self.assertEqual(binascii.a2b_qp(b"= "), b"= ")
self.assertEqual(binascii.a2b_qp(b"=="), b"=")
self.assertEqual(binascii.a2b_qp(b"=AX"), b"=AX")
self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
self.assertEqual(binascii.a2b_qp(b"=00\r\n=00"), b"\x00\r\n\x00")
self.assertEqual(
binascii.b2a_qp(b"\xff\r\n\xff\n\xff"),
b"=FF\r\n=FF\r\n=FF")
self.assertEqual(
binascii.b2a_qp(b"0"*75+b"\xff\r\n\xff\r\n\xff"),
b"0"*75+b"=\r\n=FF\r\n=FF\r\n=FF")
self.assertEqual(binascii.b2a_qp(b'\0\n'), b'=00\n')
self.assertEqual(binascii.b2a_qp(b'\0\n', quotetabs=True), b'=00\n')
self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n'), b'foo\tbar=09\n')
self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n', quotetabs=True),
b'foo=09bar=09\n')
self.assertEqual(binascii.b2a_qp(b'.'), b'=2E')
self.assertEqual(binascii.b2a_qp(b'.\n'), b'=2E\n')
self.assertEqual(binascii.b2a_qp(b'a.\n'), b'a.\n')
def decodestring(s, header=False):
if a2b_qp is not None:
return a2b_qp(s, header=header)
from io import BytesIO
infp = BytesIO(s)
outfp = BytesIO()
decode(infp, outfp, header=header)
return outfp.getvalue()
# Other helper functions
def decodestring(s, header = 0):
if a2b_qp is not None:
return a2b_qp(s, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
decode(infp, outfp, header = header)
return outfp.getvalue()
# Other helper functions
def from_fieldstorage(cls, fs):
"""
Create a dict from a cgi.FieldStorage instance
"""
obj = cls()
# fs.list can be None when there's nothing to parse
for field in fs.list or ():
charset = field.type_options.get('charset', 'utf8')
transfer_encoding = field.headers.get('Content-Transfer-Encoding', None)
supported_tranfer_encoding = {
'base64' : binascii.a2b_base64,
'quoted-printable' : binascii.a2b_qp
}
if PY3: # pragma: no cover
if charset == 'utf8':
decode = lambda b: b
else:
decode = lambda b: b.encode('utf8').decode(charset)
else:
decode = lambda b: b.decode(charset)
if field.filename:
field.filename = decode(field.filename)
obj.add(field.name, field)
else:
value = field.value
if transfer_encoding in supported_tranfer_encoding:
if PY3: # pragma: no cover
# binascii accepts bytes
value = value.encode('utf8')
value = supported_tranfer_encoding[transfer_encoding](value)
if PY3: # pragma: no cover
# binascii returns bytes
value = value.decode('utf8')
obj.add(field.name, decode(value))
return obj