def test_on_incomplete_file_received(self):
_file = []
class TestHandler(FTPHandler):
def on_incomplete_file_received(self, file):
_file.append(file)
self._setUp(TestHandler)
data = b'abcde12345' * 100000
self.dummyfile.write(data)
self.dummyfile.seek(0)
with contextlib.closing(
self.client.transfercmd('stor ' + TESTFN)) as conn:
bytes_sent = 0
while True:
chunk = self.dummyfile.read(BUFSIZE)
conn.sendall(chunk)
bytes_sent += len(chunk)
# stop transfer while it isn't finished yet
if bytes_sent >= INTERRUPTED_TRANSF_SIZE or not chunk:
self.client.putcmd('abor')
break
self.assertRaises(ftplib.error_temp, self.client.getresp) # 426
self.client.quit() # prevent race conditions
call_until(lambda: _file, "ret == [os.path.abspath(TESTFN)]")
评论列表
文章目录