def __init__(self, username, password):
    """Authenticate ``username`` against the directory and load its entry.

    Performs a simple bind with the supplied credentials and, on success,
    searches the base DN for the entry whose ``userPrincipalName`` equals
    ``username``.  The connection is unbound before returning.

    Attributes set on the instance:
        ldap_base_dn, ldap_user, ldap_password, uri: connection settings.
        is_active: True once the bind has succeeded.
        user_data: result of the ``search_s`` call, or None.
        conn: the (already unbound) LDAP connection object.

    Raises:
        Exception: "Invalid credentials" when the username or password is
            empty or the server rejects them; "Can't contact LDAP server"
            when the server cannot be reached.
    """
    # Imported by name (not ``import ldap.filter``) so that ``ldap`` is not
    # turned into a function-local variable.
    from ldap.filter import escape_filter_chars

    ldap_host = "192.168.78.8"
    ldap_port = "389"
    ldaps_port = "636"
    ldap_enable_ldaps = False

    self.ldap_base_dn = "DC=example,DC=com,DC=cn"  # example.com.cn
    self.ldap_user = username
    self.ldap_password = password
    if ldap_enable_ldaps:
        self.uri = "ldaps://" + ldap_host + ":" + ldaps_port
    else:
        self.uri = "ldap://" + ldap_host + ":" + ldap_port
    self.is_active = False
    self.user_data = None

    # A simple bind with an empty password is an "unauthenticated bind"
    # (RFC 4513, section 5.1.2) which servers may accept; never treat that
    # as a successful login.
    if not username or not password:
        raise Exception("Invalid credentials")

    self.conn = ldap.initialize(self.uri)
    try:
        # this option is required in Windows Server 2012
        self.conn.set_option(ldap.OPT_REFERRALS, 0)
        self.conn.simple_bind_s(who=self.ldap_user, cred=self.ldap_password)
    except ldap.INVALID_CREDENTIALS:
        raise Exception("Invalid credentials")
    except ldap.SERVER_DOWN:
        raise Exception("Can't contact LDAP server")

    self.is_active = True
    try:
        # Escape the user-supplied value so characters such as '*', '(' or
        # ')' cannot alter the structure of the search filter.
        self.user_data = self.conn.search_s(
            self.ldap_base_dn, ldap.SCOPE_SUBTREE,
            'userPrincipalName=' + escape_filter_chars(self.ldap_user))
    finally:
        # Release the connection even when the search fails.
        self.conn.unbind()
python类SCOPE_SUBTREE的实例源码
pyAuthenticationByLDAP.py 文件源码
项目:LinuxBashShellScriptForOps
作者: DingGuodong
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def _get_tuple_for_domain(cls, lobj, domain):
    """Return the first search result whose associatedDomain is ``domain``.

    Searches the subtree below ``CONF.ldap_dns_base_dn`` through ``lobj``.

    Returns:
        The first element of the ``search_s`` result, or None when nothing
        matches.  A warning is logged when more than one entry matches.
    """
    entry = lobj.search_s(CONF.ldap_dns_base_dn, ldap.SCOPE_SUBTREE,
                          '(associatedDomain=%s)' % utils.utf8(domain))
    if not entry:
        return None
    if len(entry) > 1:
        # Named placeholders need a single mapping argument; passing the
        # values positionally makes the log record fail to format.
        LOG.warning(_LW("Found multiple matches for domain "
                        "%(domain)s.\n%(entry)s"),
                    {'domain': domain, 'entry': entry})
    return entry[0]
def _get_all_domains(cls, lobj):
    """List the first associatedDomain value of every SOA-bearing entry.

    Searches the subtree below ``CONF.ldap_dns_base_dn`` for entries that
    match ``(sOARecord=*)``; entries without an ``associatedDomain``
    attribute (or with an empty one) are skipped.
    """
    soa_entries = lobj.search_s(CONF.ldap_dns_base_dn,
                                ldap.SCOPE_SUBTREE, '(sOARecord=*)')
    domain_values = (found[1].get('associatedDomain')
                     for found in soa_entries)
    return [values[0] for values in domain_values if values]
def delete(self):
    """Delete the domain that this entry refers to.

    Every entry below ``self.dn`` matching ``(aRecord=*)`` is deleted
    first, then ``self.dn`` itself.
    """
    a_records = self.lobj.search_s(self.dn, ldap.SCOPE_SUBTREE, '(aRecord=*)')
    for record in a_records:
        record_dn = record[0]
        self.lobj.delete_s(record_dn)
    self.lobj.delete_s(self.dn)
def subentry_with_name(self, name):
    """Return a HostEntry for ``name`` within this domain, or None.

    The lookup matches ``associatedDomain`` against
    ``<name>.<self.qualified_domain>`` in the subtree below ``self.dn``;
    only the first match is wrapped.
    """
    fqdn_filter = '(associatedDomain=%s.%s)' % (
        utils.utf8(name), utils.utf8(self.qualified_domain))
    found = self.lobj.search_s(self.dn, ldap.SCOPE_SUBTREE, fqdn_filter)
    if not found:
        return None
    return HostEntry(self, found[0])
def subentries_with_ip(self, ip):
    """Return HostEntry objects for entries below ``self.dn`` with ``ip``.

    Entries matching ``(aRecord=<ip>)`` that carry no ``associatedDomain``
    attribute are left out of the result.
    """
    ip_filter = '(aRecord=%s)' % utils.utf8(ip)
    matches = self.lobj.search_s(self.dn, ldap.SCOPE_SUBTREE, ip_filter)
    return [HostEntry(self, match)
            for match in matches
            if 'associatedDomain' in match[1]]
def getAllUsers(self, attrs=''):
    """Query every object matching ``(objectCategory=user)``.

    Args:
        attrs: attribute names to request; when empty, defaults to
            ``['cn', 'userPrincipalName']``.

    Returns:
        A tuple ``(results, attrs)`` where ``results`` is the output of
        ``self.get_search_results`` and ``attrs`` is the list actually used.

    On an ``LDAPError`` the error is printed and the process exits with
    status 1.
    """
    if not attrs:
        attrs = ['cn', 'userPrincipalName']

    objectFilter = '(objectCategory=user)'
    base_dn = self.domainBase

    try:
        rawUsers = self.do_ldap_query(base_dn, ldap.SCOPE_SUBTREE, objectFilter, attrs)
    except LDAPError as e:
        # ``as`` form and parenthesised print run on Python 2.6+ and 3.
        print("[!] Error retrieving users")
        print("[!] {}".format(e))
        sys.exit(1)

    return (self.get_search_results(rawUsers), attrs)
def getAllGroups(self, attrs=''):
    """Query every object matching ``(objectCategory=group)``.

    Args:
        attrs: attribute names to request; when empty, defaults to
            ``['distinguishedName', 'cn']``.

    Returns:
        A tuple ``(results, attrs)`` where ``results`` is the output of
        ``self.get_search_results`` and ``attrs`` is the list actually used.

    On an ``LDAPError`` the error is printed and the process exits with
    status 1.
    """
    if not attrs:
        attrs = ['distinguishedName', 'cn']

    objectFilter = '(objectCategory=group)'
    base_dn = self.domainBase

    try:
        rawGroups = self.do_ldap_query(base_dn, ldap.SCOPE_SUBTREE, objectFilter, attrs)
    except LDAPError as e:
        # ``as`` form and parenthesised print run on Python 2.6+ and 3.
        print("[!] Error retrieving groups")
        print("[!] {}".format(e))
        sys.exit(1)

    return (self.get_search_results(rawGroups), attrs)
def doFuzzySearch(self, searchTerm, objectCategory=''):
    """Run an ``anr`` search for ``searchTerm`` and return the matches.

    Args:
        searchTerm: value placed in the ``(anr=...)`` filter as given.
        objectCategory: when non-empty, the search is additionally
            restricted to ``(objectCategory=<objectCategory>)``.

    Returns:
        The output of ``self.get_search_results`` for the query; only the
        ``dn`` attribute is requested.

    On an ``LDAPError`` the error is printed and the process exits with
    status 1.
    """
    if objectCategory:
        objectFilter = '(&(objectCategory={})(anr={}))'.format(objectCategory, searchTerm)
    else:
        objectFilter = '(anr={})'.format(searchTerm)
    attrs = ['dn']
    base_dn = self.domainBase

    try:
        rawResults = self.do_ldap_query(base_dn, ldap.SCOPE_SUBTREE, objectFilter, attrs)
    except LDAPError as e:
        # ``as`` form and parenthesised print run on Python 2.6+ and 3.
        print("[!] Error retrieving results")
        print("[!] {}".format(e))
        sys.exit(1)

    return self.get_search_results(rawResults)
def doCustomSearch(self, base, objectFilter, attrs):
    """Run a subtree search with a caller-supplied base, filter and attrs.

    Returns:
        The output of ``self.get_search_results`` for the query.

    On an ``LDAPError`` the error is printed and the process exits with
    status 1.
    """
    try:
        rawResults = self.do_ldap_query(base, ldap.SCOPE_SUBTREE, objectFilter, attrs)
    except LDAPError as e:
        # These were bare string expressions before, so nothing was ever
        # reported to the user before exiting.
        print("[!] Error doing search")
        print("[!] {}".format(e))
        sys.exit(1)

    return self.get_search_results(rawResults)
def getAllComputers(self, attrs=''):
    """Query every object matching ``(objectClass=Computer)``.

    Args:
        attrs: attribute names to request; when empty, defaults to the
            name, DNS host name and operating-system attributes listed
            below.

    Returns:
        A tuple ``(results, attrs)`` where ``results`` is the output of
        ``self.get_search_results`` and ``attrs`` is the list actually used.

    On an ``LDAPError`` the error is printed and the process exits with
    status 1.
    """
    if not attrs:
        attrs = ['cn', 'dNSHostName', 'operatingSystem', 'operatingSystemVersion', 'operatingSystemServicePack']

    objectFilter = '(objectClass=Computer)'
    base_dn = self.domainBase

    try:
        rawComputers = self.do_ldap_query(base_dn, ldap.SCOPE_SUBTREE, objectFilter, attrs)
    except LDAPError as e:
        # ``as`` form and parenthesised print run on Python 2.6+ and 3.
        print("[!] Error retrieving computers")
        print("[!] {}".format(e))
        sys.exit(1)

    return (self.get_search_results(rawComputers), attrs)
def ldap_search(self, filter, attributes, incremental, incremental_filter):
    """
    Query the configured LDAP server with the provided search filter and
    attribute list.

    Each URI in ``self.conf_LDAP_SYNC_BIND_URI`` is tried in order; the
    first one that accepts the bind is searched and its result returned.

    Args:
        filter: search filter used for a full search.
        attributes: attribute list passed to the paged search.
        incremental: when true, and the sync record for the URI has
            ``syncs_to_full > 0``, ``incremental_filter`` is used instead.
        incremental_filter: filter template whose ``?`` is replaced by
            ``self.whenchanged`` formatted with
            ``self.conf_LDAP_SYNC_INCREMENTAL_TIMESTAMPFORMAT``.

    Returns:
        A tuple ``(uri, results)``: the server URI that answered and the
        paged search results.

    Raises:
        ldap.LDAPError: the last bind error when no URI accepted the bind,
            or a generic error when no URI is configured at all.
    """
    last_error = None
    for uri in self.conf_LDAP_SYNC_BIND_URI:
        # Read record of this uri
        if self.working_uri == uri:
            adldap_sync = self.working_adldap_sync
        else:
            adldap_sync, _created = ADldap_Sync.objects.get_or_create(ldap_sync_uri=uri)

        if (adldap_sync.syncs_to_full > 0) and incremental:
            filter_to_use = incremental_filter.replace('?', self.whenchanged.strftime(self.conf_LDAP_SYNC_INCREMENTAL_TIMESTAMPFORMAT))
            logger.debug("Using an incremental search. Filter is:'%s'" % filter_to_use)
        else:
            filter_to_use = filter

        ldap.set_option(ldap.OPT_REFERRALS, 0)
        #ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
        conn = PagedLDAPObject(uri)
        conn.protocol_version = 3

        if uri.startswith('ldaps:'):
            conn.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
            conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
            conn.set_option(ldap.OPT_X_TLS_DEMAND, True)
        else:
            conn.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_NEVER)
            conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
            conn.set_option(ldap.OPT_X_TLS_DEMAND, False)

        try:
            conn.simple_bind_s(self.conf_LDAP_SYNC_BIND_DN, self.conf_LDAP_SYNC_BIND_PASS)
        except ldap.LDAPError as e:
            logger.error("Error connecting to LDAP server %s : %s" % (uri, e))
            # Keep the error: on Python 3 it is cleared when this handler
            # ends, so a bare ``raise`` after the loop has nothing to re-raise.
            last_error = e
            continue

        try:
            results = conn.paged_search_ext_s(self.conf_LDAP_SYNC_BIND_SEARCH, ldap.SCOPE_SUBTREE, filter_to_use, attrlist=attributes, serverctrls=None)
        finally:
            # Release the connection even when the search itself fails.
            conn.unbind_s()

        if self.working_uri is None:
            self.working_uri = uri
            # Put the working server first for later calls.  Mutating the
            # list here is safe only because we return immediately below.
            self.conf_LDAP_SYNC_BIND_URI.insert(0, uri)
            self.working_adldap_sync = adldap_sync

        # Return both the LDAP server URI used and the request.
        # This is for incremental sync purposes
        return (uri, results)

    # if not connected correctly, raise error
    if last_error is not None:
        raise last_error
    raise ldap.LDAPError("No LDAP server URI is configured")
def search_ext(self, base, scope, filterstr='(objectClass=*)', attrlist=None,
               attrsonly=0, serverctrls=None, clientctrls=None,
               timeout=-1, sizelimit=0):
    """
    Start an LDAP search through the low-level ``search_ext`` call.

    Related forms sharing these arguments:
        search(base, scope [,filterstr [,attrlist [,attrsonly]]]) -> int
        search_s(base, scope [,filterstr [,attrlist [,attrsonly]]])
        search_st(base, scope [,filterstr [,attrlist [,attrsonly [,timeout]]]])
        search_ext(base, scope [,filterstr [,attrlist [,attrsonly
            [,serverctrls [,clientctrls [,timeout [,sizelimit]]]]]]])
        search_ext_s(...same arguments as search_ext...)

    Arguments:
        base: DN of the entry at which the search starts.
        scope: SCOPE_BASE (the object itself), SCOPE_ONELEVEL (its
            immediate children) or SCOPE_SUBTREE (the object and all its
            descendants).
        filterstr: string representation of the search filter
            (see RFC 4515).
        attrlist: limits the attributes retrieved; None returns all
            attributes of each entry.
        attrsonly: when non-zero, attribute values are meaningless because
            they are not transmitted in the result.
        serverctrls, clientctrls: optional request controls (default None).
        timeout: the synchronous forms search_st() and search_ext_s()
            block for at most this many seconds, or indefinitely when it
            is negative; a TIMEOUT exception is raised if no result
            arrives in time.
        sizelimit: when non-zero, limits the number of results retrieved.

    Each result tuple has the form (dn, entry): dn is a string holding the
    distinguished name and entry is a dictionary mapping attribute type
    strings to lists of values.  The dn is extracted with the underlying
    ldap_get_dn(), which may raise an exception if the DN is malformed.
    """
    base, filterstr = self._unbytesify_values(base, filterstr)

    requested_attrs = attrlist
    if requested_attrs is not None:
        requested_attrs = tuple(self._unbytesify_values(*requested_attrs))

    low_level_search = self._l.search_ext
    call_args = (
        base, scope, filterstr,
        requested_attrs, attrsonly,
        RequestControlTuples(serverctrls),
        RequestControlTuples(clientctrls),
        timeout, sizelimit,
    )
    return self._ldap_call(low_level_search, *call_args)
def execute(self):
    """Probe an LDAP server and report whether a search succeeds.

    The server is taken from ``args["url"]`` when that is a valid LDAP URL,
    otherwise from ``self.get_address()``.  With base ``cn=monitor`` the
    entry's ``description`` is read and stored in ``self.version``;
    otherwise a search with the configured base, scope and filter is run.

    Returns:
        ``(Event.UP, message)`` on success, or ``(Event.DOWN, reason)``
        when the protocol version is unsupported or the search fails.
    """
    args = self.args
    # we can connect in 2 ways. By hostname/ip (and portnumber)
    # or by ldap-uri
    if "url" in args and ldapurl.isLDAPUrl(args["url"]):
        conn = ldap.initialize(args["url"])
    else:
        ip, port = self.get_address()
        conn = ldap.initialize("ldap://%s:%s" % (ip, port))

    try:
        username = args.get("username", "")
        password = args.get("password", "")
        # NOTE(review): simple_bind looks like the asynchronous variant and
        # its result is never collected; confirm a rejected bind is meant
        # to surface only through the search below.
        conn.simple_bind(username, password)

        try:
            self._set_version(args, conn)
        except ValueError:
            return Event.DOWN, "unsupported protocol version"

        base = args.get("base", "dc=example,dc=org")
        if base == "cn=monitor":
            my_res = conn.search_st(base, ldap.SCOPE_BASE,
                                    timeout=self.timeout)
            versionstr = str(my_res[0][-1]['description'][0])
            self.version = versionstr
            return Event.UP, versionstr

        # Anything other than BASE / ONELEVEL falls back to SUBTREE.
        scope_by_name = {
            "BASE": ldap.SCOPE_BASE,
            "ONELEVEL": ldap.SCOPE_ONELEVEL,
        }
        scope = scope_by_name.get(args.get("scope", "SUBTREE").upper(),
                                  ldap.SCOPE_SUBTREE)

        filtr = args.get("filter", "objectClass=*")
        try:
            conn.search_ext_s(base, scope, filterstr=filtr,
                              timeout=self.timeout)
        # pylint: disable=W0703
        except Exception as err:
            return (Event.DOWN,
                    "Failed ldapSearch on %s for %s: %s" % (
                        self.get_address(), filtr, str(err)))

        return Event.UP, "Ok"
    finally:
        # Always release the connection; the early returns above used to
        # leave it open.  The check's verdict is already decided here, so
        # a failing unbind must not replace it.
        try:
            conn.unbind()
        except ldap.LDAPError:
            pass