def test_pauseProducing(self):
"""
L{FileBodyProducer.pauseProducing} temporarily suspends writing bytes
from the input file to the given L{IConsumer}.
"""
expectedResult = b"hello, world"
readSize = 5
output = BytesIO()
consumer = FileConsumer(output)
producer = FileBodyProducer(
BytesIO(expectedResult), self.cooperator, readSize)
complete = producer.startProducing(consumer)
self._scheduled.pop(0)()
self.assertEqual(output.getvalue(), expectedResult[:5])
producer.pauseProducing()
# Sort of depends on an implementation detail of Cooperator: even
# though the only task is paused, there's still a scheduled call. If
# this were to go away because Cooperator became smart enough to cancel
# this call in this case, that would be fine.
self._scheduled.pop(0)()
# Since the producer is paused, no new data should be here.
self.assertEqual(output.getvalue(), expectedResult[:5])
self.assertEqual([], self._scheduled)
self.assertNoResult(complete)
评论列表
文章目录