def test_transfer(self):
"""
L{basic.FileSender} sends the content of the given file using a
C{IConsumer} interface via C{beginFileTransfer}. It returns a
L{Deferred} which fires with the last byte sent.
"""
source = BytesIO(b"Test content")
consumer = proto_helpers.StringTransport()
sender = basic.FileSender()
d = sender.beginFileTransfer(source, consumer)
sender.resumeProducing()
# resumeProducing only finishes after trying to read at eof
sender.resumeProducing()
self.assertIsNone(consumer.producer)
self.assertEqual(b"t", self.successResultOf(d))
self.assertEqual(b"Test content", consumer.value())
评论列表
文章目录