def add_sshpubkey(self, username, sshpubkey):
    """
    Add an sshPublicKey attribute to the user's dn.

    Binds with ``self.bind_dn`` / ``self.bind_pw`` and issues one
    ``MOD_ADD`` of ``sshPublicKey`` on ``uid=<username>,<self.base_dn>``.

    :param username: uid used to build the target dn.
    :param sshpubkey: key value; passed through ``str()`` before sending.
    :raises ldap.LDAPError: any LDAP failure is logged through
        ``self.bus.log`` and then re-raised unchanged.
    """
    # NOTE(review): username is interpolated into the dn unescaped; if it can
    # come from untrusted input it should be escaped first -- confirm callers.
    dn = 'uid={0},{1}'.format(username, self.base_dn)
    try:
        with self._ldap_connection() as ldap_cxn:
            ldap_cxn.simple_bind_s(self.bind_dn, self.bind_pw)
            mod_list = [(ldap.MOD_ADD, 'sshPublicKey', str(sshpubkey))]
            ldap_cxn.modify_s(dn, mod_list)
    except ldap.LDAPError as e:
        # LDAPError is the base class of INVALID_CREDENTIALS and
        # INSUFFICIENT_ACCESS, so one clause covers all three.
        #
        # Read the info dict from e.args rather than e.message: the latter
        # does not exist on Python 3 exceptions, and indexing it with 'desc'
        # fails when it is a plain string rather than a dict.
        details = e.args[0] if e.args else None
        if isinstance(details, dict) and 'desc' in details:
            description = details['desc']
        else:
            description = str(e)
        self.bus.log('LDAP Error: {0}'.format(description),
                     level=40,
                     traceback=True)
        raise
# Example source code using the Python ldap constant MOD_ADD
def add_entry(self, name, address):
    """
    Register ``name`` for ``address`` and return the resulting subentry.

    If an entry for ``address`` already exists, ``name`` is added to it as
    an extra ``associatedDomain`` value; otherwise a new ``dc=<name>``
    entry is created beneath ``self.dn``.

    :raises exception.FloatingIpDNSExists: when ``subentry_with_name(name)``
        already finds something.
    :returns: the value of ``self.subentry_with_name(name)`` after the write.
    """
    if self.subentry_with_name(name):
        raise exception.FloatingIpDNSExists(name=name,
                                            domain=self.qualified_domain)

    matches = self.subentries_with_ip(address)
    if not matches:
        # Nothing carries this IP yet, so build an entirely new entry.
        fresh_dn = 'dc=%s,%s' % (name, self.dn)
        record = {'objectClass': ['domainrelatedobject', 'dnsdomain',
                                  'domain', 'dcobject', 'top'],
                  'aRecord': [address],
                  'associatedDomain': [self._qualify(name)],
                  'dc': [name]}
        self.lobj.add_s(fresh_dn, create_modlist(record))
    else:
        # An entry for this IP is already present; just attach the new
        # name to the first match.
        target_dn = matches[0].dn
        change = (ldap.MOD_ADD,
                  'associatedDomain',
                  utils.utf8(self._qualify(name)))
        self.lobj.modify_s(target_dn, [change])

    return self.subentry_with_name(name)
def exact(self):
    """
    Build the modlist that makes the attribute hold exactly ``self.values``.

    Reads the current values of ``self.name`` on ``self.dn`` with a
    base-scope search and compares them, as sets, with ``self.values``.

    :returns: ``[]`` when the sets are equal; otherwise a one-element
        list using ``MOD_ADD`` (nothing stored yet), ``MOD_DELETE``
        (nothing wanted) or ``MOD_REPLACE``.
    """
    try:
        found = self.connection.search_s(
            self.dn, ldap.SCOPE_BASE, attrlist=[self.name])
    except ldap.LDAPError:
        err = get_exception()
        # NOTE(review): presumably fail_json does not return; if it did,
        # ``found`` would be unbound below -- confirm.
        self.module.fail_json(
            msg="Cannot search for attribute %s" % self.name,
            details=str(err))

    # First result, attribute dict; a missing attribute counts as empty.
    existing = found[0][1].get(self.name, [])

    if frozenset(self.values) == frozenset(existing):
        return []
    if len(existing) == 0:
        return [(ldap.MOD_ADD, self.name, self.values)]
    if len(self.values) == 0:
        return [(ldap.MOD_DELETE, self.name, None)]
    return [(ldap.MOD_REPLACE, self.name, self.values)]
def group_apply(group):
    """
    Join an open group, or apply for a closed one, as ``current_user``.

    For a closed group the user is added to ``<group>-pending`` instead of
    the group itself. Users whose first ``accountStatus`` value is
    ``"Ineligible"`` may only be added to groups listed under
    ``publicgroups``.

    :param group: requested group name; converted with ``str()``.
    :returns: a redirect to ``/groups`` in every case, with a flash
        message describing the outcome.
    """
    originalgroup = group
    group = str(group)
    groups_cfg = app.config["groups"]

    # Explicit membership check instead of ``assert``: assertions are
    # stripped under ``python -O``, which would let any group name through.
    if group not in groups_cfg["closedgroups"] + groups_cfg["opengroups"]:
        flash("You cannot join that group.", "danger")
        return redirect("/groups")

    join = True
    if group in groups_cfg["closedgroups"]:
        # Closed groups need approval: queue the user as pending.
        group = group + "-pending"
        join = False

    if current_user.accountStatus[0] == "Ineligible":
        # NOTE(review): for closed groups this compares the "-pending" name
        # against publicgroups -- confirm that is the intended check.
        if group not in groups_cfg["publicgroups"]:
            flash("You cannot join that group.", "danger")
            return redirect("/groups")

    ldaptools.modgroup(current_user.get_id(), MOD_ADD, group)
    if join:
        flash("Joined %s group" % group, "success")
    else:
        flash("Applied for %s group" % originalgroup, "success")
    return redirect("/groups")
def verify_token(self, uid, query_args):
    """
    Finish the reddit OAuth round-trip and store the result on the user.

    Requires both ``code`` and ``state`` in ``query_args`` and a ``state``
    equal to ``self.config["statekey"]``. On success the user's
    ``redditName`` / ``redditToken`` are replaced (user already has the
    ``redditAccount`` objectClass) or added together with that objectClass.

    :returns: ``True`` when the attributes were written; ``False`` when the
        query arguments are missing, the state does not match, or a
        ``redditAccount`` user lacks either reddit attribute.
    """
    code = query_args.get('code', None)
    state = query_args.get('state', None)
    user = self.ldaptools.getuser(uid)

    if not (code and state):
        return False
    if self.config["statekey"] != state:
        return False

    callback = self.config["redirect_base"] + url_for('reddit_loop')
    r = self.get_reddit_client(callback)
    access_info = r.get_access_information(code)
    auth_reddit = r.get_me()

    if 'redditAccount' in user.objectClass:
        has_both = hasattr(user, 'redditName') and hasattr(user, 'redditToken')
        if not has_both:
            # Something went horribly wrong.
            return False
        from ldap import MOD_REPLACE as mod_op
        payload = {
            'redditName': auth_reddit.name,
            'redditToken': access_info['access_token']
        }
    else:
        from ldap import MOD_ADD as mod_op
        payload = {
            'objectClass': 'redditAccount',
            'redditName': auth_reddit.name,
            'redditToken': access_info['access_token']
        }

    self.ldaptools.updateattrs(uid, mod_op, payload)
    return True
def groupapprove(id, group):
    """
    Approve a pending membership: move ``id`` from ``<group>-pending``
    into ``group``.

    Only members of ``admin`` or ``admin-<group>`` may do this.

    :param id: user id; converted with ``str()``.
    :param group: group name; converted with ``str()``.
    :returns: a redirect to ``/groups/admin`` in every case, with a flash
        message describing the outcome.
    """
    authgroups = current_user.get_authgroups()
    if ("admin" not in authgroups) and ("admin-%s" % group not in authgroups):
        flash("You do not have the right to do that.", "danger")
        return redirect("/groups/admin")
    try:
        id = str(id)
        group = str(group)
        # Remove the pending entry first, so an approval without a matching
        # application fails before the user is added to the group.
        # NOTE(review): if the second call fails the pending entry is
        # already gone -- confirm whether a rollback is wanted.
        ldaptools.modgroup(id, MOD_DELETE, group+"-pending")
        ldaptools.modgroup(id, MOD_ADD, group)
        flash("Membership of %s approved for %s" % (group, id), "success")
        return redirect("/groups/admin")
    except Exception:
        # ``Exception`` rather than a bare ``except`` so SystemExit and
        # KeyboardInterrupt are not swallowed.
        flash("Membership application not found", "danger")
        return redirect("/groups/admin")
def groupmkadmin(id, group):
    """
    Add ``id`` to the ``admin-<group>`` group.

    Only members of ``admin`` or ``admin-<group>`` may do this.

    :param id: user id; converted with ``str()``.
    :param group: group name; converted with ``str()``.
    :returns: a redirect to ``/groups/admin`` when the caller lacks the
        right, otherwise a redirect to ``/groups/list/<group>``.
    """
    authgroups = current_user.get_authgroups()
    if ("admin" not in authgroups) and ("admin-%s" % group not in authgroups):
        flash("You do not have the right to do that.", "danger")
        return redirect("/groups/admin")
    id = str(id)
    group = str(group)
    try:
        ldaptools.modgroup(id, MOD_ADD, "admin-%s" % group)
        flash("Membership of admin-%s added for %s" % (group, id), "success")
    except Exception:
        # ``Exception`` rather than a bare ``except`` so SystemExit and
        # KeyboardInterrupt are not swallowed.
        flash("That user is already in that group.", "danger")
    return redirect("/groups/list/"+group)
def groupmkping(id, group):
    """
    Add ``id`` to the ``ping-<group>`` group.

    Only members of ``admin`` or ``admin-<group>`` may do this.

    :param id: user id; converted with ``str()``.
    :param group: group name; converted with ``str()``.
    :returns: a redirect to ``/groups/admin`` when the caller lacks the
        right, otherwise a redirect to ``/groups/list/<group>``.
    """
    authgroups = current_user.get_authgroups()
    if ("admin" not in authgroups) and ("admin-%s" % group not in authgroups):
        flash("You do not have the right to do that.", "danger")
        return redirect("/groups/admin")
    id = str(id)
    group = str(group)
    try:
        ldaptools.modgroup(id, MOD_ADD, "ping-%s" % group)
        flash("Membership of ping-%s added for %s" % (group, id), "success")
    except Exception:
        # ``Exception`` rather than a bare ``except`` so SystemExit and
        # KeyboardInterrupt are not swallowed.
        flash("That user is already in that group.", "danger")
    return redirect("/groups/list/"+group)
def add(self):
    """
    Build the modlist that adds the values not yet present.

    Keeps each entry of ``self.values`` for which
    ``self._is_value_absent`` returns a truthy result.

    :returns: ``[(ldap.MOD_ADD, self.name, values_to_add)]`` when at least
        one value was kept, otherwise ``[]``.
    """
    # Materialise a list: on Python 3 ``filter`` returns an iterator, which
    # has no ``len()`` and would not be a usable modlist value.
    values_to_add = [value for value in self.values
                     if self._is_value_absent(value)]
    if values_to_add:
        return [(ldap.MOD_ADD, self.name, values_to_add)]
    return []