def testPausing(self):
    """
    Test pausing in the middle of data receiving.

    For every packet size from 1 to 9, C{self.pause_buf} is delivered to a
    fresh C{LineTester} in chunks of that size.  The tester is driven by a
    fake L{task.Clock}, so the test controls when scheduled calls run:
    C{a.received} must equal C{self.pause_output1} before the clock is
    advanced and C{self.pause_output2} after C{clock.advance(0)}.

    NOTE(review): presumably C{LineTester} schedules its resume through
    the clock it is given -- confirm against its definition.
    """
    for packet_size in range(1, 10):
        transport_file = StringIOWithoutClosing()
        clock = task.Clock()
        a = LineTester(clock)
        a.makeConnection(protocol.FileWrapper(transport_file))

        # Floor division keeps the chunk count an integer on Python 3 as
        # well as Python 2 (true division would hand range() a float).
        # The "+ 1" covers the trailing partial chunk; when the buffer
        # length is an exact multiple of packet_size the last slice is
        # empty and is still passed to dataReceived, as before.
        chunk_count = len(self.pause_buf) // packet_size + 1
        for i in range(chunk_count):
            start = i * packet_size
            a.dataReceived(self.pause_buf[start:start + packet_size])

        # State before any clock-scheduled call has had a chance to run.
        self.assertEqual(self.pause_output1, a.received)
        # Advancing by zero seconds runs calls that are already due.
        clock.advance(0)
        self.assertEqual(self.pause_output2, a.received)
# Comment list
# Table of contents