def test_accept_connection_exception(self, m_log):
sock = mock.Mock()
sock.fileno.return_value = 10
sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
self.loop.remove_reader = mock.Mock()
self.loop.call_later = mock.Mock()
self.loop._accept_connection(MyProto, sock)
self.assertTrue(m_log.error.called)
self.assertFalse(sock.close.called)
self.loop.remove_reader.assert_called_with(10)
self.loop.call_later.assert_called_with(constants.ACCEPT_RETRY_DELAY,
# self.loop._start_serving
mock.ANY,
MyProto, sock, None, None)
评论列表
文章目录