def test_headers(self):
total_sent = 0
sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
headers=[b"x" * 512])
total_sent += sent
offset = 4096
nbytes = 4096
while 1:
sent = self.sendfile_wrapper(self.sockno, self.fileno,
offset, nbytes)
if sent == 0:
break
total_sent += sent
offset += sent
expected_data = b"x" * 512 + self.DATA
self.assertEqual(total_sent, len(expected_data))
self.client.close()
self.server.wait()
data = self.server.handler_instance.get_data()
self.assertEqual(hash(data), hash(expected_data))
python类sendfile()的实例源码
def test_trailers(self):
TESTFN2 = support.TESTFN + "2"
file_data = b"abcdef"
with open(TESTFN2, 'wb') as f:
f.write(file_data)
with open(TESTFN2, 'rb')as f:
self.addCleanup(os.remove, TESTFN2)
os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
trailers=[b"1234"])
self.client.close()
self.server.wait()
data = self.server.handler_instance.get_data()
self.assertEqual(data, b"abcdef1234")
def can_sendfile(self):
return self.cfg.sendfile is not False and sendfile is not None
def write_file(self, respiter):
if not self.sendfile(respiter):
for item in respiter:
self.write(item)
def test_invalid_offset(self):
with self.assertRaises(OSError) as cm:
os.sendfile(self.sockno, self.fileno, -1, 4096)
self.assertEqual(cm.exception.errno, errno.EINVAL)
def test_keywords(self):
# Keyword arguments should be supported
os.sendfile(out=self.sockno, offset=0, count=4096,
**{'in': self.fileno})
if self.SUPPORT_HEADERS_TRAILERS:
os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
headers=(), trailers=(), flags=0)
# --- headers / trailers tests
def test_flags(self):
try:
os.sendfile(self.sockno, self.fileno, 0, 4096,
flags=os.SF_NODISKIO)
except OSError as err:
if err.errno not in (errno.EBUSY, errno.EAGAIN):
raise
def can_sendfile(self):
return self.cfg.sendfile is not False and sendfile is not None
def write_file(self, respiter):
if not self.sendfile(respiter):
for item in respiter:
self.write(item)
def can_sendfile(self):
return self.cfg.sendfile is not False and sendfile is not None
def can_sendfile(self):
return self.cfg.sendfile is not False and sendfile is not None
def write_file(self, respiter):
if not self.sendfile(respiter):
for item in respiter:
self.write(item)
def can_sendfile(self):
return self.cfg.sendfile is not False and sendfile is not None
def write_file(self, respiter):
if not self.sendfile(respiter):
for item in respiter:
self.write(item)
def can_sendfile(self):
return self.cfg.sendfile is not False and sendfile is not None
def write_file(self, respiter):
if not self.sendfile(respiter):
for item in respiter:
self.write(item)
def can_sendfile(self):
return self.cfg.sendfile is not False and sendfile is not None
def write_file(self, respiter):
if not self.sendfile(respiter):
for item in respiter:
self.write(item)
def initiate_sendfile(self):
"""A wrapper around sendfile."""
try:
sent = sendfile(self._fileno, self._filefd, self._offset,
self.ac_out_buffer_size)
except OSError as err:
if err.errno in _ERRNOS_RETRY or err.errno == errno.EBUSY:
return
elif err.errno in _ERRNOS_DISCONNECTED:
self.handle_close()
else:
if self.tot_bytes_sent == 0:
logger.warning(
"sendfile() failed; falling back on using plain send")
raise _GiveUpOnSendfile
else:
raise
else:
if sent == 0:
# this signals the channel that the transfer is completed
self.discard_buffers()
self.handle_close()
else:
self._offset += sent
self.tot_bytes_sent += sent
# --- utility methods
def test_fallback(self):
# Makes sure that if sendfile() fails and no bytes were
# transmitted yet the server falls back on using plain
# send()
data = b'abcde12345' * 100000
self.dummy_sendfile.write(data)
self.dummy_sendfile.seek(0)
self.client.storbinary('stor ' + TESTFN, self.dummy_sendfile)
with mock.patch('pyftpdlib.handlers.sendfile',
side_effect=OSError(errno.EINVAL)) as fun:
try:
self.client.retrbinary(
'retr ' + TESTFN, self.dummy_recvfile.write)
assert fun.called
self.dummy_recvfile.seek(0)
datafile = self.dummy_recvfile.read()
self.assertEqual(len(data), len(datafile))
self.assertEqual(hash(data), hash(datafile))
finally:
# We do not use os.remove() because file could still be
# locked by ftpd thread. If DELE through FTP fails try
# os.remove() as last resort.
if os.path.exists(TESTFN):
try:
self.client.delete(TESTFN)
except (ftplib.Error, EOFError, socket.error):
safe_remove(TESTFN)