def callIntoPAM(service, user, conv):
    """Run a PAM authentication for C{user} with effective root credentials.

    A testing hook.

    @param service: the PAM service name handed to C{pam.start}.
    @param user: the value stored as C{PAM.PAM_USER}.
    @param conv: the conversation object stored as C{PAM.PAM_CONV}.
    @return: C{1} once both C{authenticate()} and C{acct_mgmt()} have
        completed; per the original inline note, those calls raise on failure.
    """
    session = PAM.pam()
    session.start(service)
    session.set_item(PAM.PAM_USER, user)
    session.set_item(PAM.PAM_CONV, conv)
    # Remember the current effective ids so they can be put back afterwards.
    savedGid = os.getegid()
    savedUid = os.geteuid()
    os.setegid(0)
    os.seteuid(0)
    try:
        # these will raise
        session.authenticate()
        session.acct_mgmt()
        return 1
    finally:
        # Runs on success and on failure alike: group first, then user.
        os.setegid(savedGid)
        os.seteuid(savedUid)
# Python seteuid() example source code
def create_user(no_utf8=False):
    """Create the PostgreSQL role and database while running as ``postgres``.

    The effective uid is switched to the ``postgres`` system account, ``psql``
    is invoked to create ``DB_USER`` (with an md5 hash built from
    ``DB_PASSWORD`` followed by ``DB_USER``) and ``DB_NAME``, and the original
    effective uid is then restored.

    :param no_utf8: when true, create the database without the explicit
        ``ENCODING 'UTF8'`` clause.
    :returns: the ``pwd`` entry of the ``postgres`` account.
    :raises SystemExit: if switching users or any ``psql`` call fails; the
        message is the text of the underlying exception.
    :raises KeyError: from ``pwd.getpwnam`` if there is no ``postgres``
        account (this happens before any credential change).
    """
    curr_user = os.geteuid()
    target = pwd.getpwnam('postgres')
    try:
        os.seteuid(target.pw_uid)
        try:
            m = hashlib.md5()
            m.update(DB_PASSWORD)
            m.update(DB_USER)
            command = """CREATE USER %s PASSWORD 'md5%s'""" % (DB_USER, m.hexdigest())
            subprocess.check_call(['psql', '-c', command])
            if no_utf8:
                command = """CREATE DATABASE %s OWNER %s""" % (DB_NAME, DB_USER)
            else:
                command = """CREATE DATABASE %s OWNER %s ENCODING 'UTF8'""" % (DB_NAME, DB_USER)
            subprocess.check_call(['psql', '-c', command])
        finally:
            # Switch back even when psql fails: SystemExit can be caught by a
            # caller, which must not keep running as the postgres user.
            os.seteuid(curr_user)
    except Exception as e:
        raise SystemExit(str(e))
    return target
def test_interrupted_systemcall(self):
    '''
    Check that an interrupted system call does not break the client.

    The connection thread can receive signals we do not control, so a
    request issued after such an interruption must still succeed.
    '''
    on_linux = 'linux' in platform
    if not on_linux:
        raise SkipTest('Unable to reproduce error case on'
                       ' non-linux platforms')

    node_path = 'interrupt_test'
    payload = b"1"
    self.client.create(node_path, payload)

    # Re-apply the effective uid the process already has. Per the original
    # author's note, glibc sends SIGRT to all children when this happens,
    # which interrupts the system call.
    current_euid = os.geteuid()
    os.seteuid(current_euid)

    # Basic sanity check: the node written above can still be read back.
    stored = self.client.get(node_path)[0]
    assert stored == payload
def callIntoPAM(service, user, conv):
    """Authenticate C{user} through PAM while temporarily running as root.

    A testing hook.

    @param service: name of the PAM service passed to C{pam.start}.
    @param user: stored on the PAM handle as C{PAM.PAM_USER}.
    @param conv: stored on the PAM handle as C{PAM.PAM_CONV}.
    @return: C{1} when C{authenticate()} and C{acct_mgmt()} both return; the
        original inline note says these calls raise on failure.
    """
    handle = PAM.pam()
    handle.start(service)
    for item, value in ((PAM.PAM_USER, user), (PAM.PAM_CONV, conv)):
        handle.set_item(item, value)
    # Capture the effective ids before elevating (group first, then user).
    previousGid = os.getegid()
    previousUid = os.geteuid()
    os.setegid(0)
    os.seteuid(0)
    try:
        handle.authenticate()  # these will raise
        handle.acct_mgmt()
        return 1
    finally:
        # Always drop back to the ids captured above.
        os.setegid(previousGid)
        os.seteuid(previousUid)
def test_getPrivateKeysAsRoot(self):
    """
    L{OpenSSHFactory.getPrivateKeys} should switch to root if the keys
    aren't readable by the current user.
    """
    unreadableKey = self.keysDir.child("ssh_host_two_key")
    # Fake a permission error by clearing every mode bit; the cleanup puts
    # a usable mode back whatever the outcome of the test.
    unreadableKey.chmod(0o000)
    self.addCleanup(unreadableKey.chmod, 0o777)

    realSeteuid = os.seteuid

    def chmodThenSeteuid(euid):
        # Restore the right mode at the moment seteuid is called, then
        # delegate to the seteuid that was installed before patching.
        unreadableKey.chmod(0o777)
        return realSeteuid(euid)

    self.patch(os, "seteuid", chmodThenSeteuid)

    privateKeys = self.factory.getPrivateKeys()
    self.assertEqual(len(privateKeys), 2)
    self.assertEqual(set(privateKeys.keys()), {b'ssh-rsa', b'ssh-dss'})
    # The factory is expected to go to id 0 and then come back.
    self.assertEqual(self.mockos.seteuidCalls, [0, os.geteuid()])
    self.assertEqual(self.mockos.setegidCalls, [0, os.getegid()])
def test_interrupted_systemcall(self):
    '''
    Ensure the client keeps working after a system call is interrupted.

    We cannot control which signals the connection thread receives, so an
    interruption must not break later requests.
    '''
    if not ('linux' in platform):
        raise SkipTest('Unable to reproduce error case on'
                       ' non-linux platforms')

    znode, expected = 'interrupt_test', b"1"
    self.client.create(znode, expected)

    # Set the euid to the value it already has. According to the original
    # note, glibc sends SIGRT to all children, which interrupts the system
    # call.
    same_euid = os.geteuid()
    os.seteuid(same_euid)

    # Basic sanity test that the stored value is still readable.
    fetched = self.client.get(znode)
    assert fetched[0] == expected
def main():
    """Daemon entry point: set up the environment, drop privileges, run USERPROG.

    Changes into ``/root/data``, sends both ``sys.stdout`` and ``sys.stderr``
    to one ``Log`` wrapper around ``LOGFILE`` (opened in append mode), sets
    the effective group and user ids to 103 and finally calls ``USERPROG``.
    """
    # Change to the data directory if needed.
    os.chdir("/root/data")

    # Redirect both output streams to the same log file wrapper.
    log_stream = Log(open(LOGFILE, 'a+'))
    sys.stdout = log_stream
    sys.stderr = log_stream

    # Ensure that the daemon runs as a normal user ("pydaemon"). The group is
    # set first, as the original comment requires.
    daemon_gid = 103
    daemon_uid = 103
    os.setegid(daemon_gid)
    os.seteuid(daemon_uid)

    # Start the user program here.
    USERPROG()
def switchUID(uid, gid, euid=False):
    """Change the process's user and group ids.

    @param uid: the user id to switch to, or C{None} to leave the user (and
        the supplementary groups) untouched.
    @param gid: the group id to switch to, or C{None} to leave the group
        untouched.
    @param euid: when true, only the effective ids are changed
        (C{os.seteuid} / C{os.setegid}); otherwise C{os.setuid} /
        C{os.setgid} are used.
    """
    if euid:
        changeUser, changeGroup = os.seteuid, os.setegid
    else:
        changeUser, changeGroup = os.setuid, os.setgid

    # The group is changed before the user.
    if gid is not None:
        changeGroup(gid)

    if uid is None:
        return
    initgroups(uid, gid)
    changeUser(uid)
def requestAvatarId(self, credentials):
    """Check a username/password pair against the system password databases.

    The C{pwd} entry is tried first. If it does not yield a match and the
    C{shadow} module is available, the shadow entry is read with effective
    root credentials and tried next.

    @param credentials: object providing C{username} and C{password}.
    @return: C{defer.succeed(credentials.username)} when a stored hash
        verifies against the password, otherwise
        C{defer.fail(UnauthorizedLogin())}.
    """
    if pwd:
        try:
            cryptedPass = pwd.getpwnam(credentials.username)[1]
        except KeyError:
            return defer.fail(UnauthorizedLogin())
        else:
            # '*' and 'x' are skipped rather than verified -- presumably
            # placeholders meaning the real hash is stored elsewhere.
            if cryptedPass not in ['*', 'x'] and \
                    verifyCryptedPassword(cryptedPass, credentials.password):
                return defer.succeed(credentials.username)
    if shadow:
        gid = os.getegid()
        uid = os.geteuid()
        os.setegid(0)
        os.seteuid(0)
        try:
            try:
                shadowPass = shadow.getspnam(credentials.username)[1]
            finally:
                # Drop root again whatever getspnam did, including raising
                # something other than KeyError.
                os.setegid(gid)
                os.seteuid(uid)
        except KeyError:
            return defer.fail(UnauthorizedLogin())
        if verifyCryptedPassword(shadowPass, credentials.password):
            return defer.succeed(credentials.username)
        return defer.fail(UnauthorizedLogin())
    return defer.fail(UnauthorizedLogin())
def checkKey(self, credentials):
    """Tell whether C{credentials.blob} is one of the user's authorized keys.

    C{authorized_keys2} and C{authorized_keys} in the user's C{~/.ssh/}
    directory are each probed and read with the user's own effective ids;
    the caller's ids are restored before the contents are compared.

    @param credentials: object providing C{username} and C{blob} (the
        decoded public key to look for).
    @return: C{1} if a line's second whitespace-separated field
        base64-decodes to C{credentials.blob}, otherwise C{0}.
    """
    sshDir = os.path.expanduser('~%s/.ssh/' % credentials.username)
    if sshDir.startswith('~'):  # didn't expand
        return 0
    uid, gid = os.geteuid(), os.getegid()
    ouid, ogid = pwd.getpwnam(credentials.username)[2:4]
    for name in ['authorized_keys2', 'authorized_keys']:
        # Go through root so that switching to the target user is permitted.
        os.setegid(0)
        os.seteuid(0)
        try:
            os.setegid(ogid)
            os.seteuid(ouid)
            if not os.path.exists(sshDir + name):
                continue
            # Read everything while we still hold the user's identity, and
            # close the file straight away.
            with open(sshDir + name) as keyFile:
                lines = keyFile.readlines()
        finally:
            # Runs on the 'continue' above and on errors too, so the process
            # never stays switched to the target user.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(gid)
            os.seteuid(uid)
        for l in lines:
            l2 = l.split()
            if len(l2) < 2:
                continue
            try:
                # a2b_base64 reports bad input as binascii.Error.
                if binascii.a2b_base64(l2[1]) == credentials.blob:
                    return 1
            except binascii.Error:
                continue
    return 0
def _runAsUser(self, f, *args, **kw):
    """Call one or more functions with this object's user and group ids.

    @param f: either a single callable, invoked as C{f(*args, **kw)}, or an
        iterable of C{(func,)}, C{(func, args)} or C{(func, args, kw)}
        tuples that are invoked in order.
    @return: the result of the last call, or C{None} if C{f} is an empty
        iterable.
    """
    euid = os.geteuid()
    egid = os.getegid()
    groups = os.getgroups()
    uid, gid = self.getUserGroupId()
    # Defined up front so an empty iterable returns None instead of raising
    # UnboundLocalError.
    r = None
    os.setegid(0)
    os.seteuid(0)
    try:
        # From here on the process is root, so every later step sits inside
        # the try block and the original ids are always put back.
        os.setgroups(self.getOtherGroups())
        os.setegid(gid)
        os.seteuid(uid)
        try:
            f = iter(f)
        except TypeError:
            f = [(f, args, kw)]
        for i in f:
            func = i[0]
            # A missing or falsy entry falls back to "no arguments".
            args = i[1] if len(i) > 1 and i[1] else ()
            kw = i[2] if len(i) > 2 and i[2] else {}
            r = func(*args, **kw)
    finally:
        # Back through root, which is needed to reset the group list.
        os.setegid(0)
        os.seteuid(0)
        os.setgroups(groups)
        os.setegid(egid)
        os.seteuid(euid)
    return r
def getPtyOwnership(self):
    """Give the avatar's user ownership of the path in C{self.ptyTuple[2]}.

    The path keeps its current group (field 5 of the C{os.stat} result); only
    the owning user changes. The C{chown} is done with effective root
    credentials, which are dropped again afterwards.
    """
    ptyPath = self.ptyTuple[2]
    currentGroup = os.stat(ptyPath)[5]
    ownerUid, _ownerGid = self.avatar.getUserGroupId()
    savedEuid = os.geteuid()
    savedEgid = os.getegid()
    os.setegid(0)
    os.seteuid(0)
    try:
        os.chown(ptyPath, ownerUid, currentGroup)
    finally:
        # Restore the caller's effective ids even if chown failed.
        os.setegid(savedEgid)
        os.seteuid(savedEuid)
def pull(directory):
    """
    Pulls latest changes with the user rights that owns the folder

    The effective gid/uid are switched to the owner of ``directory`` for the
    duration of the pull and reset to root afterwards.

    :param directory: path of the git working copy to update.
    :returns: ``True`` when the pull completed without the ERROR or REJECTED
        flag, ``False`` when one of those flags is set or the ids could not
        be changed.
    """
    try:
        st = os.stat(directory)
        logger.info("Pulling as {0}:{1}...".format(st.st_uid, st.st_gid))
        # order is important: after seteuid() the effective UID isn't 0
        # anymore, so setegid() would no longer be allowed.
        # The group id goes to setegid() and the user id to seteuid().
        os.setegid(st.st_gid)
        os.seteuid(st.st_uid)
        repo = git.Repo(directory)
        info = repo.remotes.origin.pull()[0]
        if info.flags & info.ERROR:
            logger.error("Pull failed: {0}".format(info.note))
            return False
        elif info.flags & info.REJECTED:
            logger.error("Could not merge after pull: {0}".format(info.note))
            return False
        elif info.flags & info.HEAD_UPTODATE:
            logger.info("Head is already up to date")
    except PermissionError:
        logger.error("Insufficient permissions to set uid/gid")
        return False
    finally:
        logger.info("Restoring root permissions")
        # Regain the root uid first; as root, resetting the gid is then
        # always permitted.
        os.seteuid(0)
        os.setegid(0)
    return True
def run(project, command, directory, slack_webhook_url):
    """
    Run the specified command as the user that owns the directory

    The command is started on a background thread and this function returns
    without waiting for it.

    :param project: project name, used in log and Slack messages.
    :param command: shell command line to execute.
    :param directory: working directory for the command; its owner supplies
        the uid/gid that are switched to.
    :param slack_webhook_url: if truthy, a notification containing the
        command output is sent there on success.
    """
    try:
        st = os.stat(directory)
        # order is important: after seteuid() the effective UID isn't 0
        # anymore, so setegid() would no longer be allowed.
        # The group id goes to setegid() and the user id to seteuid().
        os.setegid(st.st_gid)
        os.seteuid(st.st_uid)
        logger.info("Changing working directory to '{0}'".format(directory))
        logger.info("Spawning background command '{0}' as {1}:{2} for '{3}'...".format(command, st.st_uid, st.st_gid, project))

        def background():
            """
            I don't care how long it takes to run the command, but Bitbucket gets angry when it takes longer
            than 10 seconds. My npm build takes around 15 secs, so I'd get 3 Webhooks from Bitbucket, because
            it thinks each Webhook timedout.
            Easy way out is to return response immediately and start a background thread that
            does all of the heavy lifting.
            """
            start_time = time.time()
            try:
                output = subprocess.check_output(command, shell=True, cwd=directory, stderr=subprocess.STDOUT)
            except subprocess.CalledProcessError as e:
                # Handled here because the command runs on this thread; the
                # spawning function cannot observe its failure.
                logger.error("Error: {0}".format(e.output))
                return
            completed_in = time.time() - start_time
            logger.info("'{0}' background command finished in {1:.2f} seconds".format(project, completed_in))
            if slack_webhook_url:
                slack_notification(slack_webhook_url, "Deployed `{0}` in {1:.2f} seconds! :rocket:".format(project, completed_in), output)

        # NOTE(review): effective ids are process-wide and the finally block
        # below resets them as soon as the thread has been started, so the
        # command may actually be spawned with root ids. Dropping privileges
        # in the child process itself would close that race -- confirm the
        # intended behaviour.
        Thread(target=background).start()
    except PermissionError:
        logger.error("Insufficient permissions to set uid/gid")
    finally:
        logger.info("Restoring root permissions")
        # Regain the root uid first; as root, resetting the gid is then
        # always permitted.
        os.seteuid(0)
        os.setegid(0)
def test_seteuid(self):
if os.getuid() != 0:
self.assertRaises(os.error, os.seteuid, 0)
self.assertRaises(OverflowError, os.seteuid, 1<<32)
def setUp(self):
    """Remove TESTFN, run the base setUp, then switch to effective ids 1000."""
    safe_rmpath(TESTFN)
    TestProcess.setUp(self)
    # Same numeric id for group and user; the group is changed first.
    limited_id = 1000
    os.setegid(limited_id)
    os.seteuid(limited_id)
def tearDown(self):
    """Restore the process's effective uid and gid, then run the base tearDown."""
    # The uid belongs to seteuid() and the gid to setegid(). The user id is
    # restored first so that the group change is done with the original
    # user's privileges.
    os.seteuid(self.PROCESS_UID)
    os.setegid(self.PROCESS_GID)
    TestProcess.tearDown(self)
def test_seteuid(self):
if os.getuid() != 0:
self.assertRaises(os.error, os.seteuid, 0)
self.assertRaises(OverflowError, os.seteuid, 1<<32)
def test_seteuid(self):
if os.getuid() != 0:
self.assertRaises(os.error, os.seteuid, 0)
self.assertRaises(OverflowError, os.seteuid, 1<<32)
def switchUID(uid, gid, euid=False):
    """Switch the process to another user and/or group.

    @param uid: target user id; C{None} skips both the C{initgroups} call
        and the user change.
    @param gid: target group id; C{None} skips the group change.
    @param euid: if true, use the effective-id setters (C{os.seteuid},
        C{os.setegid}) instead of C{os.setuid} and C{os.setgid}.
    """
    if euid:
        userSetter = os.seteuid
        groupSetter = os.setegid
    else:
        userSetter = os.setuid
        groupSetter = os.setgid

    wantGroup = gid is not None
    wantUser = uid is not None
    # Group first, then supplementary groups, then the user itself.
    if wantGroup:
        groupSetter(gid)
    if wantUser:
        initgroups(uid, gid)
        userSetter(uid)
def requestAvatarId(self, credentials):
    """Verify a username/password pair against C{pwd} and then C{shadow}.

    The password-database entry is checked first. When that gives no match
    and the C{shadow} module is available, the shadow entry is fetched with
    effective root credentials and checked as well.

    @param credentials: object providing C{username} and C{password}.
    @return: C{defer.succeed(credentials.username)} if a stored hash
        verifies against the password, otherwise
        C{defer.fail(UnauthorizedLogin())}.
    """
    if pwd:
        try:
            cryptedPass = pwd.getpwnam(credentials.username)[1]
        except KeyError:
            return defer.fail(UnauthorizedLogin())
        else:
            # '*' and 'x' are never verified -- presumably markers that the
            # real hash lives in another database.
            if cryptedPass not in ['*', 'x'] and \
                    verifyCryptedPassword(cryptedPass, credentials.password):
                return defer.succeed(credentials.username)
    if shadow:
        gid = os.getegid()
        uid = os.geteuid()
        os.setegid(0)
        os.seteuid(0)
        try:
            try:
                shadowPass = shadow.getspnam(credentials.username)[1]
            finally:
                # Give up root on every path, not only on KeyError.
                os.setegid(gid)
                os.seteuid(uid)
        except KeyError:
            return defer.fail(UnauthorizedLogin())
        if verifyCryptedPassword(shadowPass, credentials.password):
            return defer.succeed(credentials.username)
        return defer.fail(UnauthorizedLogin())
    return defer.fail(UnauthorizedLogin())
def checkKey(self, credentials):
    """Look for C{credentials.blob} in the user's authorized-keys files.

    C{authorized_keys2} and then C{authorized_keys} under the user's
    C{~/.ssh/} are each checked for existence and read while running with
    that user's effective ids; the ids in force on entry are restored before
    any key is compared.

    @param credentials: object providing C{username} and C{blob} (the
        decoded public key to match).
    @return: C{1} when the second whitespace-separated field of some line
        base64-decodes to C{credentials.blob}; C{0} otherwise, including when
        the home directory cannot be expanded.
    """
    sshDir = os.path.expanduser('~%s/.ssh/' % credentials.username)
    if sshDir.startswith('~'):  # didn't expand
        return 0
    uid, gid = os.geteuid(), os.getegid()
    ouid, ogid = pwd.getpwnam(credentials.username)[2:4]
    for name in ['authorized_keys2', 'authorized_keys']:
        # Become root first; only then may we switch to the target user.
        os.setegid(0)
        os.seteuid(0)
        try:
            os.setegid(ogid)
            os.seteuid(ouid)
            if not os.path.exists(sshDir + name):
                continue
            # Slurp the file under the user's identity and close it at once.
            with open(sshDir + name) as keyFile:
                lines = keyFile.readlines()
        finally:
            # Also reached via 'continue' and on errors: the caller's ids are
            # restored before moving on.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(gid)
            os.seteuid(uid)
        for l in lines:
            l2 = l.split()
            if len(l2) < 2:
                continue
            try:
                # Malformed base64 is reported as binascii.Error.
                if binascii.a2b_base64(l2[1]) == credentials.blob:
                    return 1
            except binascii.Error:
                continue
    return 0
def _runAsUser(self, f, *args, **kw):
    """Run callables under the ids returned by C{self.getUserGroupId()}.

    @param f: a single callable, invoked as C{f(*args, **kw)}, or an iterable
        of C{(func,)}, C{(func, args)} or C{(func, args, kw)} tuples that
        are invoked one after another.
    @return: whatever the last call returned, or C{None} when C{f} is an
        empty iterable.
    """
    euid = os.geteuid()
    egid = os.getegid()
    groups = os.getgroups()
    uid, gid = self.getUserGroupId()
    # Pre-set so that an empty iterable yields None rather than an
    # UnboundLocalError at the return statement.
    r = None
    os.setegid(0)
    os.seteuid(0)
    try:
        # The process is root from this point, so the remaining setup is
        # protected by the finally clause too.
        os.setgroups(self.getOtherGroups())
        os.setegid(gid)
        os.seteuid(uid)
        try:
            f = iter(f)
        except TypeError:
            f = [(f, args, kw)]
        for i in f:
            func = i[0]
            # Absent or falsy positions mean "no arguments".
            args = i[1] if len(i) > 1 and i[1] else ()
            kw = i[2] if len(i) > 2 and i[2] else {}
            r = func(*args, **kw)
    finally:
        # Return via root so the supplementary groups can be reset.
        os.setegid(0)
        os.seteuid(0)
        os.setgroups(groups)
        os.setegid(egid)
        os.seteuid(euid)
    return r
def getPtyOwnership(self):
    """Chown the path at C{self.ptyTuple[2]} to the avatar's user id.

    The group already recorded on the path (field 5 of its C{os.stat}
    result) is passed back unchanged. Effective root credentials are held
    only for the C{chown} call.
    """
    target = self.ptyTuple[2]
    existingGid = os.stat(target)[5]
    avatarUid, _avatarGid = self.avatar.getUserGroupId()
    callerEuid, callerEgid = os.geteuid(), os.getegid()
    os.setegid(0)
    os.seteuid(0)
    try:
        os.chown(target, avatarUid, existingGid)
    finally:
        # Put the caller's effective ids back, success or not.
        os.setegid(callerEgid)
        os.seteuid(callerEuid)
def test_seteuid(self):
if os.getuid() != 0:
self.assertRaises(os.error, os.seteuid, 0)
self.assertRaises(OverflowError, os.seteuid, 1<<32)
def test_seteuid(self):
if os.getuid() != 0:
self.assertRaises(os.error, os.seteuid, 0)
self.assertRaises(OverflowError, os.seteuid, 1<<32)
def test_seteuid(self):
if os.getuid() != 0:
self.assertRaises(OSError, os.seteuid, 0)
self.assertRaises(OverflowError, os.seteuid, 1<<32)
def test_seteuid(self):
if os.getuid() != 0:
self.assertRaises(os.error, os.seteuid, 0)
self.assertRaises(OverflowError, os.seteuid, 1<<32)
def _try_run_as_postgres(self):
    """Try to switch the effective uid to the ``postgres`` account.

    The switch is attempted only when ``platform.LINUX`` is true and the
    process currently has an effective uid of 0. Any failure is logged and
    reported through the return value rather than raised.

    :returns: ``True`` if the effective uid was changed, ``False`` otherwise.
    """
    # seteuid() to another account needs root as the effective *user*, so
    # the uid is what has to be tested here, not the gid.
    if not (platform.LINUX and os.geteuid() == 0):
        return False
    try:
        uid = pwd.getpwnam('postgres').pw_uid
        os.seteuid(uid)
    except Exception as e:
        # Deliberately best-effort: record the reason and carry on.
        logging.error('Failed run as postgres: {0}'.format(e))
        return False
    return True
def _try_run_as_postgres(self):
    """Try to switch the effective uid to the ``postgres`` account.

    The switch is attempted only when ``platform.UNIX`` is true and the
    process currently has an effective uid of 0. Any failure is written to
    ``sys.stderr`` and reported through the return value rather than raised.

    :returns: ``True`` if the effective uid was changed, ``False`` otherwise.
    """
    # seteuid() to another account needs root as the effective *user*, so
    # the uid is what has to be tested here, not the gid.
    if not (platform.UNIX and os.geteuid() == 0):
        return False
    try:
        # Imported inside the try block so that a missing pwd module is
        # reported like any other failure.
        import pwd
        uid = pwd.getpwnam('postgres').pw_uid
        os.seteuid(uid)
    except Exception as e:
        # Deliberately best-effort: report the reason and carry on.
        sys.stderr.write("Failed run as postgres: {0}\n".format(e))
        return False
    return True