def test_stou(self):
data = b'abcde12345' * 100000
self.dummy_sendfile.write(data)
self.dummy_sendfile.seek(0)
self.client.voidcmd('TYPE I')
# filename comes in as "1xx FILE: <filename>"
filename = self.client.sendcmd('stou').split('FILE: ')[1]
try:
with contextlib.closing(self.client.makeport()) as sock:
conn, sockaddr = sock.accept()
with contextlib.closing(conn):
conn.settimeout(TIMEOUT)
if hasattr(self.client_class, 'ssl_version'):
conn = ssl.wrap_socket(conn)
while True:
buf = self.dummy_sendfile.read(8192)
if not buf:
break
conn.sendall(buf)
# transfer finished, a 226 response is expected
self.assertEqual('226', self.client.voidresp()[:3])
self.client.retrbinary('retr ' + filename,
self.dummy_recvfile.write)
self.dummy_recvfile.seek(0)
datafile = self.dummy_recvfile.read()
self.assertEqual(len(data), len(datafile))
self.assertEqual(hash(data), hash(datafile))
finally:
# We do not use os.remove() because file could still be
# locked by ftpd thread. If DELE through FTP fails try
# os.remove() as last resort.
if os.path.exists(filename):
try:
self.client.delete(filename)
except (ftplib.Error, EOFError, socket.error):
safe_remove(filename)
评论列表
文章目录