def test_stor_ascii(self):
    # Upload a payload with STOR after switching the session to ASCII
    # type, read it back with retrbinary() and check that every CRLF
    # in the payload shows up as os.linesep in what was retrieved.

    def store(cmd, fp, blocksize=8192):
        # Same send loop as storbinary(), the only difference being
        # that "type a" (not "type i") is issued before the transfer.
        self.client.voidcmd('type a')
        with contextlib.closing(self.client.transfercmd(cmd)) as data_sock:
            chunk = fp.read(blocksize)
            while chunk:
                data_sock.sendall(chunk)
                chunk = fp.read(blocksize)
        # The final server reply is only read once the data
        # connection has been closed.
        return self.client.voidresp()

    try:
        payload = b'abcde12345\r\n' * 100000
        outgoing = self.dummy_sendfile
        outgoing.write(payload)
        outgoing.seek(0)
        store('stor ' + TESTFN, outgoing)

        incoming = self.dummy_recvfile
        self.client.retrbinary('retr ' + TESTFN, incoming.write)
        expected = payload.replace(b'\r\n', b(os.linesep))
        incoming.seek(0)
        datafile = incoming.read()

        # Lengths and hashes are compared instead of the raw bytes
        # (presumably to keep failure output small -- the payload is
        # over a megabyte).
        self.assertEqual(len(expected), len(datafile))
        self.assertEqual(hash(expected), hash(datafile))
    finally:
        # os.remove() is not the first choice here because the ftpd
        # thread may still hold a lock on the file.  Ask the server to
        # DELE it; only if that fails fall back to removing it locally.
        if os.path.exists(TESTFN):
            try:
                self.client.delete(TESTFN)
            except (ftplib.Error, EOFError, socket.error):
                safe_remove(TESTFN)
# Comment list
# Article table of contents