def test_stor_ascii_2(self):
    # Regression test for issue 116: when a file is uploaded in ASCII
    # mode and a CRLF pair ends up truncated across two received
    # chunks, no extra carriage return must be added to the stored
    # file.
    def upload_ascii(command, fileobj, chunk_size=8192):
        # Works like storbinary(), with the difference that "type a"
        # is sent instead of "type i" before the transfer starts.
        self.client.voidcmd('type a')
        with contextlib.closing(self.client.transfercmd(command)) as sock:
            chunk = fileobj.read(chunk_size)
            while chunk:
                sock.sendall(chunk)
                chunk = fileobj.read(chunk_size)
        return self.client.voidresp()

    # Remember the class-level buffer size so that it can be put back
    # once the test is over, whatever the outcome.
    saved_buffer_size = DTPHandler.ac_in_buffer_size
    try:
        # Shrink the incoming buffer to 2 bytes; the intent is that the
        # payload is consumed in tiny pieces and the second CRLF gets
        # split, with CR closing one chunk and LF opening the next.
        DTPHandler.ac_in_buffer_size = 2
        payload = b'\r\n foo \r\n bar'
        # What is expected on disk: every CRLF turned into the
        # platform's own line separator.
        expected = payload.replace(b'\r\n', b(os.linesep))

        sendfile = self.dummy_sendfile
        sendfile.write(payload)
        sendfile.seek(0)
        upload_ascii('stor ' + TESTFN, sendfile)

        # Fetch the stored file back in binary mode so that the bytes
        # actually written by the server can be inspected.
        recvfile = self.dummy_recvfile
        self.client.retrbinary('retr ' + TESTFN, recvfile.write)
        recvfile.seek(0)
        received = recvfile.read()
        self.assertEqual(expected, received)
    finally:
        DTPHandler.ac_in_buffer_size = saved_buffer_size
        # os.remove() is not used directly because the file could
        # still be locked by the ftpd thread.  Deletion goes through
        # FTP DELE first; safe_remove() is only the last resort if
        # that fails.
        if os.path.exists(TESTFN):
            try:
                self.client.delete(TESTFN)
            except (ftplib.Error, EOFError, socket.error):
                safe_remove(TESTFN)
评论列表
文章目录