def callIntoPAM(service, user, conv):
    """Authenticate ``user`` against the PAM ``service`` (a testing hook).

    A PAM handle is started for ``service`` and given the user name and the
    conversation callback ``conv``.  The effective gid/uid are set to 0
    while ``authenticate()`` and ``acct_mgmt()`` run and are put back to
    their previous values afterwards, whether or not those calls raise.

    Returns 1 when both PAM calls complete; per the original note, failures
    surface as exceptions raised by the PAM calls themselves.
    """
    session = PAM.pam()
    session.start(service)
    for item, value in ((PAM.PAM_USER, user), (PAM.PAM_CONV, conv)):
        session.set_item(item, value)

    saved_egid = os.getegid()
    saved_euid = os.geteuid()
    os.setegid(0)
    os.seteuid(0)
    try:
        session.authenticate()  # raises on failure
        session.acct_mgmt()     # raises on failure
    finally:
        # Group before user, mirroring the order used to acquire them.
        os.setegid(saved_egid)
        os.seteuid(saved_euid)
    return 1
python类setegid()的实例源码
def callIntoPAM(service, user, conv):
    """Authenticate ``user`` against the PAM ``service`` (a testing hook).

    A PAM handle is started for ``service`` and given the user name and the
    conversation callback ``conv``.  The effective gid/uid are set to 0
    while ``authenticate()`` and ``acct_mgmt()`` run and are put back to
    their previous values afterwards, whether or not those calls raise.

    Returns 1 when both PAM calls complete; per the original note, failures
    surface as exceptions raised by the PAM calls themselves.
    """
    session = PAM.pam()
    session.start(service)
    for item, value in ((PAM.PAM_USER, user), (PAM.PAM_CONV, conv)):
        session.set_item(item, value)

    saved_egid = os.getegid()
    saved_euid = os.geteuid()
    os.setegid(0)
    os.seteuid(0)
    try:
        session.authenticate()  # raises on failure
        session.acct_mgmt()     # raises on failure
    finally:
        # Group before user, mirroring the order used to acquire them.
        os.setegid(saved_egid)
        os.seteuid(saved_euid)
    return 1
def main():
    """Daemon entry point.

    Changes into the data directory, redirects stdout and stderr to the log
    file, lowers the effective group and user ids and then hands control to
    ``USERPROG``.
    """
    # Work from the data directory.
    os.chdir("/root/data")

    # Route both output streams through the same append-mode log wrapper.
    log_stream = Log(open(LOGFILE, 'a+'))
    sys.stdout = log_stream
    sys.stderr = log_stream

    # Make sure the daemon runs as a normal user.  The group is changed
    # first, while the process may still do so.
    daemon_gid = 103  # group "pydaemon"
    daemon_uid = 103  # user "pydaemon"
    os.setegid(daemon_gid)
    os.seteuid(daemon_uid)

    # Start the user program.
    USERPROG()
def switchUID(uid, gid, euid=False):
    """Switch the process to the given user and group ids.

    @param uid: user id to switch to, or C{None} to leave the user (and the
        supplementary groups) untouched.
    @param gid: group id to switch to, or C{None} to leave the group
        untouched.
    @param euid: when true, only the effective ids are changed
        (C{seteuid}/C{setegid}); otherwise C{setuid}/C{setgid} are used.
    """
    if euid:
        change_user, change_group = os.seteuid, os.setegid
    else:
        change_user, change_group = os.setuid, os.setgid

    # The group is changed before the user.
    if gid is not None:
        change_group(gid)

    if uid is None:
        return
    initgroups(uid, gid)
    change_user(uid)
def requestAvatarId(self, credentials):
    """Check a username/password pair against the system password databases.

    The C{pwd} database is consulted first.  If it does not produce a
    matching hash, the C{shadow} database is read with the effective
    gid/uid temporarily set to 0.

    @param credentials: object exposing C{username} and C{password}.
    @return: C{defer.succeed(credentials.username)} when a stored hash
        verifies, otherwise C{defer.fail(UnauthorizedLogin())}.
    """
    if pwd:
        try:
            cryptedPass = pwd.getpwnam(credentials.username)[1]
        except KeyError:
            return defer.fail(UnauthorizedLogin())
        # '*' and 'x' entries are skipped; conventionally they mean the
        # real hash is not stored in this database.
        if cryptedPass not in ['*', 'x'] and \
                verifyCryptedPassword(cryptedPass, credentials.password):
            return defer.succeed(credentials.username)
    if shadow:
        savedEgid = os.getegid()
        savedEuid = os.geteuid()
        os.setegid(0)
        os.seteuid(0)
        try:
            shadowPass = shadow.getspnam(credentials.username)[1]
        except KeyError:
            return defer.fail(UnauthorizedLogin())
        finally:
            # Always give the elevated ids back, including when the lookup
            # fails with something other than KeyError; otherwise the
            # process would keep running with effective uid/gid 0.
            os.setegid(savedEgid)
            os.seteuid(savedEuid)
        if verifyCryptedPassword(shadowPass, credentials.password):
            return defer.succeed(credentials.username)
        return defer.fail(UnauthorizedLogin())
    return defer.fail(UnauthorizedLogin())
def checkKey(self, credentials):
    """Check whether the offered public key is authorized for the user.

    Looks for C{authorized_keys2} and C{authorized_keys} in the user's
    C{~/.ssh/} directory.  Each file is probed and read while the process
    runs with the user's effective ids; the ids in force on entry are
    restored before the contents are examined.

    @param credentials: object exposing C{username} and C{blob} (the raw
        key data to compare against the decoded file entries).
    @return: 1 if a line's second field base64-decodes to
        C{credentials.blob}, otherwise 0.
    """
    sshDir = os.path.expanduser('~%s/.ssh/' % credentials.username)
    if sshDir.startswith('~'):  # didn't expand
        return 0
    uid, gid = os.geteuid(), os.getegid()
    ouid, ogid = pwd.getpwnam(credentials.username)[2:4]
    for name in ['authorized_keys2', 'authorized_keys']:
        path = sshDir + name
        # Go through 0 to become the target user for the file access.
        os.setegid(0)
        os.seteuid(0)
        os.setegid(ogid)
        os.seteuid(ouid)
        try:
            if os.path.exists(path):
                # Read everything now so that the file is closed and no
                # I/O is left to do once the ids have been restored.
                # Binary mode keeps the fields as byte strings.
                with open(path, 'rb') as keyFile:
                    lines = keyFile.readlines()
            else:
                lines = []
        finally:
            # Restore the caller's ids on every path: file missing, file
            # unreadable, or read successfully.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(gid)
            os.seteuid(uid)
        for line in lines:
            fields = line.split()
            if len(fields) < 2:
                continue
            try:
                # a2b_base64 is the decoder behind base64.decodestring.
                if binascii.a2b_base64(fields[1]) == credentials.blob:
                    return 1
            except binascii.Error:
                continue
    return 0
def _runAsUser(self, f, *args, **kw):
    """Run one callable, or a series of call specs, with the user's ids.

    C{f} is either a single callable, invoked with C{*args} and C{**kw},
    or an iterable of C{(func[, args[, kw]])} sequences invoked in order.
    The effective uid/gid and the supplementary groups are switched to the
    values from C{self.getUserGroupId()} / C{self.getOtherGroups()} for
    the calls and restored afterwards, even if a call raises.

    @return: the result of the last call made, or C{None} when C{f} is an
        empty iterable.
    """
    euid = os.geteuid()
    egid = os.getegid()
    groups = os.getgroups()
    uid, gid = self.getUserGroupId()
    os.setegid(0)
    os.seteuid(0)
    os.setgroups(self.getOtherGroups())
    os.setegid(gid)
    os.seteuid(uid)
    # Stays None if no call is made, so the final return never touches an
    # unbound name.
    r = None
    try:
        # Normalising ``f`` happens inside the try block so the ids are
        # restored even if iterating over it fails unexpectedly.
        try:
            calls = iter(f)
        except TypeError:
            calls = [(f, args, kw)]
        for spec in calls:
            func = spec[0]
            # Missing or falsy entries fall back to empty arguments.
            callArgs = (spec[1] if len(spec) > 1 else None) or ()
            callKw = (spec[2] if len(spec) > 2 else None) or {}
            r = func(*callArgs, **callKw)
    finally:
        os.setegid(0)
        os.seteuid(0)
        os.setgroups(groups)
        os.setegid(egid)
        os.seteuid(euid)
    return r
def getPtyOwnership(self):
    """Give the avatar's user ownership of the path in C{self.ptyTuple[2]}.

    The path's current group is kept; only the owning uid changes.  The
    chown is performed with the effective gid/uid set to 0, and the
    previous effective ids are restored afterwards.
    """
    ttyPath = self.ptyTuple[2]  # presumably the tty device name
    ttyGid = os.stat(ttyPath).st_gid
    ownerUid, _ = self.avatar.getUserGroupId()
    savedEuid = os.geteuid()
    savedEgid = os.getegid()
    os.setegid(0)
    os.seteuid(0)
    try:
        os.chown(ttyPath, ownerUid, ttyGid)
    finally:
        os.setegid(savedEgid)
        os.seteuid(savedEuid)
def pull(directory):
    """
    Pulls latest changes with the rights of the user that owns the folder.

    The effective gid/uid are switched to the owner of ``directory`` for
    the duration of the pull and reset to 0 afterwards.

    Returns False when the pull reports ERROR or REJECTED, or when the ids
    could not be changed; returns True otherwise.
    """
    try:
        st = os.stat(directory)
        logger.info("Pulling as {0}:{1}...".format(st.st_uid, st.st_gid))
        # Order is important: once seteuid() has moved the effective UID
        # away from 0, setegid() is no longer allowed, so the group is
        # changed first.  Each call gets its matching id from the stat.
        os.setegid(st.st_gid)
        os.seteuid(st.st_uid)
        repo = git.Repo(directory)
        info = repo.remotes.origin.pull()[0]
        if info.flags & info.ERROR:
            logger.error("Pull failed: {0}".format(info.note))
            return False
        elif info.flags & info.REJECTED:
            logger.error("Could not merge after pull: {0}".format(info.note))
            return False
        elif info.flags & info.HEAD_UPTODATE:
            logger.info("Head is already up to date")
    except PermissionError:
        logger.error("Insufficient permissions to set uid/gid")
        return False
    finally:
        logger.info("Restoring root permissions")
        # Reverse of the drop: regain effective UID 0 first so that
        # resetting the group is permitted.
        os.seteuid(0)
        os.setegid(0)
    return True
def run(project, command, directory, slack_webhook_url):
    """
    Run the specified command as the user that owns the directory.

    The command is started on a background thread so the caller can return
    immediately.  If ``slack_webhook_url`` is set, a notification with the
    command output is sent once the command has finished.
    """
    try:
        st = os.stat(directory)
        # Order is important: once seteuid() has moved the effective UID
        # away from 0, setegid() is no longer allowed, so the group is
        # changed first.  Each call gets its matching id from the stat.
        os.setegid(st.st_gid)
        os.seteuid(st.st_uid)
        logger.info("Changing working directory to '{0}'".format(directory))
        logger.info("Spawning background command '{0}' as {1}:{2} for '{3}'...".format(command, st.st_uid, st.st_gid, project))

        def background():
            """
            I don't care how long it takes to run the command, but Bitbucket gets angry when it takes longer
            than 10 seconds. My npm build takes around 15 secs, so I'd get 3 Webhooks from Bitbucket, because
            it thinks each Webhook timed out.
            Easy way out is to return response immediately and start a background thread that
            does all of the heavy lifting.
            """
            start_time = time.time()
            try:
                output = subprocess.check_output(command, shell=True, cwd=directory, stderr=subprocess.STDOUT)
            except subprocess.CalledProcessError as e:
                # The command runs on this thread, so its failure has to
                # be reported here; the caller's handlers never see it.
                logger.error("Error: {0}".format(e.output))
                return
            completed_in = time.time() - start_time
            logger.info("'{0}' background command finished in {1:.2f} seconds".format(project, completed_in))
            if slack_webhook_url:
                slack_notification(slack_webhook_url, "Deployed `{0}` in {1:.2f} seconds! :rocket:".format(project, completed_in), output)

        # NOTE(review): effective ids are process-wide and the finally
        # block below resets them as soon as the thread has been started,
        # so the command may be spawned after root has been restored.
        # Confirm whether the child should switch ids itself instead.
        Thread(target=background).start()
    except PermissionError:
        logger.error("Insufficient permissions to set uid/gid")
    except subprocess.CalledProcessError as e:
        logger.error("Error: {0}".format(e.output))
    finally:
        logger.info("Restoring root permissions")
        # Reverse of the drop: regain effective UID 0 first so that
        # resetting the group is permitted.
        os.seteuid(0)
        os.setegid(0)
def test_setegid(self):
    """Check the error behaviour of os.setegid()."""
    # A non-root process is expected to be refused gid 0.
    if os.getuid() != 0:
        with self.assertRaises(OSError):
            os.setegid(0)
    # A value of 1<<32 must be rejected with OverflowError.
    with self.assertRaises(OverflowError):
        os.setegid(1 << 32)
def setUp(self):
    """Prepare the fixture, then lower the effective group and user ids.

    The test file is removed and the base ``TestProcess`` set-up runs
    before the effective gid and uid are both set to 1000.
    """
    safe_rmpath(TESTFN)
    TestProcess.setUp(self)
    limited_id = 1000
    # Group first, then user.
    os.setegid(limited_id)
    os.seteuid(limited_id)
def tearDown(self):
    """Restore the effective group and user ids, then run the base tear-down.

    The group id goes to ``setegid`` and the user id to ``seteuid``; they
    must not be crossed over, or the process ends up with the wrong
    identity whenever the two values differ.
    """
    os.setegid(self.PROCESS_GID)
    os.seteuid(self.PROCESS_UID)
    TestProcess.tearDown(self)
def test_setegid(self):
    """Check the error behaviour of os.setegid()."""
    # A non-root process is expected to be refused gid 0.
    if os.getuid() != 0:
        with self.assertRaises(OSError):
            os.setegid(0)
    # A value of 1<<32 must be rejected with OverflowError.
    with self.assertRaises(OverflowError):
        os.setegid(1 << 32)
def test_setegid(self):
    """Check the error behaviour of os.setegid()."""
    # A non-root process is expected to be refused gid 0.
    if os.getuid() != 0:
        with self.assertRaises(OSError):
            os.setegid(0)
    # A value of 1<<32 must be rejected with OverflowError.
    with self.assertRaises(OverflowError):
        os.setegid(1 << 32)
def switchUID(uid, gid, euid=False):
    """Switch the process to the given user and group ids.

    @param uid: user id to switch to, or C{None} to leave the user (and the
        supplementary groups) untouched.
    @param gid: group id to switch to, or C{None} to leave the group
        untouched.
    @param euid: when true, only the effective ids are changed
        (C{seteuid}/C{setegid}); otherwise C{setuid}/C{setgid} are used.
    """
    if euid:
        change_user, change_group = os.seteuid, os.setegid
    else:
        change_user, change_group = os.setuid, os.setgid

    # The group is changed before the user.
    if gid is not None:
        change_group(gid)

    if uid is None:
        return
    initgroups(uid, gid)
    change_user(uid)
def requestAvatarId(self, credentials):
    """Check a username/password pair against the system password databases.

    The C{pwd} database is consulted first.  If it does not produce a
    matching hash, the C{shadow} database is read with the effective
    gid/uid temporarily set to 0.

    @param credentials: object exposing C{username} and C{password}.
    @return: C{defer.succeed(credentials.username)} when a stored hash
        verifies, otherwise C{defer.fail(UnauthorizedLogin())}.
    """
    if pwd:
        try:
            cryptedPass = pwd.getpwnam(credentials.username)[1]
        except KeyError:
            return defer.fail(UnauthorizedLogin())
        # '*' and 'x' entries are skipped; conventionally they mean the
        # real hash is not stored in this database.
        if cryptedPass not in ['*', 'x'] and \
                verifyCryptedPassword(cryptedPass, credentials.password):
            return defer.succeed(credentials.username)
    if shadow:
        savedEgid = os.getegid()
        savedEuid = os.geteuid()
        os.setegid(0)
        os.seteuid(0)
        try:
            shadowPass = shadow.getspnam(credentials.username)[1]
        except KeyError:
            return defer.fail(UnauthorizedLogin())
        finally:
            # Always give the elevated ids back, including when the lookup
            # fails with something other than KeyError; otherwise the
            # process would keep running with effective uid/gid 0.
            os.setegid(savedEgid)
            os.seteuid(savedEuid)
        if verifyCryptedPassword(shadowPass, credentials.password):
            return defer.succeed(credentials.username)
        return defer.fail(UnauthorizedLogin())
    return defer.fail(UnauthorizedLogin())
def checkKey(self, credentials):
    """Check whether the offered public key is authorized for the user.

    Looks for C{authorized_keys2} and C{authorized_keys} in the user's
    C{~/.ssh/} directory.  Each file is probed and read while the process
    runs with the user's effective ids; the ids in force on entry are
    restored before the contents are examined.

    @param credentials: object exposing C{username} and C{blob} (the raw
        key data to compare against the decoded file entries).
    @return: 1 if a line's second field base64-decodes to
        C{credentials.blob}, otherwise 0.
    """
    sshDir = os.path.expanduser('~%s/.ssh/' % credentials.username)
    if sshDir.startswith('~'):  # didn't expand
        return 0
    uid, gid = os.geteuid(), os.getegid()
    ouid, ogid = pwd.getpwnam(credentials.username)[2:4]
    for name in ['authorized_keys2', 'authorized_keys']:
        path = sshDir + name
        # Go through 0 to become the target user for the file access.
        os.setegid(0)
        os.seteuid(0)
        os.setegid(ogid)
        os.seteuid(ouid)
        try:
            if os.path.exists(path):
                # Read everything now so that the file is closed and no
                # I/O is left to do once the ids have been restored.
                # Binary mode keeps the fields as byte strings.
                with open(path, 'rb') as keyFile:
                    lines = keyFile.readlines()
            else:
                lines = []
        finally:
            # Restore the caller's ids on every path: file missing, file
            # unreadable, or read successfully.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(gid)
            os.seteuid(uid)
        for line in lines:
            fields = line.split()
            if len(fields) < 2:
                continue
            try:
                # a2b_base64 is the decoder behind base64.decodestring.
                if binascii.a2b_base64(fields[1]) == credentials.blob:
                    return 1
            except binascii.Error:
                continue
    return 0
def _runAsUser(self, f, *args, **kw):
    """Run one callable, or a series of call specs, with the user's ids.

    C{f} is either a single callable, invoked with C{*args} and C{**kw},
    or an iterable of C{(func[, args[, kw]])} sequences invoked in order.
    The effective uid/gid and the supplementary groups are switched to the
    values from C{self.getUserGroupId()} / C{self.getOtherGroups()} for
    the calls and restored afterwards, even if a call raises.

    @return: the result of the last call made, or C{None} when C{f} is an
        empty iterable.
    """
    euid = os.geteuid()
    egid = os.getegid()
    groups = os.getgroups()
    uid, gid = self.getUserGroupId()
    os.setegid(0)
    os.seteuid(0)
    os.setgroups(self.getOtherGroups())
    os.setegid(gid)
    os.seteuid(uid)
    # Stays None if no call is made, so the final return never touches an
    # unbound name.
    r = None
    try:
        # Normalising ``f`` happens inside the try block so the ids are
        # restored even if iterating over it fails unexpectedly.
        try:
            calls = iter(f)
        except TypeError:
            calls = [(f, args, kw)]
        for spec in calls:
            func = spec[0]
            # Missing or falsy entries fall back to empty arguments.
            callArgs = (spec[1] if len(spec) > 1 else None) or ()
            callKw = (spec[2] if len(spec) > 2 else None) or {}
            r = func(*callArgs, **callKw)
    finally:
        os.setegid(0)
        os.seteuid(0)
        os.setgroups(groups)
        os.setegid(egid)
        os.seteuid(euid)
    return r
def getPtyOwnership(self):
    """Give the avatar's user ownership of the path in C{self.ptyTuple[2]}.

    The path's current group is kept; only the owning uid changes.  The
    chown is performed with the effective gid/uid set to 0, and the
    previous effective ids are restored afterwards.
    """
    ttyPath = self.ptyTuple[2]  # presumably the tty device name
    ttyGid = os.stat(ttyPath).st_gid
    ownerUid, _ = self.avatar.getUserGroupId()
    savedEuid = os.geteuid()
    savedEgid = os.getegid()
    os.setegid(0)
    os.seteuid(0)
    try:
        os.chown(ttyPath, ownerUid, ttyGid)
    finally:
        os.setegid(savedEgid)
        os.seteuid(savedEuid)
def test_setegid(self):
    """Check the error behaviour of os.setegid()."""
    # Gid 0 is expected to be refused only for a non-root process when the
    # HAVE_WHEEL_GROUP flag is not set.
    if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
        with self.assertRaises(OSError):
            os.setegid(0)
    # A value of 1<<32 must be rejected with OverflowError.
    with self.assertRaises(OverflowError):
        os.setegid(1 << 32)
def test_setegid(self):
    """Check the error behaviour of os.setegid()."""
    # A non-root process is expected to be refused gid 0.
    if os.getuid() != 0:
        with self.assertRaises(OSError):
            os.setegid(0)
    # A value of 1<<32 must be rejected with OverflowError.
    with self.assertRaises(OverflowError):
        os.setegid(1 << 32)
def test_setegid(self):
    """Check the error behaviour of os.setegid()."""
    # Gid 0 is expected to be refused only for a non-root process when the
    # HAVE_WHEEL_GROUP flag is not set.
    if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
        with self.assertRaises(OSError):
            os.setegid(0)
    # A value of 1<<32 must be rejected with OverflowError.
    with self.assertRaises(OverflowError):
        os.setegid(1 << 32)
def test_setegid(self):
    """Check the error behaviour of os.setegid()."""
    # A non-root process is expected to be refused gid 0.
    if os.getuid() != 0:
        with self.assertRaises(OSError):
            os.setegid(0)
    # A value of 1<<32 must be rejected with OverflowError.
    with self.assertRaises(OverflowError):
        os.setegid(1 << 32)
def impersonate_user(self, username, password):
    """Switch the process effective group/user ids to those of
    ``username``.

    The account is looked up in the password database; an unknown user
    raises AuthorizerError carrying ``self.msg_no_such_user``.  The
    ``password`` argument is not used here.
    """
    try:
        account = pwd.getpwnam(username)
    except KeyError:
        raise AuthorizerError(self.msg_no_such_user)
    # Group first, then user.
    os.setegid(account.pw_gid)
    os.seteuid(account.pw_uid)
def terminate_impersonation(self, username):
    """Switch the effective group/user ids back to PROCESS_GID/PROCESS_UID.

    ``username`` is accepted for interface symmetry and is not used.
    """
    # Same order as when impersonating: group first, then user.
    os.setegid(PROCESS_GID)
    os.seteuid(PROCESS_UID)
def _change_process_user_group(self):
# type: () -> None
if self.user:
self._log("changing process user to {}".format(self.user))
os.seteuid(self.user)
if self.group:
self._log("changing process group to {}".format(self.group))
os.setegid(self.group)
def runAsEffectiveUser(euid, egid, function, *args, **kwargs):
    """
    Call C{function} with the effective UID/GID set to C{euid}/C{egid}.

    Only the seteuid/setegid calls that are actually needed are made: when
    the process already runs with the requested ids the function is called
    directly.  Otherwise the ids are switched (going through UID 0 where
    required), the function is called, and the original ids are restored
    even if the function raises.

    @param euid: effective UID used to call the function.
    @type euid: C{int}
    @param egid: effective GID used to call the function.
    @type egid: C{int}
    @param function: the function run with the specific permission.
    @type function: any callable
    @param *args: arguments passed to C{function}
    @param **kwargs: keyword arguments passed to C{function}
    @return: whatever C{function} returns.
    """
    startUid = os.geteuid()
    startGid = os.getegid()
    if startUid == euid and startGid == egid:
        # Nothing to switch.
        return function(*args, **kwargs)

    # From here on at least one id differs from the requested one.
    switchGroup = startGid != egid
    if startUid != 0:
        os.seteuid(0)
    if switchGroup:
        os.setegid(egid)
    if euid != 0:
        os.seteuid(euid)
    try:
        return function(*args, **kwargs)
    finally:
        # Undo the steps above in reverse.
        if euid != 0:
            os.seteuid(0)
        if switchGroup:
            os.setegid(startGid)
        if startUid != 0:
            os.seteuid(startUid)
def _runAsUser(self, f, *args, **kw):
    """Run one callable, or a series of call specs, with the user's ids.

    C{f} is either a single callable, invoked with C{*args} and C{**kw},
    or an iterable of C{(func[, args[, kw]])} sequences invoked in order.
    The effective uid/gid and the supplementary groups are switched to the
    values from C{self.getUserGroupId()} / C{self.getOtherGroups()} for
    the calls and restored afterwards, even if a call raises.

    @return: the result of the last call made, or C{None} when C{f} is an
        empty iterable.
    """
    euid = os.geteuid()
    egid = os.getegid()
    groups = os.getgroups()
    uid, gid = self.getUserGroupId()
    os.setegid(0)
    os.seteuid(0)
    os.setgroups(self.getOtherGroups())
    os.setegid(gid)
    os.seteuid(uid)
    # Stays None if no call is made, so the final return never touches an
    # unbound name.
    r = None
    try:
        # Normalising ``f`` happens inside the try block so the ids are
        # restored even if iterating over it fails unexpectedly.
        try:
            calls = iter(f)
        except TypeError:
            calls = [(f, args, kw)]
        for spec in calls:
            func = spec[0]
            # Missing or falsy entries fall back to empty arguments.
            callArgs = (spec[1] if len(spec) > 1 else None) or ()
            callKw = (spec[2] if len(spec) > 2 else None) or {}
            r = func(*callArgs, **callKw)
    finally:
        os.setegid(0)
        os.seteuid(0)
        os.setgroups(groups)
        os.setegid(egid)
        os.seteuid(euid)
    return r
def getPtyOwnership(self):
    """Give the avatar's user ownership of the path in C{self.ptyTuple[2]}.

    The path's current group is kept; only the owning uid changes.  The
    chown is performed with the effective gid/uid set to 0, and the
    previous effective ids are restored afterwards.
    """
    ttyPath = self.ptyTuple[2]  # presumably the tty device name
    ttyGid = os.stat(ttyPath).st_gid
    ownerUid, _ = self.avatar.getUserGroupId()
    savedEuid = os.geteuid()
    savedEgid = os.getegid()
    os.setegid(0)
    os.seteuid(0)
    try:
        os.chown(ttyPath, ownerUid, ttyGid)
    finally:
        os.setegid(savedEgid)
        os.seteuid(savedEuid)