def __call__(self, userid, base="", filter_pattern="", scope=SCOPE_SUBTREE,
tls=False, attr=None, attrsonly=False, **kwargs):
if filter_pattern:
_filter = filter_pattern % userid
else:
_filter = self.filter_pattern % userid
_base = base or self.base
_scope = scope or self.scope
_attr = attr or self.attr
_attrsonly = attrsonly or self.attrsonly
arg = [_base, _scope, _filter, _attr, _attrsonly]
res = self.ld.search_s(*arg)
# should only be one entry and the information per entry is
# the tuple (dn, ava)
return res[0][1]
python类SCOPE_SUBTREE的实例源码
def getusercredbyuid(self, username):
baseDN = "ou=People,"+self.dc
searchScope = ldap.SCOPE_SUBTREE
searchFilter = "uid="+username
searchAttrList = ["givenName","sn"]
try:
# Result is a list of touple (one for each match) of dn and attrs.
# Attrs is a dict, each value is a list of string.
result = self.conn.search_s(baseDN, searchScope, searchFilter, searchAttrList)
if ( result==[] ):
print("User %s does not exist!", username)
return
result = [value[0] for value in result[0][1].values()]
# Returns 2-element list, name and surname
return result
except ldap.LDAPError as e:
print(e)
return
def ldap_ping(request):
"""Verify if the LDAP server is ready."""
settings = request.registry.settings
bind_dn = settings.get('ldap.bind_dn')
bind_password = settings.get('ldap.bind_password')
base_dn = settings['ldap.base_dn']
cm = request.registry.ldap_cm
try:
with cm.connection(bind_dn, bind_password) as conn:
# Perform a dumb query
filters = settings['ldap.filters'].format(mail="demo")
conn.search_s(base_dn, SCOPE_SUBTREE, filters)
ldap = True
except Exception:
logger.exception("Heartbeat Failure")
ldap = False
return ldap
def get_custom_ldap_group_members(ldap_settings, group_name):
logger = utils.get_logger()
ldap_server = ldap_settings["server"]
ldap_base = ldap_settings["base"]
get_uid = lambda x: x[1]["uid"][0]
members = []
try:
conn = LDAP.initialize(ldap_server)
g_ldap_filter = ldap_settings[group_name]
logger.debug("Searching members for %s: %s" % (group_name,
g_ldap_filter))
items = conn.search_s(ldap_base, LDAP.SCOPE_SUBTREE,
attrlist=['uid'],
filterstr=g_ldap_filter)
members = map(get_uid, items)
except Exception, e:
logger.error("Error getting custom group %s from LDAP: %s" % (group_name, e))
return members
def get_ldap_group_members(ldap_settings, group_name):
# base:dc=example,dc=com
# filter:(&(objectClass=posixGroup)(cn={group_name}))
logger = utils.get_logger()
ldap_server = ldap_settings["server"]
ldap_base = ldap_settings["groups_base"]
ldap_filter = "(&%s(%s={group_name}))" % (ldap_settings["groups_filter"], ldap_settings["groups_id"])
get_uid = lambda x: x.split(",")[0].split("=")[1]
try:
ad_filter = ldap_filter.replace('{group_name}', group_name)
conn = LDAP.initialize(ldap_server)
logger.debug("Searching members for %s: %s - %s - %s" % (group_name,
ldap_server,
ldap_base,
ad_filter))
res = conn.search_s(ldap_base, LDAP.SCOPE_SUBTREE, ad_filter)
except Exception, e:
logger.error("Error getting group from LDAP: %s" % e)
return map(get_uid, res[0][1]['uniqueMember'])
def get_ldap_groups(ldap_settings):
'''Returns the a list of found LDAP groups filtered with the groups list in
the settings
'''
# filter:(objectClass=posixGroup)
# base:ou=Group,dc=example,dc=com
logger = utils.get_logger()
ldap_server = ldap_settings["server"]
ldap_base = ldap_settings["groups_base"]
ldap_filter = ldap_settings["groups_filter"]
ldap_groups = ldap_settings["groups"]
get_uid = lambda x: x[1]["cn"][0]
try:
conn = LDAP.initialize(ldap_server)
logger.debug("Searching groups: %s - %s - %s" % (ldap_server,
ldap_base,
ldap_filter))
res = conn.search_s(ldap_base, LDAP.SCOPE_SUBTREE, ldap_filter)
return filter((lambda x: x in ldap_groups), map(get_uid, res))
except Exception, e:
logger.error("Error getting groups from LDAP: %s" % e)
def authentication_ldap(self,username,password,returnObject=True):
cfg=r.table('config').get(1).run(db.conn)['auth']
try:
conn = ldap.initialize(cfg['ldap']['ldap_server'])
id_conn = conn.search(cfg['ldap']['bind_dn'],ldap.SCOPE_SUBTREE,"uid=%s" % username)
tmp,info=conn.result(id_conn, 0)
user_dn=info[0][0]
if conn.simple_bind_s(who=user_dn,cred=password):
'''
config/ldapauth.py has the function you can change to adapt to your ldap
'''
au=myLdapAuth()
newUser=au.newUser(username,info[0])
return User(newUser) if returnObject else newUser
else:
return False
except Exception as e:
log.error("LDAP ERROR:",e)
return False
def updateuser(self, uid, modattrs):
l = ldap.initialize(self.config["server"])
l.simple_bind(self.config["admin"], self.config["password"])
dn = "uid=%s,%s" % (uid, self.config["memberdn"])
ldap_filter = "uid="+uid
result_id = l.search(self.config["memberdn"], ldap.SCOPE_SUBTREE, ldap_filter, None)
if result_id:
type, data = l.result(result_id, 0)
if data:
dn, attrs = data[0]
oldattrs = attrs
newattrs = attrs.copy()
newattrs.update(modattrs)
# now change it
newattrs.update(oldattrs)
ldif = modlist.modifyModlist(oldattrs, newattrs)
print ldif
l.modify_s(dn, ldif)
l.unbind_s()
return True
else:
return False
def find_unique_entry(self,base,scope=ldap.SCOPE_SUBTREE,filterstr='(objectClass=*)',attrlist=None,attrsonly=0,serverctrls=None,clientctrls=None,timeout=-1):
"""
Returns a unique entry, raises exception if not unique
"""
r = self.search_ext_s(
base,
scope,
filterstr,
attrlist=attrlist or ['*'],
attrsonly=attrsonly,
serverctrls=serverctrls,
clientctrls=clientctrls,
timeout=timeout,
sizelimit=2,
)
if len(r)!=1:
raise NO_UNIQUE_ENTRY('No or non-unique search result for %s' % (repr(filterstr)))
return r[0]
def search(self, base, scope=ldap.SCOPE_SUBTREE, ldap_filter='(objectClass=*)', attributes=None,
serverctrls=None):
"""
Perform a low level LDAP search (synchronous) using the given arguments.
:param base: Base DN of the search
:param scope: Scope of the search, default is SCOPE_SUBTREE
:param ldap_filter: ldap filter, default is '(objectClass=*)'
:param attributes: An array of attributes to return, default is ['*']
:param serverctrls: An array server extended controls
:return: a list of tuples (dn, attributes)
"""
if serverctrls is None:
logger.debug("Performing LDAP search: base: {}, scope: {}, filter: {}".format(base, scope, ldap_filter))
return self._server.search_s(base, scope, ldap_filter, attributes)
else:
logger.debug("Performing ext LDAP search: base: {}, scope: {}, filter: {}, serverctrls={}".
format(base,
scope,
ldap_filter,
serverctrls))
return self._server.search_ext_s(base, scope, ldap_filter, attrlist=attributes,
serverctrls=serverctrls)
def ldap_search_dn(self,uid=None):
obj = self.ldapconn
obj.protocal_version = ldap.VERSION3
searchScope = ldap.SCOPE_SUBTREE
retrieveAttributes = None
searchFilter = "cn=" + uid
try:
ldap_result_id = obj.search(self.base_dn, searchScope, searchFilter, retrieveAttributes)
result_type, result_data = obj.result(ldap_result_id, 0)
#??????
#('cn=django,ou=users,dc=gccmx,dc=cn',
# { 'objectClass': ['inetOrgPerson', 'top'],
# 'userPassword': ['{MD5}lueSGJZetyySpUndWjMBEg=='],
# 'cn': ['django'], 'sn': ['django'] } )
#
if result_type == ldap.RES_SEARCH_ENTRY:
#dn = result[0][0]
return result_data[0][0]
else:
return None
except ldap.LDAPError, e:
print e
#??????????????
def ldap_get_user(self,uid=None):
obj = self.ldapconn
obj.protocal_version = ldap.VERSION3
searchScope = ldap.SCOPE_SUBTREE
retrieveAttributes = None
searchFilter = "cn=" + uid
try:
ldap_result_id = obj.search(self.base_dn, searchScope, searchFilter, retrieveAttributes)
result_type, result_data = obj.result(ldap_result_id, 0)
if result_type == ldap.RES_SEARCH_ENTRY:
username = result_data[0][1]['cn'][0]
email = result_data[0][1]['mail'][0]
nick = result_data[0][1]['sn'][0]
result = {'username':username,'email':email,'nick':nick}
return result
else:
return None
except ldap.LDAPError, e:
print e
#????????????????????LDAP???boolean?
def get_student_byid(self, usr, override=0):
"""Populate RBUser object with data from user with given id in
student database.
By default will only populate RBUser attributes that have no
value (None) unless override is enabled.
Note that all students *should* be in the database, but only
raise a RBWarningError if user does not exist."""
res = self.ldap_dcu.search_s(rbconfig.ldap_dcu_students_tree,
ldap.SCOPE_SUBTREE,
'employeeNumber=%s' % usr.id)
if res:
self.set_user_dcu(usr, res[0], override)
self.set_user_dcu_student(usr, res[0], override)
else:
raise RBWarningError(
"Student id '%s' does not exist in database" % usr.id)
def get_alumni_byid(self, usr, override=0):
"""Populate RBUser object with data from user with given id in
alumni database.
By default will only populate RBUser attributes that have no
value (None) unless override is enabled.
Not all alumni will be in the database, so only raise a
RBWarningError if user does not exist."""
res = self.ldap_dcu.search_s(rbconfig.ldap_dcu_alumni_tree,
ldap.SCOPE_SUBTREE, 'cn=%s' % usr.id)
if res:
self.set_user_dcu(usr, res[0], override)
self.set_user_dcu_alumni(usr, res[0], override)
else:
raise RBWarningError(
"Alumni id '%s' does not exist in database" % usr.id)
def get_staff_byid(self, usr, override=0):
"""Populate RBUser object with data from user with given id in
staff database.
By default will only populate RBUser attributes that have no
value (None) unless override is enabled.
Not all staff are in the database, so only raise a
RBWarningError if user does not exist."""
# Staff ID is not consistently set. It will either be in the cn
# or in the gecos, so try both.
#
res = self.ldap_dcu.search_s(
rbconfig.ldap_dcu_staff_tree, ldap.SCOPE_SUBTREE,
'(|(cn=%s)(gecos=*,*%s))' % (usr.id, usr.id))
if res:
self.set_user_dcu(usr, res[0], override)
self.set_user_dcu_staff(usr, res[0], override)
else:
raise RBWarningError(
"Staff id '%s' does not exist in database" % usr.id)
def test_logger(mocker):
from ldap2pg.ldap import LDAPLogger
from ldap import SCOPE_SUBTREE
ldap = LDAPLogger(mocker.Mock(name='toto', pouet='toto'))
assert 'toto' == ldap.pouet
ldap.search_s('base', scope=SCOPE_SUBTREE, filter='', attributes='cn')
def __ldap_load_members2(self, l, moderator=False):
attr = ['mail']
if not moderator and self.ldapfullname:
attr.append(self.ldapfullname)
if self.ldapgroupattr or moderator:
# group attribute or moderator flag has been set. Let's get the uids.
if moderator:
assert self.ldapmodgroupdn
groupdn = self.ldapmodgroupdn
else:
groupdn = self.ldapgroupdn
members = l.search_s(groupdn, ldap.SCOPE_SUBTREE,
self.ldapsearch, [self.ldapgroupattr])
for (dn,attrs) in members:
if self.ldapgroupattr in attrs:
memberids = attrs[self.ldapgroupattr]
if DEBUG:
syslog('debug','regular groupdns = %s' % groupdns)
for memberid in memberids:
try:
res2 = l.search_s(self.ldapbasedn,
ldap.SCOPE_SUBTREE,
'(&(objectClass=*)('+self.ldapmemberuid+'='+memberid+'))',
attr)
self.__loadmembers(res2, moderator)
except ldap.NO_SUCH_OBJECT:
syslog('warn',"can't process %s: no such object (accountDisabled?)" % memberid)
else:
members = l.search_s(self.ldapbasedn,
ldap.SCOPE_SUBTREE,
self.ldapsearch,
attr)
self.__loadmembers(members)
def paged_search_ext_s(self, base, scope, filterstr='(objectClass=*)', attrlist=None, attrsonly=0,
serverctrls=None, clientctrls=None, timeout=-1, sizelimit=0):
"""
Behaves exactly like LDAPObject.search_ext_s() but internally uses the
simple paged results control to retrieve search results in chunks.
"""
req_ctrl = SimplePagedResultsControl(True, size=self.conf_LDAP_SYNC_BIND_PAGESIZE, cookie='')
# Send first search request
msgid = self.search_ext(base, ldap.SCOPE_SUBTREE, filterstr, attrlist=attrlist,
serverctrls=(serverctrls or []) + [req_ctrl])
results = []
while True:
rtype, rdata, rmsgid, rctrls = self.result3(msgid)
results.extend(rdata)
# Extract the simple paged results response control
pctrls = [c for c in rctrls if c.controlType == SimplePagedResultsControl.controlType]
if pctrls:
if pctrls[0].cookie:
# Copy cookie from response control to request control
req_ctrl.cookie = pctrls[0].cookie
msgid = self.search_ext(base, ldap.SCOPE_SUBTREE, filterstr, attrlist=attrlist,
serverctrls=(serverctrls or []) + [req_ctrl])
else:
break
return results
def userexistsbyuid(self, username):
baseDN = "ou=People,"+self.dc
searchScope = ldap.SCOPE_SUBTREE
searchFilter = "uid="+username
try:
result = self.conn.search_s(baseDN, searchScope, searchFilter, None)
if ( result==[] ):
return False
else:
return True
except ldap.LDAPError as e:
print(e)
return False
def getuser(self, id):
if not isinstance(id, basestring):
id = id[0]
l = ldap.initialize(self.config["server"])
l.simple_bind(self.config["admin"], self.config["password"])
ldap_filter = "uid="+id
result_id = l.search(self.config["memberdn"], ldap.SCOPE_SUBTREE, ldap_filter, None)
if result_id:
type, data = l.result(result_id, 0)
if data:
dn, attrs = data[0]
l.unbind_s()
return self.User(attrs, self.authconfig["auth"]["domain"])
l.unbind_s()
return None
def getusers(self, searchfilter):
l = ldap.initialize(self.config["server"])
l.simple_bind(self.config["admin"], self.config["password"])
ldap_filter = searchfilter
result_id = l.search(self.config["memberdn"], ldap.SCOPE_SUBTREE, ldap_filter, None)
results = []
while 1:
result_type, result_data = l.result(result_id, 0)
if (result_data == []):
break
else:
if result_type == ldap.RES_SEARCH_ENTRY:
results.append(result_data[0][1])
return map(lambda x:self.User(x, self.authconfig["auth"]["domain"]), results)
def ldap_get_members():
con = ldap.get_con()
res = con.search_s(
"ou=Users,dc=csh,dc=rit,dc=edu",
pyldap.SCOPE_SUBTREE,
"(memberof=cn=member,ou=groups,dc=csh,dc=rit,dc=edu)",
["entryUUID", "displayName"])
members = [{
"name": m[1]['displayName'][0].decode('utf-8'),
"uuid": m[1]['entryUUID'][0].decode('utf-8')
} for m in res]
return members
def get_ldap_lists(l, search_strings, parent_list=None):
results = set()
known_ldap_resp_ctrls = {
SimplePagedResultsControl.controlType: SimplePagedResultsControl,
}
req_ctrl = SimplePagedResultsControl(True, size=ldap_pagination_size, cookie='')
if parent_list:
filterstr = search_strings['get_all_sub_lists_filter'] % escape_filter_chars(parent_list)
else:
filterstr = search_strings['get_all_lists_filter']
while True:
msgid = l.search_ext(search_strings['list_search_string'],
ldap.SCOPE_SUBTREE,
serverctrls=[req_ctrl],
attrlist=(search_strings['list_cn_field'], search_strings['list_name_field']),
filterstr=filterstr)
rtype, rdata, rmsgid, serverctrls = l.result3(msgid, resp_ctrl_classes=known_ldap_resp_ctrls)
results |= {(data[search_strings['list_cn_field']][0], data[search_strings['list_name_field']][0]) for (dn, data) in rdata}
pctrls = [c for c in serverctrls
if c.controlType == SimplePagedResultsControl.controlType]
cookie = pctrls[0].cookie
if not cookie:
break
req_ctrl.cookie = cookie
return results
def get_ldap_list_membership(l, search_strings, list_name):
results = set()
known_ldap_resp_ctrls = {
SimplePagedResultsControl.controlType: SimplePagedResultsControl,
}
req_ctrl = SimplePagedResultsControl(True, size=ldap_pagination_size, cookie='')
while True:
msgid = l.search_ext(search_strings['user_search_string'],
ldap.SCOPE_SUBTREE,
serverctrls=[req_ctrl],
attrlist=[search_strings['user_account_name_field']],
filterstr=search_strings['user_membership_filter'] % escape_filter_chars(list_name)
)
rtype, rdata, rmsgid, serverctrls = l.result3(msgid, resp_ctrl_classes=known_ldap_resp_ctrls)
results |= {data[1][search_strings['user_account_name_field']][0] for data in rdata}
pctrls = [c for c in serverctrls
if c.controlType == SimplePagedResultsControl.controlType]
cookie = pctrls[0].cookie
if not cookie:
break
req_ctrl.cookie = cookie
return results
def search_dn(self):
"""Searches for the user's Distinguished Name in the LDAP directory.
:returns: A tuple of (dn, canonical_username)
"""
uid_attr = escape_filter_chars(_config.get('ldap', 'uid_attr'))
encoding = _config.get('ldap', 'encoding')
manager = _config.get('ldap', 'manager').encode(encoding)
manager_password = _config.get(
'ldap', 'manager_password', raw=True).encode(encoding)
if manager:
_logger.debug("Attempting authenticated bind as manager to %s",
manager)
self.ldap.simple_bind_s(manager, manager_password)
filter_ = "(%s=%s)" % (uid_attr, escape_filter_chars(
self.username.encode(encoding)))
result = self.ldap.search_s(_config.get('ldap', 'basedn'),
ldap.SCOPE_SUBTREE, filter_)
if not result or not result[0] or not result[0][0]:
raise UserNotFound(filter_)
user_dn, attrs = result[0]
if uid_attr in attrs:
uid = attrs[uid_attr][0].decode(encoding)
else:
uid = self.username
return user_dn.decode(encoding), uid
def __init__(self, uri, base, filter_pattern, scope=SCOPE_SUBTREE,
tls=False, user="", passwd="", attr=None, attrsonly=False):
UserInfo.__init__(self)
self.ldapuri = uri
self.base = base
self.filter_pattern = filter_pattern
self.scope = scope
self.tls = tls
self.attr = attr
self.attrsonly = attrsonly
self.ld = ldap.initialize(uri)
self.ld.protocol_version = ldap.VERSION3
self.ld.simple_bind_s(user, passwd)
def get_user_records(username):
with initialize_connection() as connection:
connection.simple_bind_s(config.LDAP_BIND_DN, config.LDAP_BIND_PASSWORD)
query = config.LDAP_USER_OBJECT_FILTER % {'user': ldap_filter.escape_filter_chars(username)}
records = connection.search_s(config.LDAP_USER_BASE_DN, ldap.SCOPE_SUBTREE, query)
return records
def check_group_membership(username, group=None):
with initialize_connection() as connection:
connection.simple_bind_s(config.LDAP_BIND_DN, config.LDAP_BIND_PASSWORD)
if group is None:
query = config.LDAP_GROUP_ALL
else:
query = config.LDAP_GROUP_OBJECT_FILTER % {'group': ldap_filter.escape_filter_chars(group)}
results = connection.search_s(config.LDAP_GROUP_BASE_DN, ldap.SCOPE_SUBTREE, query)
members = itertools.chain.from_iterable([result[1]['memberUid']for result in results])
encoding = config.LDAP_ENCODING
members = [member.decode(encoding) for member in members]
is_member = username in members
return is_member
def ldap_search(module, conn, dn, search_filter, ldap_attrs):
try:
search = conn.search_s(dn, ldap.SCOPE_SUBTREE, search_filter, ldap_attrs)
except ldap.LDAPError as e:
fail_msg = "LDAP Error Searching: {}".format(ldap_errors(e))
module.fail_json(msg=fail_msg)
return search
def _search(self, base, fltr, attrs=None, scope=ldap.SCOPE_SUBTREE):
results = self._conn.search_s(base, scope, fltr, attrs)
return results