def test_setregid(self):
if os.getuid() != 0:
self.assertRaises(os.error, os.setregid, 0, 0)
self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
# Example source code for Python's setregid()
def test_setregid_neg1(self):
# Needs to accept -1. We run this in a subprocess to avoid
# altering the test runner's process state (issue8045).
subprocess.check_call([
sys.executable, '-c',
'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
def test_setregid(self):
if os.getuid() != 0:
self.assertRaises(os.error, os.setregid, 0, 0)
self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
def test_setregid_neg1(self):
# Needs to accept -1. We run this in a subprocess to avoid
# altering the test runner's process state (issue8045).
subprocess.check_call([
sys.executable, '-c',
'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
def test_setregid(self):
if os.getuid() != 0:
self.assertRaises(os.error, os.setregid, 0, 0)
self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
def test_setregid_neg1(self):
# Needs to accept -1. We run this in a subprocess to avoid
# altering the test runner's process state (issue8045).
subprocess.check_call([
sys.executable, '-c',
'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
def restorePrivs(self):
    """Restore the most recently saved ids and environment.

    Pops one entry from ``self.privStack`` (a mapping with the keys
    ``rgid``, ``egid``, ``ruid`` and ``euid``) and one from
    ``self.privEnviron`` (the environment to reinstate), then applies them.

    Raises whatever ``pop()`` raises when either stack is empty; in that
    case the current environment is left untouched.
    """
    # back to root first
    self._elevatePrivs()
    # Take both saved entries before mutating anything, so that an empty
    # stack cannot leave os.environ cleared.
    privs = self.privStack.pop()
    saved_environ = self.privEnviron.pop()
    # then set saved
    os.environ.clear()
    os.environ.update(saved_environ)
    # Group ids are restored before the user ids are switched.
    os.setregid(privs['rgid'], privs['egid'])
    # NOTE(review): setresuid is called with two arguments here while the
    # sibling methods pass three -- confirm the helper defaults the third.
    setresuid(privs['ruid'], privs['euid'])
def dropPrivsForever(self):
    """Switch real and effective ids to the unprivileged user and group.

    Privileges are elevated first via ``self._elevatePrivs()``; afterwards
    the real and effective group id are set to ``self.unprivGid`` and the
    real and effective user id to ``self.unprivUid``.
    """
    self._elevatePrivs()
    # Group first, then user.
    os.setregid(self.unprivGid, self.unprivGid)
    os.setreuid(self.unprivUid, self.unprivUid)
def _elevatePrivs(self):
    """Switch the process back to id 0 for both user and group.

    Calls ``setresuid(0, 0, 0)`` first and then sets the real and
    effective group id to 0.
    """
    # User ids first, then the group ids.
    setresuid(0, 0, 0)
    os.setregid(0, 0)
def become_user_without_push(self, uid, gid=None):
    """Switch to *uid* (and optionally *gid*) without saving the current ids.

    Args:
        uid: User id passed as the first two arguments of ``setresuid``.
        gid: Group id to use as real and effective group id; ``None``
            (the default) leaves the group ids as ``_elevatePrivs`` set them.
    """
    self._elevatePrivs()
    if gid is not None:
        os.setregid(gid, gid)
    # The third argument stays 0 -- presumably the saved uid, so that
    # privileges can be regained later; confirm against setresuid's signature.
    setresuid(uid, uid, 0)
def condDropPrivs(uid, gid):
    """Switch real and effective ids to *gid* and *uid* where they are given.

    Args:
        uid: User id for ``os.setreuid``; ``None`` leaves the user ids alone.
        gid: Group id for ``os.setregid``; ``None`` leaves the group ids alone.
    """
    # The group is switched before the user.
    if gid is not None:
        os.setregid(gid, gid)
    if uid is not None:
        os.setreuid(uid, uid)
def test_setregid(self):
    """Check that ``os.setregid`` refuses invalid requests.

    The ``OSError`` check for switching to group 0 is made only when the
    process is not uid 0 and ``HAVE_WHEEL_GROUP`` is false.  A group id of
    ``1 << 32`` in either position is always expected to raise
    ``OverflowError``.
    """
    if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
        self.assertRaises(OSError, os.setregid, 0, 0)
    self.assertRaises(OverflowError, os.setregid, 1 << 32, 0)
    self.assertRaises(OverflowError, os.setregid, 0, 1 << 32)
def test_setregid_neg1(self):
# Needs to accept -1. We run this in a subprocess to avoid
# altering the test runner's process state (issue8045).
subprocess.check_call([
sys.executable, '-c',
'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
def test_setregid(self):
if os.getuid() != 0:
self.assertRaises(os.error, os.setregid, 0, 0)
self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
def test_setregid_neg1(self):
# Needs to accept -1. We run this in a subprocess to avoid
# altering the test runner's process state (issue8045).
subprocess.check_call([
sys.executable, '-c',
'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
def deescalate_sudo():
    """Drop to the ids named by the ``SUDO_UID`` / ``SUDO_GID`` variables.

    Does nothing unless both environment variables are set to non-empty
    values.  Otherwise the supplementary group list is emptied and the real
    and effective group and user ids are switched to the given values.

    Raises:
        ValueError: If either variable is not a valid integer.
        OSError: If the process may not change its groups or ids.
    """
    sudo_uid = os.environ.get('SUDO_UID')
    sudo_gid = os.environ.get('SUDO_GID')
    if not (sudo_uid and sudo_gid):
        return

    uid = int(sudo_uid)
    gid = int(sudo_gid)
    # TODO: re-apply the target user's supplementary groups (looked up via
    # pwd/grp) instead of losing all of them.
    os.setgroups([])
    # Group before user.
    os.setregid(gid, gid)
    os.setreuid(uid, uid)
def test_setregid(self):
    """Check that ``os.setregid`` refuses invalid requests.

    The ``OSError`` check for switching to group 0 is made only when the
    process is not uid 0 and ``HAVE_WHEEL_GROUP`` is false.  A group id of
    ``1 << 32`` in either position is always expected to raise
    ``OverflowError``.
    """
    if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
        self.assertRaises(OSError, os.setregid, 0, 0)
    self.assertRaises(OverflowError, os.setregid, 1 << 32, 0)
    self.assertRaises(OverflowError, os.setregid, 0, 1 << 32)
def test_setregid_neg1(self):
# Needs to accept -1. We run this in a subprocess to avoid
# altering the test runner's process state (issue8045).
subprocess.check_call([
sys.executable, '-c',
'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
def test_setregid(self):
if os.getuid() != 0:
self.assertRaises(os.error, os.setregid, 0, 0)
self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
def test_setregid_neg1(self):
# Needs to accept -1. We run this in a subprocess to avoid
# altering the test runner's process state (issue8045).
subprocess.check_call([
sys.executable, '-c',
'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
def setregid(self, val1, val2):
    """
    Override C{os.setregid}. Change no process ids; only record the call
    in C{self.actions} as the tuple C{('setregid', val1, val2)}.

    @param val1: first argument of the intercepted call.
    @param val2: second argument of the intercepted call.
    """
    self.actions.append(('setregid', val1, val2))
def test_setregid(self):
    """Check that ``os.setregid`` refuses invalid requests.

    The ``OSError`` check for switching to group 0 is made only when the
    process is not uid 0 and ``HAVE_WHEEL_GROUP`` is false.  A group id of
    ``1 << 32`` in either position is always expected to raise
    ``OverflowError``.
    """
    if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
        self.assertRaises(OSError, os.setregid, 0, 0)
    self.assertRaises(OverflowError, os.setregid, 1 << 32, 0)
    self.assertRaises(OverflowError, os.setregid, 0, 1 << 32)
def test_setregid_neg1(self):
# Needs to accept -1. We run this in a subprocess to avoid
# altering the test runner's process state (issue8045).
subprocess.check_call([
sys.executable, '-c',
'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
def change_users_and_groups(mamaji_data):
    """Apply the pending group and user id changes found in *mamaji_data*.

    Args:
        mamaji_data: Mapping with at least these entries:

            * ``'supplementary_groups'`` -- passed to ``os.setgroups`` when
              truthy.
            * ``'pending_groups'`` -- mapping with the keys ``rgid``,
              ``egid``, ``sgid`` and ``gid``.
            * ``'pending_users'`` -- mapping with the keys ``ruid``,
              ``euid``, ``suid`` and ``uid``.

            A value of ``None`` means that the id was not requested.

    Only three combinations of real/effective/saved ids are acted upon:
    all three, real plus effective, or effective alone.  Any other
    combination (for example only a saved id) is silently ignored.
    """
    pending_users = mamaji_data['pending_users']
    pending_groups = mamaji_data['pending_groups']
    supplementary = mamaji_data['supplementary_groups']

    # Groups are handled before users throughout this function.
    if supplementary:
        os.setgroups(supplementary)

    # The comprehension keeps the fixed r/e/s order, so the requested
    # combination can be recognised by comparing against literal lists.
    requested_gids = [kind for kind in ('rgid', 'egid', 'sgid')
                      if pending_groups[kind] is not None]
    if len(requested_gids) == 3:
        setresgid(pending_groups['rgid'], pending_groups['egid'],
                  pending_groups['sgid'])
    elif requested_gids == ['rgid', 'egid']:
        os.setregid(pending_groups['rgid'], pending_groups['egid'])
    elif requested_gids == ['egid']:
        os.setegid(pending_groups['egid'])

    requested_uids = [kind for kind in ('ruid', 'euid', 'suid')
                      if pending_users[kind] is not None]
    if len(requested_uids) == 3:
        setresuid(pending_users['ruid'], pending_users['euid'],
                  pending_users['suid'])
    elif requested_uids == ['ruid', 'euid']:
        os.setreuid(pending_users['ruid'], pending_users['euid'])
    elif requested_uids == ['euid']:
        os.seteuid(pending_users['euid'])

    # Plain gid/uid requests are applied last, group first.
    if pending_groups['gid'] is not None:
        os.setgid(pending_groups['gid'])
    if pending_users['uid'] is not None:
        os.setuid(pending_users['uid'])
def daemonize(self, user=0, group=0):
    """Detach the current process and continue running in the background.

    Forks twice (each parent exits with status 0), changes the working
    directory to ``/``, starts a new session, optionally switches group
    and user, and finally redirects file descriptors 0, 1 and 2 to
    ``/dev/null``.

    Args:
        user: Name looked up with ``getpwnam``; a falsy value (the default)
            leaves the user ids unchanged.
        group: Name looked up with ``getgrnam``; a falsy value (the
            default) leaves the group ids unchanged.

    If either fork fails, a message is written to stderr and the process
    exits with status 1.
    """
    # do the UNIX double-fork magic, see Stevens' "Advanced
    # Programming in the UNIX Environment" for details (ISBN 0201563177)
    try:
        pid = os.fork()
        if pid > 0:
            # exit first parent
            sys.exit(0)
    except OSError as e:
        sys.stderr.write("Fork failed (#1): %d (%s)\n" % (e.errno, e.strerror))
        sys.exit(1)

    # decouple from parent environment
    # chdir -> don't prevent unmounting...
    os.chdir("/")

    # Create new process group with the process as leader
    os.setsid()

    # Set user/group depending on params.  The group is switched first,
    # and each name is looked up only once.
    if group:
        gid = getgrnam(group)[2]
        os.setregid(gid, gid)
    if user:
        uid = getpwnam(user)[2]
        os.setreuid(uid, uid)

    # do second fork
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)
    except OSError as e:
        sys.stderr.write("Fork failed (#2): %d (%s)\n" % (e.errno, e.strerror))
        sys.exit(1)

    # Flush pending output before the standard descriptors are replaced.
    sys.stdout.flush()
    sys.stderr.flush()
    si = os.open("/dev/null", os.O_RDONLY)
    so = os.open("/dev/null", os.O_WRONLY)
    os.dup2(si, 0)
    os.dup2(so, 1)
    os.dup2(so, 2)
    # If a standard descriptor was closed on entry, os.open may have
    # returned 0, 1 or 2 itself; closing it would undo the redirection.
    if si > 2:
        os.close(si)
    if so > 2:
        os.close(so)
    self.logger.debug("Daemonized with PID %d.", os.getpid())