def set_user(username):
    """Switch the running process to the account named ``username``.

    Nothing happens when ``username`` is ``None`` or when the process
    already runs under that account's uid. Otherwise the supplementary
    groups (where the platform offers ``os.setgroups``), the gid and
    finally the uid are changed, in that order.

    :param username: login name to switch to, or ``None`` to keep the
        current user.
    :raises KeyError: if ``username`` has no password-database entry; the
        failure is logged before being re-raised.
    """
    if username is None:
        return

    import pwd
    import grp

    try:
        entry = pwd.getpwnam(username)
    except KeyError:
        logging.error('user not found: %s' % username)
        raise

    # Password records are (name, passwd, uid, gid, ...).
    login_name, _, target_uid, target_gid = entry[:4]

    current_uid = os.getuid()
    if current_uid == target_uid:
        return
    if current_uid != 0:
        # Only logged here; the calls below are left to raise on their own.
        logging.error('can not set user as nonroot user')

    # inspired by supervisor
    if hasattr(os, 'setgroups'):
        # Primary group first, then every group listing the user as member.
        member_of = [
            member_gid
            for _name, _passwd, member_gid, members in grp.getgrall()
            if login_name in members
        ]
        os.setgroups([target_gid] + member_of)
    os.setgid(target_gid)
    os.setuid(target_uid)
python类setgid()的实例源码
def daemonize(keepfd=None, chdir='/'):
    """Detach the current process and turn it into a background daemon.

    Steps, in order: clear the umask, change directory, re-apply the
    current gid/uid, double-fork with a ``setsid()`` in between, ignore
    SIGTERM and SIGINT, close every descriptor below the ``RLIMIT_NOFILE``
    limit except ``keepfd``, and finally point stdin/stdout/stderr at
    ``os.devnull``.

    :param keepfd: file descriptor to leave open; ``None`` closes them all.
    :param chdir: directory to change into; a falsy value means ``'/'``.
    """
    os.umask(0)
    os.chdir(chdir or '/')

    # Relinquish elevations: group before user.
    os.setgid(os.getgid())
    os.setuid(os.getuid())

    # Double fork to daemonize; each parent exits immediately.
    if os.fork() > 0:
        os._exit(0)
    os.setsid()  # Obtain new process group
    if os.fork() > 0:
        os._exit(0)

    # Signal handling
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, signal.SIG_IGN)

    # Close open files, highest descriptor first.
    fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if fd_limit == resource.RLIM_INFINITY:
        fd_limit = 256
    for fd in range(fd_limit - 1, -1, -1):
        if fd == keepfd:
            continue
        try:
            os.close(fd)
        except OSError as exc:
            # A descriptor that was never open is fine; anything else is not.
            if exc.errno != errno.EBADF:
                raise

    # Redirect I/O to /dev/null
    for stream in (sys.stdin, sys.stdout, sys.stderr):
        os.dup2(os.open(os.devnull, os.O_RDWR), stream.fileno())
def preexec_fn(user_uid, user_gid, user_home):
    """Build a zero-argument hook that switches to the given identity.

    Nothing is changed when this factory is called; the returned callable
    does the work when invoked.

    :param user_uid: uid the hook passes to ``os.setuid``.
    :param user_gid: gid the hook passes to ``os.setgid``.
    :param user_home: value the hook stores in the ``HOME`` environment
        variable.
    :return: a callable taking no arguments.
    """
    def _become_user():
        # Group before user, then publish the home directory.
        os.setgid(user_gid)
        os.setuid(user_uid)
        os.environ["HOME"] = user_home

    return _become_user
def __init__(self, request, client_address, server):
    """Drop root privileges if configured, then initialise the handler.

    When the process runs as root, the gid is set from
    ``pysshrp.common.config.groupId`` and the uid from
    ``pysshrp.common.config.userId``; a falsy value leaves that identity
    unchanged. The previous code passed ``userId`` to ``setgid`` and
    ``groupId`` to ``setuid``.

    :param request: forwarded unchanged to ``BaseRequestHandler.__init__``.
    :param client_address: forwarded unchanged.
    :param server: forwarded unchanged.
    """
    # Change user and group (only when run as root).
    config = pysshrp.common.config
    if os.getuid() == 0:
        # Group first, while the process is still root.
        if config.groupId:
            os.setgid(config.groupId)
        if config.userId:
            os.setuid(config.userId)
    SocketServer.BaseRequestHandler.__init__(self, request, client_address, server)
def set_owner_process(uid, gid):
    """Change the group, then the user, that the current process runs as.

    A falsy ``gid`` or ``uid`` (``None`` or ``0``) leaves that identity
    untouched.

    :param uid: user id passed to ``os.setuid``.
    :param gid: group id passed to ``os.setgid`` after being masked.
    """
    if gid:
        # Python releases before 2.6.2 cannot handle unsigned-int group ids
        # (seen on OS X and Fedora), so take the absolute value and keep
        # only the low 31 bits.
        masked_gid = abs(gid) & 0x7FFFFFFF
        os.setgid(masked_gid)
    if uid:
        os.setuid(uid)
def demote(self):
    """Demote the root user to the configured user and/or group.

    When running as root, the supplementary groups are cleared, then the
    gid is set from ``self.the_grp`` (if ``self.group`` is set) and the uid
    is set to that of the account named ``self.user`` (if set). When not
    running as root, requesting a user or group is a fatal error.

    Every failure is logged at critical level and ends the process with
    exit status 1 via ``sys.exit``.
    """
    try:
        if os.getuid() != 0:
            if self.user or self.group:
                logging.critical('not privileged ~~ cannot change to user [%s] / group [%s]' % (self.user, self.group))
                sys.exit(1)
            return

        # drop supplementary groups
        os.setgroups([])

        if self.group:
            try:
                os.setgid(self.the_grp.gr_gid)
            except Exception as ex:
                logging.critical("failed to set group to \"%s\" [%s]" % (self.group, str(ex)))
                sys.exit(1)

        if self.user:
            try:
                # Use the record just looked up; it was previously assigned
                # to a local and then ignored in favour of self.the_pwd.
                the_pwd = pwd.getpwnam(self.user)
                os.setuid(the_pwd.pw_uid)
            except Exception as ex:
                logging.critical("failed to set user to \"%s\" [%s]" % (self.user, str(ex)))
                sys.exit(1)
    except Exception as ex:
        # sys.exit raises SystemExit, which is not caught here.
        logging.critical("daemon.demote() caught exception [%s]" % str(ex))
        sys.exit(1)
def set_user(username):
    """Switch the running process to the account named ``username``.

    Nothing happens when ``username`` is ``None`` or when the process
    already runs under that account's uid. Otherwise the supplementary
    groups (where the platform offers ``os.setgroups``), the gid and
    finally the uid are changed, in that order.

    :param username: login name to switch to, or ``None`` to keep the
        current user.
    :raises KeyError: if ``username`` has no password-database entry; the
        failure is logged before being re-raised.
    """
    if username is None:
        return

    import pwd
    import grp

    try:
        entry = pwd.getpwnam(username)
    except KeyError:
        logging.error('user not found: %s' % username)
        raise

    # Password records are (name, passwd, uid, gid, ...).
    login_name, _, target_uid, target_gid = entry[:4]

    current_uid = os.getuid()
    if current_uid == target_uid:
        return
    if current_uid != 0:
        # Only logged here; the calls below are left to raise on their own.
        logging.error('can not set user as nonroot user')

    # inspired by supervisor
    if hasattr(os, 'setgroups'):
        # Primary group first, then every group listing the user as member.
        member_of = [
            member_gid
            for _name, _passwd, member_gid, members in grp.getgrall()
            if login_name in members
        ]
        os.setgroups([target_gid] + member_of)
    os.setgid(target_gid)
    os.setuid(target_uid)
def set_user(username):
    """Switch the running process to the account named ``username``.

    Nothing happens when ``username`` is ``None`` or when the process
    already runs under that account's uid. Otherwise the supplementary
    groups (where the platform offers ``os.setgroups``), the gid and
    finally the uid are changed, in that order.

    :param username: login name to switch to, or ``None`` to keep the
        current user.
    :raises KeyError: if ``username`` has no password-database entry; the
        failure is logged before being re-raised.
    """
    if username is None:
        return

    import pwd
    import grp

    try:
        entry = pwd.getpwnam(username)
    except KeyError:
        logging.error('user not found: %s' % username)
        raise

    # Password records are (name, passwd, uid, gid, ...).
    login_name, _, target_uid, target_gid = entry[:4]

    current_uid = os.getuid()
    if current_uid == target_uid:
        return
    if current_uid != 0:
        # Only logged here; the calls below are left to raise on their own.
        logging.error('can not set user as nonroot user')

    # inspired by supervisor
    if hasattr(os, 'setgroups'):
        # Primary group first, then every group listing the user as member.
        member_of = [
            member_gid
            for _name, _passwd, member_gid, members in grp.getgrall()
            if login_name in members
        ]
        os.setgroups([target_gid] + member_of)
    os.setgid(target_gid)
    os.setuid(target_uid)
def test_setgid(self):
    """Check that ``os.setgid`` rejects forbidden and out-of-range gids."""
    running_as_root = os.getuid() == 0
    if not running_as_root:
        # Without root, switching to gid 0 is expected to be refused.
        self.assertRaises(os.error, os.setgid, 0)
    # A gid of 2**32 is expected to be rejected as an overflow.
    oversized_gid = 1 << 32
    self.assertRaises(OverflowError, os.setgid, oversized_gid)
def run_package_reporter(self):
    """
    Run the L{PackageReporter} if there were successfully completed tasks.

    When running as root, the process first switches to the C{landscape}
    group and user. The reporter command is then run through
    C{os.system}, with C{-c <config>} appended when a config path is set.
    """
    if self.handled_tasks_count == 0:
        # Nothing was done
        return

    if os.getuid() == 0:
        # Group before user.
        landscape_gid = grp.getgrnam("landscape").gr_gid
        os.setgid(landscape_gid)
        landscape_uid = pwd.getpwnam("landscape").pw_uid
        os.setuid(landscape_uid)

    reporter_command = find_reporter_command(self._config)
    if self._config.config is not None:
        reporter_command = reporter_command + " -c %s" % self._config.config
    os.system(reporter_command)
def set_owner_process(uid, gid):
    """Change the group, then the user, that the current process runs as.

    A falsy ``gid`` or ``uid`` (``None`` or ``0``) leaves that identity
    untouched.

    :param uid: user id passed to ``os.setuid``.
    :param gid: group id passed to ``os.setgid`` after being masked.
    """
    if gid:
        # Python releases before 2.6.2 cannot handle unsigned-int group ids
        # (seen on OS X and Fedora), so take the absolute value and keep
        # only the low 31 bits.
        masked_gid = abs(gid) & 0x7FFFFFFF
        os.setgid(masked_gid)
    if uid:
        os.setuid(uid)
def daemonize(keepfd=None, chdir='/'):
    """Detach the current process and turn it into a background daemon.

    Steps, in order: clear the umask, change directory, re-apply the
    current gid/uid, double-fork with a ``setsid()`` in between, ignore
    SIGTERM and SIGINT, close every descriptor below the ``RLIMIT_NOFILE``
    limit except ``keepfd``, and finally point stdin/stdout/stderr at
    ``os.devnull``.

    :param keepfd: file descriptor to leave open; ``None`` closes them all.
    :param chdir: directory to change into; a falsy value means ``'/'``.
    """
    os.umask(0)
    os.chdir(chdir or '/')

    # Relinquish elevations: group before user.
    os.setgid(os.getgid())
    os.setuid(os.getuid())

    # Double fork to daemonize; each parent exits immediately.
    if os.fork() > 0:
        os._exit(0)
    os.setsid()  # Obtain new process group
    if os.fork() > 0:
        os._exit(0)

    # Signal handling
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, signal.SIG_IGN)

    # Close open files, highest descriptor first.
    fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if fd_limit == resource.RLIM_INFINITY:
        fd_limit = 256
    for fd in range(fd_limit - 1, -1, -1):
        if fd == keepfd:
            continue
        try:
            os.close(fd)
        except OSError as exc:
            # A descriptor that was never open is fine; anything else is not.
            if exc.errno != errno.EBADF:
                raise

    # Redirect I/O to /dev/null
    for stream in (sys.stdin, sys.stdout, sys.stderr):
        os.dup2(os.open(os.devnull, os.O_RDWR), stream.fileno())
def set_owner_process(uid, gid):
    """Change the group, then the user, that the current process runs as.

    A falsy ``gid`` or ``uid`` (``None`` or ``0``) leaves that identity
    untouched.

    :param uid: user id passed to ``os.setuid``.
    :param gid: group id passed to ``os.setgid`` after being masked.
    """
    if gid:
        # Python releases before 2.6.2 cannot handle unsigned-int group ids
        # (seen on OS X and Fedora), so take the absolute value and keep
        # only the low 31 bits.
        masked_gid = abs(gid) & 0x7FFFFFFF
        os.setgid(masked_gid)
    if uid:
        os.setuid(uid)
def test_setgid(self):
    """Check that ``os.setgid`` rejects forbidden and out-of-range gids."""
    running_as_root = os.getuid() == 0
    if not running_as_root:
        # Without root, switching to gid 0 is expected to be refused.
        self.assertRaises(os.error, os.setgid, 0)
    # A gid of 2**32 is expected to be rejected as an overflow.
    oversized_gid = 1 << 32
    self.assertRaises(OverflowError, os.setgid, oversized_gid)
def test_setgid(self):
    """Check that ``os.setgid`` rejects forbidden and out-of-range gids."""
    running_as_root = os.getuid() == 0
    if not running_as_root:
        # Without root, switching to gid 0 is expected to be refused.
        self.assertRaises(os.error, os.setgid, 0)
    # A gid of 2**32 is expected to be rejected as an overflow.
    oversized_gid = 1 << 32
    self.assertRaises(OverflowError, os.setgid, oversized_gid)
def set_owner_process(uid, gid):
    """Change the group, then the user, that the current process runs as.

    A falsy ``gid`` or ``uid`` (``None`` or ``0``) leaves that identity
    untouched.

    :param uid: user id passed to ``os.setuid``.
    :param gid: group id passed to ``os.setgid`` after being masked.
    """
    if gid:
        # Python releases before 2.6.2 cannot handle unsigned-int group ids
        # (seen on OS X and Fedora), so take the absolute value and keep
        # only the low 31 bits.
        masked_gid = abs(gid) & 0x7FFFFFFF
        os.setgid(masked_gid)
    if uid:
        os.setuid(uid)
def initgroups(uid, primaryGid):
    """Initializes the group access list.

    This is done by reading the group database /etc/group and using all
    groups of which C{uid} is a member. The additional group
    C{primaryGid} is also added to the list.

    If the given user is a member of more than C{NGROUPS}, arbitrary
    groups will be silently discarded to bring the number below that
    limit.

    @param uid: numeric user id whose group memberships are collected.
    @param primaryGid: gid placed first in the list, or C{None} to omit it.
    @raise OSError: if setting the groups fails, unless the failure is
        C{EPERM} and every group currently held is already in the list.
    """
    try:
        # Try to get the maximum number of groups
        max_groups = os.sysconf("SC_NGROUPS_MAX")
    except Exception:
        # No predefined limit. Catching Exception rather than everything
        # lets SystemExit and KeyboardInterrupt propagate.
        max_groups = 0

    username = pwd.getpwuid(uid)[0]
    groups = []
    if primaryGid is not None:
        groups.append(primaryGid)
    for groupname, password, gid, userlist in grp.getgrall():
        if username in userlist:
            groups.append(gid)
            if len(groups) == max_groups:
                break  # No more groups, ignore any more
    try:
        _setgroups_until_success(groups)
    except OSError as e:
        # We might be able to remove this code now that we
        # don't try to setgid/setuid even when not asked to.
        if e.errno == errno.EPERM:
            # Tolerate the failure only if no currently held group is
            # missing from the requested list.
            for g in getgroups():
                if g not in groups:
                    raise
        else:
            raise
def switchUID(uid, gid, euid=False):
    """Switch the process to the given group and user.

    The gid is changed first. If C{uid} is given, C{initgroups(uid, gid)}
    is called before the uid itself is changed.

    @param uid: user id to switch to, or C{None} to leave it unchanged.
    @param gid: group id to switch to, or C{None} to leave it unchanged.
    @param euid: if true, use C{os.seteuid}/C{os.setegid} instead of
        C{os.setuid}/C{os.setgid}.
    """
    change_uid, change_gid = (
        (os.seteuid, os.setegid) if euid else (os.setuid, os.setgid)
    )

    if gid is not None:
        change_gid(gid)

    if uid is None:
        return
    initgroups(uid, gid)
    change_uid(uid)
def _set_permission(self):
    """Switch the process to the account named by ``self.username``.

    The supplementary groups are replaced by the account's primary group
    alone, then the gid and finally the uid are changed.
    """
    account = getpwnam(self.username)
    target_uid, primary_gid = account.pw_uid, account.pw_gid

    os.setgroups([primary_gid])
    os.setgid(primary_gid)
    os.setuid(target_uid)
def _run_as_user(user, gid=None):
    """Build a callable that switches the process to ``user``.

    The account is looked up and ``HOME`` is set in ``os.environ``
    immediately, in the calling process. The gid and uid only change when
    the returned callable is invoked.

    :param user: login name to look up in the password database.
    :param gid: group id to switch to; a falsy value falls back to the
        account's primary group.
    :return: a zero-argument callable that calls ``os.setgid`` and then
        ``os.setuid``.
    :raises Exception: if ``user`` has no password-database entry; the
        message names the user.
    """
    try:
        account = pwd.getpwnam(user)
    except KeyError:
        message = 'Invalid user: %s' % user
        log(message)
        # Same exception type as before, now carrying the reason.
        raise Exception(message)

    uid = account.pw_uid
    # NOTE(review): gid 0 is falsy, so it also falls back to the account's
    # primary group. Confirm that is intended before changing it.
    gid = gid or account.pw_gid
    os.environ['HOME'] = account.pw_dir

    def _inner():
        os.setgid(gid)
        os.setuid(uid)

    return _inner