def _testCongestion(self):
# test the behavior in case of congestion
self.data = b'fill'
self.cli.setblocking(False)
try:
# try to lower the receiver's socket buffer size
self.cli.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 16384)
except OSError:
pass
with self.assertRaises(OSError) as cm:
try:
# fill the receiver's socket buffer
while True:
self.cli.sendto(self.data, 0, (HOST, self.port))
finally:
# signal the receiver we're done
self.evt.set()
# sendto() should have failed with ENOBUFS
self.assertEqual(cm.exception.errno, errno.ENOBUFS)
# and we should have received a congestion notification through poll
r, w, x = select.select([self.serv], [], [], 3.0)
self.assertIn(self.serv, r)
评论列表
文章目录