def deleteuser(self, uid):
    """Delete the directory entry ``uid=<uid>,<memberdn>``.

    The connection is opened against ``self.config["server"]`` and bound
    with ``self.config["admin"]`` / ``self.config["password"]``.

    :param uid: value of the ``uid`` RDN of the entry to delete.
    :return: ``True`` once the delete has completed.
    :raises ldap.LDAPError: if the bind or the delete is rejected.
    """
    from ldap.dn import escape_dn_chars

    conn = ldap.initialize(self.config["server"])
    try:
        # Synchronous bind: the asynchronous simple_bind() only returns a
        # message id, so a rejected admin bind used to go unnoticed.
        conn.simple_bind_s(self.config["admin"], self.config["password"])
        # Escape DN metacharacters so a uid containing "," or "=" cannot
        # make the operation target a different entry.
        rdn_value = escape_dn_chars("%s" % (uid,))
        dn = "uid=%s,%s" % (rdn_value, self.config["memberdn"])
        conn.delete_s(dn)
    finally:
        # Release the connection even when the bind or delete raised.
        conn.unbind_s()
    return True
python类initialize()的实例源码
def modgroup(self, uid, change, group):
    """Apply one modification to the ``authGroup`` attribute of a member.

    The connection is opened against ``self.config["server"]`` and bound
    with ``self.config["admin"]`` / ``self.config["password"]``.

    :param uid: value of the ``uid`` RDN of the entry to modify.
    :param change: modification operation passed through to ``modify_s``
        as the first element of the mod tuple.
    :param group: value passed through as the ``authGroup`` value.
    :return: ``True`` once the modification has completed.
    :raises ldap.LDAPError: if the bind or the modify is rejected.
    """
    from ldap.dn import escape_dn_chars

    conn = ldap.initialize(self.config["server"])
    try:
        # Synchronous bind: the asynchronous simple_bind() only returns a
        # message id, so a rejected admin bind used to go unnoticed.
        conn.simple_bind_s(self.config["admin"], self.config["password"])
        # Escape DN metacharacters so the uid cannot redirect the change
        # to a different entry.
        rdn_value = escape_dn_chars("%s" % (uid,))
        dn = "uid=%s,%s" % (rdn_value, self.config["memberdn"])
        conn.modify_s(dn, [(change, 'authGroup', group)])
    finally:
        # Release the connection even when the bind or modify raised.
        conn.unbind_s()
    return True
def getuser(self, id):
    """Look up a member by uid and wrap its attributes in ``self.User``.

    :param id: the uid as a string, or an indexable whose first element is
        the uid.
    :return: ``self.User(attrs, domain)`` for the first entry returned by
        the search, or ``None`` when nothing was found.
    :raises ldap.LDAPError: if the bind or the search fails.
    """
    from ldap.filter import escape_filter_chars

    if not isinstance(id, basestring):
        id = id[0]
    conn = ldap.initialize(self.config["server"])
    found = False
    attrs = None
    try:
        # Synchronous bind so a rejected admin bind raises here instead of
        # surfacing later as an unrelated search error.
        conn.simple_bind_s(self.config["admin"], self.config["password"])
        # Escape filter metacharacters ("*", "(", ")", "\\") so the uid
        # cannot widen or rewrite the search filter.
        ldap_filter = "uid=" + escape_filter_chars(id)
        msgid = conn.search(self.config["memberdn"], ldap.SCOPE_SUBTREE,
                            ldap_filter, None)
        if msgid:
            # all=0: fetch only the first result of the pending search.
            _result_type, data = conn.result(msgid, 0)
            if data:
                _dn, attrs = data[0]
                found = True
    finally:
        # Release the connection on every path, including exceptions.
        conn.unbind_s()
    if not found:
        return None
    return self.User(attrs, self.authconfig["auth"]["domain"])
def ldap_connect(self, ip, username, password, port):
    """Attempt a simple bind against ``ldap://<ip>:<port>/``.

    :param ip: host name or address of the LDAP server.
    :param username: bind DN.
    :param password: bind password.
    :param port: TCP port (string or integer).
    :return: ``1`` if the server accepted the bind, ``2`` if the server
        could not be contacted, ``0`` for any other LDAP failure.
    """
    creak = 0
    conn = None
    try:
        # %-formatting accepts an integer port as well as a string one.
        ldappath = 'ldap://%s:%s/' % (ip, port)
        conn = ldap.initialize(ldappath)
        # simple_bind() only returns a message id and never reads the
        # server's answer; the synchronous variant raises when the bind is
        # rejected, so reaching the next line means it was accepted.
        # NOTE(review): an empty password is an unauthenticated bind, which
        # many servers accept -- callers should not treat that as a match.
        conn.simple_bind_s(username, password)
        creak = 1
    except ldap.SERVER_DOWN:
        # Same condition the old "Can't contact LDAP server" text check
        # was looking for.
        creak = 2
    except ldap.LDAPError:
        pass
    finally:
        if conn is not None:
            try:
                conn.unbind_s()
            except ldap.LDAPError:
                pass
    return creak
def initiate_ldap():
    """Contact the LDAP server and return a connected LDAP object.

    Global TLS options are taken from the ``ldap`` section of ``config``.
    The plain ``ldap://`` scheme is tried first and ``ldaps://`` is used as
    a fallback when the server is down; StartTLS is requested on each
    attempt.  When ``login_dn`` is ``'DEFAULT'`` no bind is performed and
    the anonymous session is probed with ``whoami_s()``.

    :return: the LDAP object of the first scheme that could be reached.
    :raises ldap.SERVER_DOWN: if no scheme could reach the server.
    :raises ldap.LDAPError: if StartTLS or the bind fails.
    :raises SystemExit: if the server refuses the anonymous session.
    """
    ldap_schemes = ['ldap://', 'ldaps://']
    ldap.set_option(ldap.OPT_DEBUG_LEVEL, 0)
    ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, config.get('ldap', 'cacertdir'))
    ldap.set_option(ldap.OPT_X_TLS_CERTFILE, config.get('ldap', 'certfile'))
    ldap.set_option(ldap.OPT_X_TLS_KEYFILE, config.get('ldap', 'keyfile'))
    # NOTE(review): OPT_X_TLS_DEMAND looks like a value for
    # OPT_X_TLS_REQUIRE_CERT rather than an option name -- confirm this
    # call is intended; it is kept unchanged.
    ldap.set_option(ldap.OPT_X_TLS_DEMAND, True)
    ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)  # TRY, NEVER, DEMAND
    ldap.set_option(ldap.OPT_X_TLS_NEWCTX, 0)

    last_scheme = ldap_schemes[-1]
    for scheme in ldap_schemes:
        ldap_obj = ldap.initialize(scheme + server_url)
        try:
            ldap_obj.start_tls_s()
        except ldap.OPERATIONS_ERROR as e:
            # e.args[0] is the error dict; indexing the exception itself
            # only works on Python 2.
            details = e.args[0] if e.args else None
            info = details.get('info') if isinstance(details, dict) else None
            if info != 'TLS already started':
                raise
        except ldap.SERVER_DOWN:
            # Compare by value: identity of two equal strings is an
            # implementation detail.
            if scheme != last_scheme:
                continue
            raise

        if login_dn != 'DEFAULT':  # Use anonymous bind if login_dn is set as DEFAULT
            # Synchronous bind so that bad credentials raise here instead
            # of leaving an unread result on the connection.
            ldap_obj.bind_s(login_dn, password, ldap.AUTH_SIMPLE)
        else:
            try:
                ldap_obj.whoami_s()
            except ldap.UNWILLING_TO_PERFORM:
                print('Anonymous binding is disabled by server')
                raise SystemExit
        return ldap_obj
def __init__(
    self, uri,
    trace_level=0, trace_file=None, trace_stack_limit=5, bytes_mode=None
):
    """Set up tracing, create the low-level LDAP handle and pick bytes mode.

    :param uri: LDAP URI handed to ``_ldap.initialize``.
    :param trace_level: stored as ``self._trace_level``.
    :param trace_file: trace output file; ``sys.stdout`` when falsy.
    :param trace_stack_limit: stored as ``self._trace_stack_limit``.
    :param bytes_mode: ``None`` selects the default for the running Python
        major version; a truthy value is rejected when ``PY2`` is false.
    :raises ValueError: if ``bytes_mode`` is truthy and ``PY2`` is false.
    """
    self._trace_level = trace_level
    self._trace_file = trace_file if trace_file else sys.stdout
    self._trace_stack_limit = trace_stack_limit
    self._uri = uri
    self._ldap_object_lock = self._ldap_lock('opcall')
    self._l = ldap.functions._ldap_function_call(
        ldap._ldap_module_lock, _ldap.initialize, uri
    )
    self.timeout = -1
    self.protocol_version = ldap.VERSION3

    # Bytes mode
    # ----------
    # Invalid argument types raise TypeError unless the legacy default
    # below switches hard failure off.
    self.bytes_mode_hardfail = True
    if PY2:
        if bytes_mode is None:
            warnings.warn(
                "Under Python 2, python-ldap uses bytes by default. "
                "This will be removed in Python 3 (no bytes for DN/RDN/field names). "
                "Please call initialize(..., bytes_mode=False) explicitly.",
                BytesWarning,
                stacklevel=2,
            )
            bytes_mode = True
            # Backwards-compatibility mode: do not fail hard.
            self.bytes_mode_hardfail = False
    elif bytes_mode:
        raise ValueError("bytes_mode is *not* supported under Python 3.")
    # On by default on Py2, off on Py3.
    self.bytes_mode = bytes_mode
def whoami_s(self, *args, **kwargs):
    """Run ``SimpleLDAPObject.whoami_s`` through ``self._apply_method_s``.

    All positional and keyword arguments are forwarded unchanged, and the
    result of ``_apply_method_s`` is returned as is.
    """
    wrapped = SimpleLDAPObject.whoami_s
    return self._apply_method_s(wrapped, *args, **kwargs)
# The class called LDAPObject will be used as default for
# ldap.open() and ldap.initialize()
def ldap_auth(self, username, password):
    """Check a username/password pair with a simple bind.

    The bind DN is ``username + self.user_suffix`` and the server is
    ``self.ldap_url``; ``self.cert_path`` is installed as the global CA
    certificate file first.

    :param username: login name, without the suffix.
    :param password: password to verify; a falsy value is rejected without
        contacting the server.
    :return: ``True`` if the bind succeeded, ``False`` for an empty
        password or invalid credentials, ``None`` if the server is down.
    """
    ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, self.cert_path)
    connection = ldap.initialize(self.ldap_url)
    try:
        connection.set_option(ldap.OPT_REFERRALS, 0)
        if not password:
            return False
        connection.simple_bind_s(username + self.user_suffix, password)
    except ldap.INVALID_CREDENTIALS:
        return False
    except ldap.SERVER_DOWN:
        return None
    finally:
        # The connection is only needed for the credential check; release
        # it on every path instead of leaking it.
        try:
            connection.unbind_s()
        except ldap.LDAPError:
            pass
    return True
def ldap_connection(username, password):
    """Verify a user's password with a simple bind.

    The bind DN is ``uid=<username>,<settings.LDAP_BASE_DC>`` on the server
    at ``settings.LDAP_PROVIDER_URL``.

    :param username: value of the user's ``uid`` RDN.
    :param password: password to verify.
    :return: ``1`` if the bind succeeded, ``0`` otherwise (including an
        empty password).
    """
    # A simple bind with an empty password is an unauthenticated bind that
    # many servers accept; never report that as a successful login.
    if not password:
        return 0

    from ldap.dn import escape_dn_chars

    conn = ldap.initialize(settings.LDAP_PROVIDER_URL)
    try:
        # Escape DN metacharacters so the username cannot alter the DN.
        uid_value = escape_dn_chars('{0}'.format(username))
        user_dn = 'uid={0},{1}'.format(uid_value, settings.LDAP_BASE_DC)
        conn.simple_bind_s(user_dn, password)
        return 1
    except Exception:
        # Best effort by design: any failure means "not authenticated".
        # Unlike a bare except, this lets KeyboardInterrupt and SystemExit
        # propagate.
        return 0
    finally:
        try:
            conn.unbind_s()
        except ldap.LDAPError:
            pass
def __init__(self, backend, mode=PLAIN,
             cert=None,
             key=None,
             cacertdir='/etc/ssl/certs',
             ):
    """Create the LDAP session for ``backend`` and configure TLS.

    :param backend: LDAP URI; a URI starting with ``ldaps`` forces LDAPS
        mode regardless of ``mode``.
    :param mode: one of the class's mode constants (``PLAIN`` by default).
    :param cert: path to a client certificate file, or ``None``.
    :param key: path to the client certificate's key file, or ``None``.
    :param cacertdir: CA certificate directory; ``None`` disables
        certificate validation and emits a warning.
    :raises LDAPSessionException: if ``cert`` and ``key`` are both given
        and either file does not exist.
    """
    self.backend = backend
    self._server = None
    self._schema = {}
    self._cert = cert
    self._key = key

    logger.debug("LDAP _session created, id: {}".format(id(self)))

    # Switch to LDAPS mode if the backend URI starts with 'ldaps'.
    if 'ldaps' == backend[:5].lower():
        mode = self.LDAPS

    # Require certificate validation when a CA directory is available.
    if mode in (self.STARTTLS, self.LDAPS) and cacertdir is not None:
        ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, cacertdir)
        ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)

    if cacertdir is None:
        warnings.warn("You are in INSECURE mode", ImportWarning, stacklevel=2)
        ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)

    # Set the client certificate only if both cert and key are provided.
    if cert is not None and key is not None:
        if not os.path.isfile(cert):
            raise LDAPSessionException("Certificate file {} does not exist".format(cert))
        if not os.path.isfile(key):
            # Report the key path here; the message used to show the
            # certificate path, pointing at the wrong file.
            raise LDAPSessionException("Certificate key file {} does not exist".format(key))
        ldap.set_option(ldap.OPT_X_TLS_CERTFILE, cert)
        ldap.set_option(ldap.OPT_X_TLS_KEYFILE, key)

    self._server = ldap.initialize(self.backend, bytes_mode=False)

    # Proceed with STARTTLS on the freshly created connection.
    if mode == self.STARTTLS:
        self._server.start_tls_s()
def __init__(self, uri, base, filter_pattern, scope=SCOPE_SUBTREE,
             tls=False, user="", passwd="", attr=None, attrsonly=False):
    """Store the search settings and open a bound LDAPv3 connection.

    :param uri: LDAP URI to connect to (kept as ``self.ldapuri``).
    :param base: stored as ``self.base``.
    :param filter_pattern: stored as ``self.filter_pattern``.
    :param scope: stored as ``self.scope``.
    :param tls: stored as ``self.tls``; not otherwise used here.
    :param user: DN for the simple bind.
    :param passwd: password for the simple bind.
    :param attr: stored as ``self.attr``.
    :param attrsonly: stored as ``self.attrsonly``.
    """
    UserInfo.__init__(self)

    # Connection-related settings.
    self.ldapuri = uri
    self.tls = tls

    # Search-related settings.
    self.base, self.filter_pattern, self.scope = base, filter_pattern, scope
    self.attr, self.attrsonly = attr, attrsonly

    # Connect and bind with the supplied credentials.
    self.ld = connection = ldap.initialize(uri)
    connection.protocol_version = ldap.VERSION3
    connection.simple_bind_s(user, passwd)
def __init__(self, srv, ldapsrv, return_to,
             dn_pattern, mako_template, template_lookup):
    """
    Initialise the username/password base class and the LDAP handle.

    :param srv: The server instance
    :param ldapsrv: Which LDAP server to use
    :param return_to: Where to send the user after authentication
    :param dn_pattern: Stored as ``self.dn_pattern``
    :param mako_template: Passed on to ``UsernamePasswordMako.__init__``
    :param template_lookup: Passed on to ``UsernamePasswordMako.__init__``
    :return:
    """
    UsernamePasswordMako.__init__(
        self, srv, mako_template, template_lookup, None, return_to
    )
    # LDAPv3 handle with the referrals option set to 0.
    self.ldap = handle = ldap.initialize(ldapsrv)
    handle.protocol_version = 3
    handle.set_option(ldap.OPT_REFERRALS, 0)
    self.dn_pattern = dn_pattern
def initialize_connection():
    """Yield a configured LDAPv3 connection and unbind it afterwards.

    The connection targets ``config.LDAP_SERVER_URI``, receives every
    option in ``config.LDAP_GLOBAL_OPTIONS`` and starts TLS when
    ``config.LDAP_START_TLS`` is set.

    :return: a generator that yields the connection exactly once.
    """
    connection = ldap.initialize(config.LDAP_SERVER_URI)
    try:
        connection.protocol_version = ldap.VERSION3
        for key, value in config.LDAP_GLOBAL_OPTIONS.items():
            connection.set_option(key, value)
        if config.LDAP_START_TLS:
            connection.start_tls_s()
        yield connection
    finally:
        # Runs when the consumer raises at the yield point or the
        # generator is closed early, so the connection is always released.
        connection.unbind_s()
pyAuthenticationByLDAP.py 文件源码
项目:LinuxBashShellScriptForOps
作者: DingGuodong
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def __init__(self, username, password,
             ldap_host="192.168.78.8", ldap_port="389", ldaps_port="636",
             ldap_enable_ldaps=False,
             ldap_base_dn="DC=example,DC=com,DC=cn"):
    """Authenticate a user with a simple bind and fetch the user's entry.

    On success ``self.is_active`` is ``True`` and ``self.user_data`` holds
    the result of a subtree search for ``userPrincipalName=<username>``.
    The connection is closed before returning.

    :param username: bind identity, also used as ``userPrincipalName``.
    :param password: bind password; must not be empty.
    :param ldap_host: LDAP server host.
    :param ldap_port: port used for ``ldap://``.
    :param ldaps_port: port used for ``ldaps://``.
    :param ldap_enable_ldaps: use ``ldaps://`` instead of ``ldap://``.
    :param ldap_base_dn: search base (default corresponds to
        example.com.cn).
    :raises Exception: ``"Invalid credentials"`` or
        ``"Can't contact LDAP server"``.
    """
    from ldap.filter import escape_filter_chars

    self.ldap_base_dn = ldap_base_dn
    self.ldap_user = username
    self.ldap_password = password
    if ldap_enable_ldaps:
        self.uri = "ldaps://%s:%s" % (ldap_host, ldaps_port)
    else:
        self.uri = "ldap://%s:%s" % (ldap_host, ldap_port)
    self.is_active = False
    self.user_data = None

    # A simple bind with an empty password is an unauthenticated bind that
    # many servers accept; it must never count as a successful login.
    if not password:
        raise Exception("Invalid credentials")

    self.conn = ldap.initialize(self.uri)
    try:
        try:
            self.conn.set_option(ldap.OPT_REFERRALS, 0)  # this option is required in Windows Server 2012
            self.conn.simple_bind_s(who=self.ldap_user, cred=self.ldap_password)
        except ldap.INVALID_CREDENTIALS:
            raise Exception("Invalid credentials")
        except ldap.SERVER_DOWN:
            raise Exception("Can't contact LDAP server")
        self.is_active = True
        # Escape filter metacharacters so the user name cannot rewrite the
        # search filter.
        self.user_data = self.conn.search_s(
            self.ldap_base_dn, ldap.SCOPE_SUBTREE,
            'userPrincipalName=' + escape_filter_chars(self.ldap_user))
    finally:
        # Release the connection on failure as well as on success.
        try:
            self.conn.unbind_s()
        except ldap.LDAPError:
            pass
def __init__(self):
    """Open the LDAP connection and bind with the configured account.

    The URL, user and password are read from ``CONF.ldap_dns_url``,
    ``CONF.ldap_dns_user`` and ``CONF.ldap_dns_password``.

    :raises ImportError: if the ``ldap`` module object is falsy.
    """
    if not ldap:
        raise ImportError(_('ldap not installed'))

    self.lobj = handle = ldap.initialize(CONF.ldap_dns_url)
    bind_user = CONF.ldap_dns_user
    bind_password = CONF.ldap_dns_password
    handle.simple_bind_s(bind_user, bind_password)
def initializeConnection(self):
    """Return an LDAP object for ``ldap://<self.dc_ip>``.

    When ``self.dc_ip`` is falsy it is first filled in from
    ``self.getDC_IP(self.domain)``.  The referrals option is set to 0 on
    the returned object.
    """
    if not self.dc_ip:
        resolved_ip = self.getDC_IP(self.domain)
        self.dc_ip = resolved_ip

    url = 'ldap://{}'.format(self.dc_ip)
    handle = ldap.initialize(url)
    handle.set_option(ldap.OPT_REFERRALS, 0)
    return handle
def connect(self,
            uri=rbconfig.LDAP_URI,
            dn=rbconfig.LDAP_ROOT_DN,
            password=None,
            dcu_uri=rbconfig.LDAP_DCU_URI,
            dcu_dn=rbconfig.LDAP_DCU_RBDN,
            dcu_pw=None):
    """Connect to databases.

    Custom URI, DN and password may be given for RedBrick LDAP.
    Password if not given will be read from shared secret file set
    in rbconfig.

    Custom URI, DN and password may be given for DCU LDAP; the password,
    if not given, is read from the file named by rbconfig.LDAP_DCU_RBPW.

    Raises RBFatalError if a required password file cannot be read.
    """
    if not password:
        try:
            # "with" closes the file even when readline() fails.
            with open(rbconfig.LDAP_ROOTPW_FILE, 'r') as pw_file:
                password = pw_file.readline().rstrip()
        except IOError:
            raise RBFatalError("Unable to open LDAP root password file")

    if not dcu_pw:
        try:
            with open(rbconfig.LDAP_DCU_RBPW, 'r') as pw_file:
                dcu_pw = pw_file.readline().rstrip()
        except IOError:
            raise RBFatalError("Unable to open DCU AD root password file")

    # Default protocol seems to be 2, set to 3.
    ldap.set_option(ldap.OPT_PROTOCOL_VERSION, 3)

    # Connect to RedBrick LDAP.
    self.ldap = ldap.initialize(uri)
    self.ldap.simple_bind_s(dn, password)

    # Connect to DCU LDAP, binding as dcu_dn (not an anonymous bind).
    self.ldap_dcu = ldap.initialize(dcu_uri)
    self.ldap_dcu.simple_bind_s(dcu_dn, dcu_pw)