def test_server_down(self):
    """
    Verify an ldap.SERVER_DOWN error will retry 2 more times and that
    the connection is closed if all attempts fail.

    The test runs in two phases:

    1. With a working server, both query entry points finish on the first
       attempt (``retryNumber`` 0) and leave one connection in the
       "query" pool.
    2. With ``LDAPObject.search_ext`` and ``LDAPObject.search_s`` patched
       to raise ``ldap.SERVER_DOWN``, both entry points must raise
       ``LDAPQueryError``, report ``retryNumber`` 2, and leave the
       "query" pool empty.
    """
    service = self.service()
    testStats = {}

    def openQueryConnections():
        # Look the pool up on every call rather than caching it, exactly
        # as the assertions always did, in case the service swaps it out.
        return len(service.connectionPools["query"].connections)

    # Verify that without a SERVER_DOWN we don't need to retry, and we
    # still have a connection in the pool
    service._recordsFromQueryString_inThread(
        "(this=that)", testStats=testStats
    )
    self.assertEqual(testStats["retryNumber"], 0)
    self.assertEqual(openQueryConnections(), 1)

    service._recordWithDN_inThread("cn=test", testStats=testStats)
    self.assertEqual(testStats["retryNumber"], 0)
    self.assertEqual(openQueryConnections(), 1)

    # Force a search to raise SERVER_DOWN
    def raiseServerDown(*args, **kwds):
        raise ldap.SERVER_DOWN

    self.patch(LDAPObject, "search_ext", raiseServerDown)
    self.patch(LDAPObject, "search_s", raiseServerDown)

    # Now try recordsFromQueryString.
    # Clear the stats first so the retry count asserted below can only
    # have been written by this call.
    testStats.clear()
    # assertRaises fails the test when *no* exception is raised, which the
    # former try/except did not; any other exception type propagates with
    # its own traceback instead of being hidden by a bare except.
    self.assertRaises(
        LDAPQueryError,
        service._recordsFromQueryString_inThread,
        "(this=that)",
        testStats=testStats,
    )
    # Verify the number of times we retried
    self.assertEqual(testStats["retryNumber"], 2)
    # Verify the connections are all closed
    self.assertEqual(openQueryConnections(), 0)

    # Now try recordWithDN.
    # Without this reset the value 2 left by the previous call would
    # satisfy the assertion even if this call recorded nothing.
    testStats.clear()
    self.assertRaises(
        LDAPQueryError,
        service._recordWithDN_inThread,
        "cn=test",
        testStats=testStats,
    )
    # Verify the number of times we retried
    self.assertEqual(testStats["retryNumber"], 2)
    # Verify the connections are all closed
    self.assertEqual(openQueryConnections(), 0)
评论列表
文章目录