def _try_run_as_postgres(self):
if platform.LINUX and os.getegid() == 0:
try:
uid = pwd.getpwnam('postgres').pw_uid
os.seteuid(uid)
return True
except Exception as e:
logging.error('Failed run as postgres: {0}'.format(e))
pass
return False
python类seteuid()的实例源码
def impersonate_user(self, username, password):
"""Change process effective user/group ids to reflect
logged in user.
"""
try:
pwdstruct = pwd.getpwnam(username)
except KeyError:
raise AuthorizerError(self.msg_no_such_user)
else:
os.setegid(pwdstruct.pw_gid)
os.seteuid(pwdstruct.pw_uid)
def terminate_impersonation(self, username):
"""Revert process effective user/group IDs."""
os.setegid(PROCESS_GID)
os.seteuid(PROCESS_UID)
def _change_process_user_group(self):
# type: () -> None
if self.user:
self._log("changing process user to {}".format(self.user))
os.seteuid(self.user)
if self.group:
self._log("changing process group to {}".format(self.group))
os.setegid(self.group)
def runAsEffectiveUser(euid, egid, function, *args, **kwargs):
"""
Run the given function wrapped with seteuid/setegid calls.
This will try to minimize the number of seteuid/setegid calls, comparing
current and wanted permissions
@param euid: effective UID used to call the function.
@type euid: C{int}
@type egid: effective GID used to call the function.
@param egid: C{int}
@param function: the function run with the specific permission.
@type function: any callable
@param *args: arguments passed to C{function}
@param **kwargs: keyword arguments passed to C{function}
"""
uid, gid = os.geteuid(), os.getegid()
if uid == euid and gid == egid:
return function(*args, **kwargs)
else:
if uid != 0 and (uid != euid or gid != egid):
os.seteuid(0)
if gid != egid:
os.setegid(egid)
if euid != 0 and (euid != uid or gid != egid):
os.seteuid(euid)
try:
return function(*args, **kwargs)
finally:
if euid != 0 and (uid != euid or gid != egid):
os.seteuid(0)
if gid != egid:
os.setegid(gid)
if uid != 0 and (uid != euid or gid != egid):
os.seteuid(uid)
def setUp(self):
self.factory = OpenSSHFactory()
self.keysDir = FilePath(self.mktemp())
self.keysDir.makedirs()
self.factory.dataRoot = self.keysDir.path
self.moduliDir = FilePath(self.mktemp())
self.moduliDir.makedirs()
self.factory.moduliRoot = self.moduliDir.path
self.keysDir.child("ssh_host_foo").setContent(b"foo")
self.keysDir.child("bar_key").setContent(b"foo")
self.keysDir.child("ssh_host_one_key").setContent(
keydata.privateRSA_openssh)
self.keysDir.child("ssh_host_two_key").setContent(
keydata.privateDSA_openssh)
self.keysDir.child("ssh_host_three_key").setContent(
b"not a key content")
self.keysDir.child("ssh_host_one_key.pub").setContent(
keydata.publicRSA_openssh)
self.moduliDir.child("moduli").setContent(b"""
# $OpenBSD: moduli,v 1.xx 2016/07/26 12:34:56 jhacker Exp $
# Time Type Tests Tries Size Generator Modulus
20030501000000 2 6 100 2047 2 FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3BE39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF6955817183995497CEA956AE515D2261898FA051015728E5A8AACAA68FFFFFFFFFFFFFFFF
19981111000000 2 6 100 1023 2 FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF
""")
self.mockos = MockOS()
self.patch(os, "seteuid", self.mockos.seteuid)
self.patch(os, "setegid", self.mockos.setegid)
def _runAsUser(self, f, *args, **kw):
euid = os.geteuid()
egid = os.getegid()
groups = os.getgroups()
uid, gid = self.getUserGroupId()
os.setegid(0)
os.seteuid(0)
os.setgroups(self.getOtherGroups())
os.setegid(gid)
os.seteuid(uid)
try:
f = iter(f)
except TypeError:
f = [(f, args, kw)]
try:
for i in f:
func = i[0]
args = len(i) > 1 and i[1] or ()
kw = len(i) > 2 and i[2] or {}
r = func(*args, **kw)
finally:
os.setegid(0)
os.seteuid(0)
os.setgroups(groups)
os.setegid(egid)
os.seteuid(euid)
return r
def seteuid(self, egid):
"""
Mock C{os.seteuid}, store result.
"""
self.seteuidCalls.append(egid)
def setUp(self):
safe_rmpath(TESTFN)
TestProcess.setUp(self)
os.setegid(1000)
os.seteuid(1000)
def tearDown(self):
os.setegid(self.PROCESS_UID)
os.seteuid(self.PROCESS_GID)
TestProcess.tearDown(self)
def tor_new_process():
"""
Drops privileges to TOR_USER user and start a new Tor process
"""
debian_tor_uid = getpwnam(TOR_USER).pw_uid
debian_tor_gid = getpwnam(TOR_USER).pw_gid
os.setgid(debian_tor_gid)
os.setuid(debian_tor_uid)
os.setegid(debian_tor_gid)
os.seteuid(debian_tor_uid)
os.environ['HOME'] = "/var/lib/tor"
tor_process = stem.process.launch_tor_with_config(
config = {
'SocksPort': '6666',
'ControlPort': '6969',
'DNSPort': '9053',
'DNSListenAddress': '127.0.0.1',
'AutomapHostsOnResolve': '1',
'AutomapHostsSuffixes': '.exit,.onion',
'VirtualAddrNetwork': '10.192.0.0/10',
'TransPort': '9040',
'TransListenAddress': '127.0.0.1',
'AvoidDiskWrites': '1',
'WarnUnsafeSocks': '1',
})
def setUp(self):
safe_rmpath(TESTFN)
TestProcess.setUp(self)
os.setegid(1000)
os.seteuid(1000)
def tearDown(self):
os.setegid(self.PROCESS_UID)
os.seteuid(self.PROCESS_GID)
TestProcess.tearDown(self)
def test_seteuid(self):
if os.getuid() != 0:
self.assertRaises(OSError, os.seteuid, 0)
self.assertRaises(OverflowError, os.seteuid, 1<<32)
def become_persona(self):
if self.persona is not (None, None):
uid, gid = self.persona
# the order of these is important!
os.setegid(gid)
os.seteuid(uid)
def become_nobody(self):
if self.persona is not (None, None):
os.seteuid(self.PROCESS_UID)
os.setegid(self.PROCESS_GID)
# cwd, cdup, open, listdir
def change_users_and_groups(mamaji_data):
current_users = mamaji_data['current_users']
current_groups = mamaji_data['current_groups']
pending_users = mamaji_data['pending_users']
pending_groups = mamaji_data['pending_groups']
groups = mamaji_data['supplementary_groups']
if groups:
os.setgroups(groups)
group_types = [k for k in ['rgid', 'egid', 'sgid']
if pending_groups[k] is not None]
group_types_len = len(group_types)
if group_types_len == 3:
setresgid(pending_groups['rgid'], pending_groups['egid'],
pending_groups['sgid'])
elif group_types_len == 2:
if 'rgid' in group_types and 'egid' in group_types:
os.setregid(pending_groups['rgid'], pending_groups['egid'])
elif group_types_len == 1:
if 'egid' in group_types:
os.setegid(pending_groups['egid'])
user_types = [k for k in ['ruid', 'euid', 'suid']
if pending_users[k] is not None]
user_types_len = len(user_types)
if user_types_len == 3:
setresuid(pending_users['ruid'], pending_users['euid'],
pending_users['suid'])
elif user_types_len == 2:
if 'ruid' in user_types and 'euid' in user_types:
os.setreuid(pending_users['ruid'], pending_users['euid'])
elif user_types_len == 1:
if 'euid' in user_types:
os.seteuid(pending_users['euid'])
if pending_groups['gid'] is not None:
os.setgid(pending_groups['gid'])
if pending_users['uid'] is not None:
os.setuid(pending_users['uid'])
def switchUID(uid, gid, euid=False):
"""
Attempts to switch the uid/euid and gid/egid for the current process.
If C{uid} is the same value as L{os.getuid} (or L{os.geteuid}),
this function will issue a L{UserWarning} and not raise an exception.
@type uid: C{int} or L{None}
@param uid: the UID (or EUID) to switch the current process to. This
parameter will be ignored if the value is L{None}.
@type gid: C{int} or L{None}
@param gid: the GID (or EGID) to switch the current process to. This
parameter will be ignored if the value is L{None}.
@type euid: C{bool}
@param euid: if True, set only effective user-id rather than real user-id.
(This option has no effect unless the process is running
as root, in which case it means not to shed all
privileges, retaining the option to regain privileges
in cases such as spawning processes. Use with caution.)
"""
if euid:
setuid = os.seteuid
setgid = os.setegid
getuid = os.geteuid
else:
setuid = os.setuid
setgid = os.setgid
getuid = os.getuid
if gid is not None:
setgid(gid)
if uid is not None:
if uid == getuid():
uidText = (euid and "euid" or "uid")
actionText = "tried to drop privileges and set%s %s" % (uidText, uid)
problemText = "%s is already %s" % (uidText, getuid())
warnings.warn("%s but %s; should we be root? Continuing."
% (actionText, problemText))
else:
initgroups(uid, gid)
setuid(uid)