def set_bytes_content(msg, data, maintype, subtype, cte='base64',
disposition=None, filename=None, cid=None,
params=None, headers=None):
_prepare_set(msg, maintype, subtype, headers)
if cte == 'base64':
data = _encode_base64(data, max_line_length=msg.policy.max_line_length)
elif cte == 'quoted-printable':
# XXX: quoprimime.body_encode won't encode newline characters in data,
# so we can't use it. This means max_line_length is ignored. Another
# bug to fix later. (Note: encoders.quopri is broken on line ends.)
data = binascii.b2a_qp(data, istext=False, header=False, quotetabs=True)
data = data.decode('ascii')
elif cte == '7bit':
# Make sure it really is only ASCII. The early warning here seems
# worth the overhead...if you care write your own content manager :).
data.encode('ascii')
elif cte in ('8bit', 'binary'):
data = data.decode('ascii', 'surrogateescape')
msg.set_payload(data)
msg['Content-Transfer-Encoding'] = cte
_finalize_set(msg, disposition, filename, cid, params)
python类b2a_qp()的实例源码
def _apply_content_transfer_encoding(self, stream):
encoding = self.headers[CONTENT_TRANSFER_ENCODING].lower()
if encoding == 'base64':
buffer = bytearray()
while True:
if buffer:
div, mod = divmod(len(buffer), 3)
chunk, buffer = buffer[:div * 3], buffer[div * 3:]
if chunk:
yield base64.b64encode(chunk)
chunk = next(stream, None)
if not chunk:
if buffer:
yield base64.b64encode(buffer[:])
return
buffer.extend(chunk)
elif encoding == 'quoted-printable':
for chunk in stream:
yield binascii.b2a_qp(chunk)
elif encoding == 'binary':
yield from stream
else:
raise RuntimeError('unknown content transfer encoding: {}'
''.format(encoding))
def set_bytes_content(msg, data, maintype, subtype, cte='base64',
disposition=None, filename=None, cid=None,
params=None, headers=None):
_prepare_set(msg, maintype, subtype, headers)
if cte == 'base64':
data = _encode_base64(data, max_line_length=msg.policy.max_line_length)
elif cte == 'quoted-printable':
# XXX: quoprimime.body_encode won't encode newline characters in data,
# so we can't use it. This means max_line_length is ignored. Another
# bug to fix later. (Note: encoders.quopri is broken on line ends.)
data = binascii.b2a_qp(data, istext=False, header=False, quotetabs=True)
data = data.decode('ascii')
elif cte == '7bit':
# Make sure it really is only ASCII. The early warning here seems
# worth the overhead...if you care write your own content manager :).
data.encode('ascii')
elif cte in ('8bit', 'binary'):
data = data.decode('ascii', 'surrogateescape')
msg.set_payload(data)
msg['Content-Transfer-Encoding'] = cte
_finalize_set(msg, disposition, filename, cid, params)
def encodestring(s, quotetabs = 0, header = 0):
if b2a_qp is not None:
return b2a_qp(s, quotetabs = quotetabs, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
encode(infp, outfp, quotetabs, header)
return outfp.getvalue()
def encodestring(s, quotetabs = 0, header = 0):
if b2a_qp is not None:
return b2a_qp(s, quotetabs = quotetabs, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
encode(infp, outfp, quotetabs, header)
return outfp.getvalue()
def encodestring(s, quotetabs = 0, header = 0):
if b2a_qp is not None:
return b2a_qp(s, quotetabs = quotetabs, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
encode(infp, outfp, quotetabs, header)
return outfp.getvalue()
def encodestring(s, quotetabs = 0, header = 0):
if b2a_qp is not None:
return b2a_qp(s, quotetabs = quotetabs, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
encode(infp, outfp, quotetabs, header)
return outfp.getvalue()
def test_qp(self):
# A test for SF bug 534347 (segfaults without the proper fix)
try:
binascii.a2b_qp(b"", **{1:1})
except TypeError:
pass
else:
self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
self.assertEqual(binascii.a2b_qp(b"= "), b"= ")
self.assertEqual(binascii.a2b_qp(b"=="), b"=")
self.assertEqual(binascii.a2b_qp(b"=AX"), b"=AX")
self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
self.assertEqual(binascii.a2b_qp(b"=00\r\n=00"), b"\x00\r\n\x00")
self.assertEqual(
binascii.b2a_qp(b"\xff\r\n\xff\n\xff"),
b"=FF\r\n=FF\r\n=FF")
self.assertEqual(
binascii.b2a_qp(b"0"*75+b"\xff\r\n\xff\r\n\xff"),
b"0"*75+b"=\r\n=FF\r\n=FF\r\n=FF")
self.assertEqual(binascii.b2a_qp(b'\0\n'), b'=00\n')
self.assertEqual(binascii.b2a_qp(b'\0\n', quotetabs=True), b'=00\n')
self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n'), b'foo\tbar=09\n')
self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n', quotetabs=True),
b'foo=09bar=09\n')
self.assertEqual(binascii.b2a_qp(b'.'), b'=2E')
self.assertEqual(binascii.b2a_qp(b'.\n'), b'=2E\n')
self.assertEqual(binascii.b2a_qp(b'a.\n'), b'a.\n')
def encodestring(s, quotetabs=False, header=False):
if b2a_qp is not None:
return b2a_qp(s, quotetabs=quotetabs, header=header)
from io import BytesIO
infp = BytesIO(s)
outfp = BytesIO()
encode(infp, outfp, quotetabs, header)
return outfp.getvalue()
def encodestring(s, quotetabs = 0, header = 0):
if b2a_qp is not None:
return b2a_qp(s, quotetabs = quotetabs, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
encode(infp, outfp, quotetabs, header)
return outfp.getvalue()
def encodestring(s, quotetabs = 0, header = 0):
if b2a_qp is not None:
return b2a_qp(s, quotetabs = quotetabs, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
encode(infp, outfp, quotetabs, header)
return outfp.getvalue()
def encodestring(s, quotetabs = 0, header = 0):
if b2a_qp is not None:
return b2a_qp(s, quotetabs = quotetabs, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
encode(infp, outfp, quotetabs, header)
return outfp.getvalue()
def test_qp(self):
# A test for SF bug 534347 (segfaults without the proper fix)
try:
binascii.a2b_qp(b"", **{1:1})
except TypeError:
pass
else:
self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
self.assertEqual(binascii.a2b_qp(b"= "), b"= ")
self.assertEqual(binascii.a2b_qp(b"=="), b"=")
self.assertEqual(binascii.a2b_qp(b"=AX"), b"=AX")
self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
self.assertEqual(binascii.a2b_qp(b"=00\r\n=00"), b"\x00\r\n\x00")
self.assertEqual(
binascii.b2a_qp(b"\xff\r\n\xff\n\xff"),
b"=FF\r\n=FF\r\n=FF")
self.assertEqual(
binascii.b2a_qp(b"0"*75+b"\xff\r\n\xff\r\n\xff"),
b"0"*75+b"=\r\n=FF\r\n=FF\r\n=FF")
self.assertEqual(binascii.b2a_qp(b'\0\n'), b'=00\n')
self.assertEqual(binascii.b2a_qp(b'\0\n', quotetabs=True), b'=00\n')
self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n'), b'foo\tbar=09\n')
self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n', quotetabs=True),
b'foo=09bar=09\n')
self.assertEqual(binascii.b2a_qp(b'.'), b'=2E')
self.assertEqual(binascii.b2a_qp(b'.\n'), b'=2E\n')
self.assertEqual(binascii.b2a_qp(b'a.\n'), b'a.\n')
def encodestring(s, quotetabs = 0, header = 0):
if b2a_qp is not None:
return b2a_qp(s, quotetabs = quotetabs, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
encode(infp, outfp, quotetabs, header)
return outfp.getvalue()
def test_qp(self):
# A test for SF bug 534347 (segfaults without the proper fix)
try:
binascii.a2b_qp("", **{1:1})
except TypeError:
pass
else:
self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
self.assertEqual(binascii.a2b_qp("= "), "= ")
self.assertEqual(binascii.a2b_qp("=="), "=")
self.assertEqual(binascii.a2b_qp("=AX"), "=AX")
self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
self.assertEqual(binascii.a2b_qp("=00\r\n=00"), "\x00\r\n\x00")
self.assertEqual(
binascii.b2a_qp("\xff\r\n\xff\n\xff"),
"=FF\r\n=FF\r\n=FF"
)
self.assertEqual(
binascii.b2a_qp("0"*75+"\xff\r\n\xff\r\n\xff"),
"0"*75+"=\r\n=FF\r\n=FF\r\n=FF"
)
self.assertEqual(binascii.b2a_qp('\0\n'), '=00\n')
self.assertEqual(binascii.b2a_qp('\0\n', quotetabs=True), '=00\n')
self.assertEqual(binascii.b2a_qp('foo\tbar\t\n'), 'foo\tbar=09\n')
self.assertEqual(binascii.b2a_qp('foo\tbar\t\n', quotetabs=True), 'foo=09bar=09\n')
self.assertEqual(binascii.b2a_qp('.'), '=2E')
self.assertEqual(binascii.b2a_qp('.\n'), '=2E\n')
self.assertEqual(binascii.b2a_qp('a.\n'), 'a.\n')
def encodestring(s, quotetabs = 0, header = 0):
if b2a_qp is not None:
return b2a_qp(s, quotetabs = quotetabs, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
encode(infp, outfp, quotetabs, header)
return outfp.getvalue()
def test_qp(self):
binascii.a2b_qp(data=b"", header=False) # Keyword arguments allowed
# A test for SF bug 534347 (segfaults without the proper fix)
try:
binascii.a2b_qp(b"", **{1:1})
except TypeError:
pass
else:
self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
self.assertEqual(binascii.a2b_qp(b"= "), b"= ")
self.assertEqual(binascii.a2b_qp(b"=="), b"=")
self.assertEqual(binascii.a2b_qp(b"=AX"), b"=AX")
self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
self.assertEqual(binascii.a2b_qp(b"=00\r\n=00"), b"\x00\r\n\x00")
self.assertEqual(
binascii.b2a_qp(b"\xff\r\n\xff\n\xff"),
b"=FF\r\n=FF\r\n=FF")
self.assertEqual(
binascii.b2a_qp(b"0"*75+b"\xff\r\n\xff\r\n\xff"),
b"0"*75+b"=\r\n=FF\r\n=FF\r\n=FF")
self.assertEqual(binascii.b2a_qp(b'\0\n'), b'=00\n')
self.assertEqual(binascii.b2a_qp(b'\0\n', quotetabs=True), b'=00\n')
self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n'), b'foo\tbar=09\n')
self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n', quotetabs=True),
b'foo=09bar=09\n')
self.assertEqual(binascii.b2a_qp(b'.'), b'=2E')
self.assertEqual(binascii.b2a_qp(b'.\n'), b'=2E\n')
self.assertEqual(binascii.b2a_qp(b'a.\n'), b'a.\n')
def encodestring(s, quotetabs=False, header=False):
if b2a_qp is not None:
return b2a_qp(s, quotetabs=quotetabs, header=header)
from io import BytesIO
infp = BytesIO(s)
outfp = BytesIO()
encode(infp, outfp, quotetabs, header)
return outfp.getvalue()
def test_qp(self):
# A test for SF bug 534347 (segfaults without the proper fix)
try:
binascii.a2b_qp("", **{1:1})
except TypeError:
pass
else:
self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
self.assertEqual(binascii.a2b_qp("= "), "= ")
self.assertEqual(binascii.a2b_qp("=="), "=")
self.assertEqual(binascii.a2b_qp("=AX"), "=AX")
self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
self.assertEqual(binascii.a2b_qp("=00\r\n=00"), "\x00\r\n\x00")
self.assertEqual(
binascii.b2a_qp("\xff\r\n\xff\n\xff"),
"=FF\r\n=FF\r\n=FF"
)
self.assertEqual(
binascii.b2a_qp("0"*75+"\xff\r\n\xff\r\n\xff"),
"0"*75+"=\r\n=FF\r\n=FF\r\n=FF"
)
self.assertEqual(binascii.b2a_qp('\0\n'), '=00\n')
self.assertEqual(binascii.b2a_qp('\0\n', quotetabs=True), '=00\n')
self.assertEqual(binascii.b2a_qp('foo\tbar\t\n'), 'foo\tbar=09\n')
self.assertEqual(binascii.b2a_qp('foo\tbar\t\n', quotetabs=True), 'foo=09bar=09\n')
self.assertEqual(binascii.b2a_qp('.'), '=2E')
self.assertEqual(binascii.b2a_qp('.\n'), '=2E\n')
self.assertEqual(binascii.b2a_qp('a.\n'), 'a.\n')
def encodestring(s, quotetabs = 0, header = 0):
if b2a_qp is not None:
return b2a_qp(s, quotetabs = quotetabs, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
encode(infp, outfp, quotetabs, header)
return outfp.getvalue()
def encodestring(s, quotetabs = 0, header = 0):
if b2a_qp is not None:
return b2a_qp(s, quotetabs = quotetabs, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
encode(infp, outfp, quotetabs, header)
return outfp.getvalue()
def encodestring(s, quotetabs = 0, header = 0):
if b2a_qp is not None:
return b2a_qp(s, quotetabs = quotetabs, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
encode(infp, outfp, quotetabs, header)
return outfp.getvalue()
def test_qp(self):
# A test for SF bug 534347 (segfaults without the proper fix)
try:
binascii.a2b_qp(b"", **{1:1})
except TypeError:
pass
else:
self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")
self.assertEqual(binascii.a2b_qp(b"= "), b"= ")
self.assertEqual(binascii.a2b_qp(b"=="), b"=")
self.assertEqual(binascii.a2b_qp(b"=AX"), b"=AX")
self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")
self.assertEqual(binascii.a2b_qp(b"=00\r\n=00"), b"\x00\r\n\x00")
self.assertEqual(
binascii.b2a_qp(b"\xff\r\n\xff\n\xff"),
b"=FF\r\n=FF\r\n=FF")
self.assertEqual(
binascii.b2a_qp(b"0"*75+b"\xff\r\n\xff\r\n\xff"),
b"0"*75+b"=\r\n=FF\r\n=FF\r\n=FF")
self.assertEqual(binascii.b2a_qp(b'\0\n'), b'=00\n')
self.assertEqual(binascii.b2a_qp(b'\0\n', quotetabs=True), b'=00\n')
self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n'), b'foo\tbar=09\n')
self.assertEqual(binascii.b2a_qp(b'foo\tbar\t\n', quotetabs=True),
b'foo=09bar=09\n')
self.assertEqual(binascii.b2a_qp(b'.'), b'=2E')
self.assertEqual(binascii.b2a_qp(b'.\n'), b'=2E\n')
self.assertEqual(binascii.b2a_qp(b'a.\n'), b'a.\n')
def encodestring(s, quotetabs=False, header=False):
if b2a_qp is not None:
return b2a_qp(s, quotetabs=quotetabs, header=header)
from io import BytesIO
infp = BytesIO(s)
outfp = BytesIO()
encode(infp, outfp, quotetabs, header)
return outfp.getvalue()
def encodestring(s, quotetabs = 0, header = 0):
if b2a_qp is not None:
return b2a_qp(s, quotetabs = quotetabs, header = header)
from cStringIO import StringIO
infp = StringIO(s)
outfp = StringIO()
encode(infp, outfp, quotetabs, header)
return outfp.getvalue()
def printchar(char):
"""Useful debugging function for milter developers."""
print ('char: %s [qp=%s][hex=%s][base64=%s]' %
(char, binascii.b2a_qp(char), binascii.b2a_hex(char),
binascii.b2a_base64(char)))
#end def printchar(char).
def __send_response(self, response):
"""Send data down the milter socket.
Args:
response: The data to send.
"""
#logging.debug(' >>> %s', binascii.b2a_qp(response[0]))
self.push(struct.pack('!I', len(response)))
self.push(response)
def read_milter_data(self):
""" Callback from asynchat once we have read the milter packet length
worth of bytes on the socket and it is accumulated in our input buffer
(which is the milter command + data to send to the dispatcher). """
import binascii
inbuff = "".join(self.__input)
self.__input = []
if not inbuff.startswith("B"):
if self.chmilt == "Receiver":
logging.debug(' read: %s %s', self.chmilt, binascii.b2a_qp(inbuff))
try:
response = self.__milter_dispatcher.Dispatch(inbuff)
#logging.debug(' >>> resp: [%s]', response)
if type(response) == list:
for r in response:
self.__send_response(r)
elif response:
self.__send_response(response)
# rinse and repeat :)
self.found_terminator = self.read_packetlen
self.set_terminator(MILTER_LEN_BYTES)
except kimpf.PpyMilterCloseConnection, e:
self.close()
#end class ConnectionHandler(asynchat.async_chat).
#end class AsyncPpyMilterServer(asyncore.dispatcher).
#SN July 13, 2015
#Bring configuration processing inside here, instead of in commons.
def base64code(s,d):
global b64str
global f
if(d==len(b64str)):
f.write(binascii.b2a_qp(base64.b64decode(s))+'\n')
else:
base64code(s+b64str[d],d+1)
if b64str[d].isalpha():
base64code(s+b64str[d].lower(),d+1)
def base64code(s,d):
global b64str
global f
if(d==len(b64str)):
f.write(binascii.b2a_qp(base64.b64decode(s))+'\n')
else:
base64code(s+b64str[d],d+1)
if b64str[d].isalpha():
base64code(s+b64str[d].lower(),d+1)