def exact(self):
try:
results = self.connection.search_s(
self.dn, ldap.SCOPE_BASE, attrlist=[self.name])
except ldap.LDAPError:
e = get_exception()
self.module.fail_json(
msg="Cannot search for attribute %s" % self.name,
details=str(e))
current = results[0][1].get(self.name, [])
modlist = []
if frozenset(self.values) != frozenset(current):
if len(current) == 0:
modlist = [(ldap.MOD_ADD, self.name, self.values)]
elif len(self.values) == 0:
modlist = [(ldap.MOD_DELETE, self.name, None)]
else:
modlist = [(ldap.MOD_REPLACE, self.name, self.values)]
return modlist
python类MOD_REPLACE的实例源码
def usr2ldap_renew(cls, usr):
"""Return a list of (type, attribute) pairs for given user.
This list is used in LDAP modify queries for renewing."""
tmp = [
(ldap.MOD_REPLACE, 'newbie', usr.newbie and 'TRUE' or 'FALSE'),
(ldap.MOD_REPLACE, 'cn', usr.cn),
(ldap.MOD_REPLACE, 'altmail', usr.altmail),
(ldap.MOD_REPLACE, 'updatedby', usr.updatedby),
(ldap.MOD_REPLACE, 'updated', usr.updated),
]
if usr.id is not None:
tmp.append((ldap.MOD_REPLACE, 'id', str(usr.id)))
if usr.course:
tmp.append((ldap.MOD_REPLACE, 'course', usr.course))
if usr.year is not None:
tmp.append((ldap.MOD_REPLACE, 'year', usr.year))
if usr.yearsPaid is not None:
tmp.append((ldap.MOD_REPLACE, 'yearsPaid', str(usr.yearsPaid)))
if usr.birthday:
tmp.append((ldap.MOD_REPLACE, 'birthday', usr.birthday))
return tmp
def usr2ldap_update(cls, usr):
"""Return a list of (type, attribute) pairs for given user.
This list is used in LDAP modify queries for updating."""
tmp = [(ldap.MOD_REPLACE, 'newbie', usr.newbie and 'TRUE' or
'FALSE'), (ldap.MOD_REPLACE, 'cn', usr.cn),
(ldap.MOD_REPLACE, 'altmail',
usr.altmail), (ldap.MOD_REPLACE, 'updatedby', usr.updatedby),
(ldap.MOD_REPLACE, 'updated', usr.updated)]
if usr.id is not None:
tmp.append((ldap.MOD_REPLACE, 'id', str(usr.id)))
if usr.course:
tmp.append((ldap.MOD_REPLACE, 'course', usr.course))
if usr.year is not None:
tmp.append((ldap.MOD_REPLACE, 'year', usr.year))
if usr.yearsPaid is not None:
tmp.append((ldap.MOD_REPLACE, 'yearsPaid', str(usr.yearsPaid)))
if usr.birthday:
tmp.append((ldap.MOD_REPLACE, 'birthday', usr.birthday))
return tmp
def set_password(self, username, hashes):
"""
Administratively set the user's password using the given hashes.
"""
dn = 'uid={0},{1}'.format(username, self.base_dn)
try:
with self._ldap_connection() as ldap_cxn:
ldap_cxn.simple_bind_s(self.bind_dn, self.bind_pw)
mod_nt = (ldap.MOD_REPLACE, 'sambaNTPassword', hashes['sambaNTPassword'])
mod_ssha = (ldap.MOD_REPLACE, 'userPassword', hashes['userPassword'])
mod_list = [mod_nt, mod_ssha]
ldap_cxn.modify_s(dn, mod_list)
except ldap.INVALID_CREDENTIALS:
self.bus.log('Invalid credentials for admin user: {0}'.format(self.bind_dn), 40)
raise
except ldap.INSUFFICIENT_ACCESS:
self.bus.log('Insufficient access for admin user: {0}'.format(self.bind_dn), 40)
raise
except ldap.INVALID_DN_SYNTAX:
self.bus.log('Invalid DN syntax in configuration: {0}'.format(self.base_dn), 40)
raise
except ldap.LDAPError as e:
self.bus.log('LDAP Error: {0}'.format(e.message['desc'] if 'desc' in e.message else str(e)),
level=40,
traceback=True)
raise
def change_password(self, username, oldpassword, hashes):
"""
Change the user's password using their own credentials.
"""
dn = 'uid={0},{1}'.format(username, self.base_dn)
try:
with self._ldap_connection() as ldap_cxn:
ldap_cxn.simple_bind_s(dn, oldpassword)
# don't use LDAPObject.passwd_s() here to make use of
# ldap's atomic operations. IOW, don't change one password
# but not the other.
mod_nt = (ldap.MOD_REPLACE, 'sambaNTPassword', hashes['sambaNTPassword'])
mod_ssha = (ldap.MOD_REPLACE, 'userPassword', hashes['userPassword'])
mod_list = [mod_nt, mod_ssha]
ldap_cxn.modify_s(dn, mod_list)
except ldap.INVALID_CREDENTIALS:
raise
except ldap.INVALID_DN_SYNTAX:
self.bus.log('Invalid DN syntax in configuration: {0}'.format(self.base_dn), 40)
raise
except ldap.LDAPError as e:
self.bus.log('LDAP Error: {0}'.format(e.message['desc'] if 'desc' in e.message else str(e)),
level=40,
traceback=True)
raise
def changeshadowexpire(self, username, shexp):
if (not self.userexistsbyuid(username)):
print("User %s does not exist!", username)
return
dn = "uid="+username+",ou=People,"+self.dc
ldif = [( ldap.MOD_REPLACE, 'shadowExpire', shexp )]
try:
self.conn.modify_s(dn, ldif)
except ldap.LDAPError as e:
print("Error: Can\'t change %s shadowExpire: %s" % (username, e.message['desc']))
def verify_token(self, uid, query_args):
code = query_args.get('code', None)
state = query_args.get('state', None)
user = self.ldaptools.getuser(uid)
if code and state:
state_key = self.config["statekey"]
if state_key == state:
r = self.get_reddit_client(self.config["redirect_base"] + url_for('reddit_loop'))
access_info = r.get_access_information(code)
auth_reddit = r.get_me()
if 'redditAccount' in user.objectClass:
if hasattr(user, 'redditName') and hasattr(user, 'redditToken'):
from ldap import MOD_REPLACE
self.ldaptools.updateattrs(uid, MOD_REPLACE, {
'redditName': auth_reddit.name,
'redditToken': access_info['access_token']
})
else:
# Something went horribly wrong.
return False
else:
from ldap import MOD_ADD
self.ldaptools.updateattrs(uid, MOD_ADD, {
'objectClass': 'redditAccount',
'redditName': auth_reddit.name,
'redditToken': access_info['access_token']
})
return True
return False
def recovery_update_account():
try:
email = request.form["email"]
result = ldaptools.modattr(current_user.get_id(), MOD_REPLACE, "userPassword", ldaptools.makeSecret(request.form["password"]))
assert(result)
result = ldaptools.modattr(current_user.get_id(), MOD_REPLACE, "email", email)
assert(result)
flash("Account updated", "success")
except Exception:
flash("Update failed", "danger")
app.logger.info('User account {0} infos changed'.format(current_user.get_id()))
return redirect("/account")
def update_account():
email = request.form["email"]
oldpassword = request.form["oldpassword"]
api_id = request.form["api_id"]
api_key = request.form["api_key"]
update_needed = False
if api_id != current_user.keyID[0] or api_key != current_user.vCode[0]:
update_needed = True
if not ldaptools.check_credentials(current_user.get_id(), oldpassword):
flash("You must confirm your old password to update your account.", "danger")
return redirect("/account")
try:
if all(x in request.form for x in ["password", "password_confirm", "oldpassword"]):
if request.form["password"] != request.form["password_confirm"]:
flash("Password confirmation mismatch.", "danger")
return redirect("/account")
result = ldaptools.modattr(current_user.get_id(), MOD_REPLACE, "userPassword", ldaptools.makeSecret(request.form["password"]))
assert(result)
result = ldaptools.modattr(current_user.get_id(), MOD_REPLACE, "email", email)
assert(result)
if "api_id" in request.form:
result = ldaptools.modattr(current_user.get_id(), MOD_REPLACE, "keyID", api_id)
assert(result)
result = ldaptools.modattr(current_user.get_id(), MOD_REPLACE, "vCode", api_key)
assert(result)
flash("Account updated", "success")
except Exception:
flash("Update failed", "danger")
if update_needed is True:
update_characters([current_user.get_id()])
app.logger.info('User account {0} infos changed'.format(current_user.get_id()))
return redirect("/account")
def ldap_update_pass(self,uid=None,oldpass=None,newpass=None):
modify_entry = [(ldap.MOD_REPLACE,'userpassword',newpass)]
obj = self.ldapconn
target_cn = self.ldap_search_dn(uid)
try:
obj.simple_bind_s(target_cn,oldpass)
obj.passwd_s(target_cn,oldpass,newpass)
return True
except ldap.LDAPError,e:
return False
def update_soa(self):
mlist = [(ldap.MOD_REPLACE, 'sOARecord', self._soa())]
self.lobj.modify_s(self.dn, mlist)
def modify_address(self, name, address):
names = self.ldap_tuple[1]['associatedDomain']
if not names:
raise exception.NotFound()
if len(names) == 1:
self.lobj.modify_s(self.dn, [(ldap.MOD_REPLACE, 'aRecord',
[utils.utf8(address)])])
else:
self.remove_name(name)
self.parent.add_entry(name, address)
def set_passwd(self, usr):
"""Set password for given user from the plaintext password
in usr.passwd."""
usr.userPassword = self.userPassword(usr.passwd)
self.wrapper(self.ldap.modify_s,
self.uid2dn(usr.uid), ((ldap.MOD_REPLACE, 'userPassword',
usr.userPassword), ))
def set_shell(self, usr):
"""Set shell for given user."""
self.wrapper(self.ldap.modify_s,
self.uid2dn(usr.uid), ((ldap.MOD_REPLACE, 'loginShell',
usr.loginShell), ))
def usr2ldap_rename(cls, usr):
"""Return a list of (type, attribute) pairs for given user.
This list is used in LDAP modify queries for renaming."""
return ((ldap.MOD_REPLACE, 'homeDirectory', usr.homeDirectory),
(ldap.MOD_REPLACE, 'updatedby',
usr.updatedby), (ldap.MOD_REPLACE, 'updated', usr.updated))