def execute(self):
args = self.args
# we can connect in 2 ways. By hostname/ip (and portnumber)
# or by ldap-uri
if "url" in args and ldapurl.isLDAPUrl(args["url"]):
conn = ldap.initialize(args["url"])
else:
ip, port = self.get_address()
conn = ldap.initialize("ldap://%s:%s" % (ip, port))
username = args.get("username", "")
password = args.get("password", "")
conn.simple_bind(username, password)
try:
self._set_version(args, conn)
except ValueError:
return Event.DOWN, "unsupported protocol version"
base = args.get("base", "dc=example,dc=org")
if base == "cn=monitor":
my_res = conn.search_st(base, ldap.SCOPE_BASE,
timeout=self.timeout)
versionstr = str(my_res[0][-1]['description'][0])
self.version = versionstr
return Event.UP, versionstr
scope = args.get("scope", "SUBTREE").upper()
if scope == "BASE":
scope = ldap.SCOPE_BASE
elif scope == "ONELEVEL":
scope = ldap.SCOPE_ONELEVEL
else:
scope = ldap.SCOPE_SUBTREE
filtr = args.get("filter", "objectClass=*")
try:
conn.search_ext_s(base, scope, filterstr=filtr,
timeout=self.timeout)
# pylint: disable=W0703
except Exception as err:
return (Event.DOWN,
"Failed ldapSearch on %s for %s: %s" % (
self.get_address(), filtr, str(err)))
conn.unbind()
return Event.UP, "Ok"
评论列表
文章目录