def _dropPrivileges(self, user, group):
import pwd, grp
# Get the uid/gid from the name
runningUid = pwd.getpwnam(user).pw_uid
runningGid = grp.getgrnam(group).gr_gid
# Remove group privileges
os.setgroups([])
# Try setting the new uid/gid
os.setgid(runningGid)
os.setuid(runningUid)
# Reset logging
self.resetLogging()
python类setgid()的实例源码
def drop_privileges(self, uid_name, gid_name):
if os.getuid() != 0:
# We're not root so, like, whatever dude
self.logger.info("Not running as root. Cannot drop permissions.")
return
# Get the uid/gid from the name
running_uid = pwd.getpwnam(uid_name).pw_uid
running_gid = grp.getgrnam(gid_name).gr_gid
# Remove group privileges
os.setgroups([])
# Try setting the new uid/gid
os.setgid(running_gid)
os.setuid(running_uid)
# Ensure a very conservative umask
old_umask = os.umask(0o077)
self.logger.info("Changed permissions to: %s: %i, %s, %i"%(uid_name, running_uid, gid_name, running_gid))
def change_process_owner(uid, gid):
""" Change the owning UID and GID of this process.
:param uid: The target UID for the daemon process.
:param gid: The target GID for the daemon process.
:return: ``None``.
Set the GID then the UID of the process (in that order, to avoid
permission errors) to the specified `gid` and `uid` values.
Requires appropriate OS privileges for this process.
"""
try:
os.setgid(gid)
os.setuid(uid)
except Exception as exc:
error = DaemonOSEnvironmentError(
"Unable to change process owner ({exc})".format(exc=exc))
raise error
def sudo(user):
raise NotImplemented
"""
Run your function as the given user
Please note that this *permanently* changes user, you won't be able to change back unless you have
sudo privileges.
Best used inside @background.
"""
user = pwd.getpwnam(user)
print(user)
def decorator(func):
def func_wrapper(*args,**kwargs):
os.setuid(user.pw_uid)
os.setgid(user.pw_gid)
p = func(*args,**kwargs)
return p
return func_wrapper
return decorator
def set_owner_process(uid, gid, initgroups=False):
""" set user and group of workers processes """
if gid:
if uid:
try:
username = get_username(uid)
except KeyError:
initgroups = False
# versions of python < 2.6.2 don't manage unsigned int for
# groups like on osx or fedora
gid = abs(gid) & 0x7FFFFFFF
if initgroups:
os.initgroups(username, gid)
else:
os.setgid(gid)
if uid:
os.setuid(uid)
def _setgroup(group):
''' Normalizes group to a gid and sets the current gid, or does
nothing if group is None.
'''
if group is None:
return
# Normalize group to gid
elif isinstance(group, str):
gid = grp.getgrnam(group).gr_gid
# The group is already a gid.
else:
gid = group
try:
os.setgid(gid)
except OSError:
self.logger.error('Unable to change group.')
sys.exit(1)
def setugid(user):
"""Change process user and group ID
Argument is a numeric user id or a user name"""
try:
from pwd import getpwuid
passwd = getpwuid(int(user))
except ValueError:
from pwd import getpwnam
passwd = getpwnam(user)
if hasattr(os, 'initgroups'): # python >= 2.7
os.initgroups(passwd.pw_name, passwd.pw_gid)
else:
import ctypes
if ctypes.CDLL(None).initgroups(passwd.pw_name, passwd.pw_gid) < 0:
err = ctypes.c_int.in_dll(ctypes.pythonapi,"errno").value
raise OSError(err, os.strerror(err), 'initgroups')
os.setgid(passwd.pw_gid)
os.setuid(passwd.pw_uid)
os.environ['HOME'] = passwd.pw_dir
def drop_privileges(uid_name='nobody', gid_name='nogroup'):
if os.getuid() != 0:
# We're not root so, like, whatever dude
return
# Get the uid/gid from the name
running_uid = pwd.getpwnam(uid_name).pw_uid
running_gid = grp.getgrnam(gid_name).gr_gid
# Remove group privileges
os.setgroups([])
# Try setting the new uid/gid
os.setgid(running_gid)
os.setuid(running_uid)
# Ensure a very conservative umask
old_umask = os.umask(077)
def setugid(user):
"""Change process user and group ID
Argument is a numeric user id or a user name"""
try:
from pwd import getpwuid
passwd = getpwuid(int(user))
except ValueError:
from pwd import getpwnam
passwd = getpwnam(user)
if hasattr(os, 'initgroups'): # python >= 2.7
os.initgroups(passwd.pw_name, passwd.pw_gid)
else:
import ctypes
if ctypes.CDLL(None).initgroups(passwd.pw_name, passwd.pw_gid) < 0:
err = ctypes.c_int.in_dll(ctypes.pythonapi,"errno").value
raise OSError(err, os.strerror(err), 'initgroups')
os.setgid(passwd.pw_gid)
os.setuid(passwd.pw_uid)
os.environ['HOME'] = passwd.pw_dir
def drop_privileges(uid_name='nobody', gid_name='nobody'):
import os, pwd, grp
if os.getuid() != 0:
# We're not root so, like, whatever dude
return
# Get the uid/gid from the name
running_uid = pwd.getpwnam(uid_name).pw_uid
running_gid = grp.getgrnam(gid_name).gr_gid
# Remove group privileges
os.setgroups([])
# Try setting the new uid/gid
os.setgid(running_gid)
os.setuid(running_uid)
# Ensure a very conservative umask
old_umask = os.umask(0o77)
def set_owner_process(uid, gid, initgroups=False):
""" set user and group of workers processes """
if gid:
if uid:
try:
username = get_username(uid)
except KeyError:
initgroups = False
# versions of python < 2.6.2 don't manage unsigned int for
# groups like on osx or fedora
gid = abs(gid) & 0x7FFFFFFF
if initgroups:
os.initgroups(username, gid)
else:
os.setgid(gid)
if uid:
os.setuid(uid)
def _execChild(self, path, uid, gid, executable, args, environment):
"""
The exec() which is done in the forked child.
"""
if path:
os.chdir(path)
if uid is not None or gid is not None:
if uid is None:
uid = os.geteuid()
if gid is None:
gid = os.getegid()
# set the UID before I actually exec the process
os.setuid(0)
os.setgid(0)
switchUID(uid, gid)
os.execvpe(executable, args, environment)
def test_mockSetUid(self):
"""
Try creating a process with setting its uid: it's almost the same path
as the standard path, but with a C{switchUID} call before the exec.
"""
cmd = b'/mock/ouch'
d = defer.Deferred()
p = TrivialProcessProtocol(d)
try:
reactor.spawnProcess(p, cmd, [b'ouch'], env=None,
usePTY=False, uid=8080)
except SystemError:
self.assertTrue(self.mockos.exited)
self.assertEqual(
self.mockos.actions,
[('fork', False), ('setuid', 0), ('setgid', 0),
('switchuid', 8080, 1234), 'exec', ('exit', 1)])
else:
self.fail("Should not be here")
def setugid(user):
"""Change process user and group ID
Argument is a numeric user id or a user name"""
try:
from pwd import getpwuid
passwd = getpwuid(int(user))
except ValueError:
from pwd import getpwnam
passwd = getpwnam(user)
if hasattr(os, 'initgroups'): # python >= 2.7
os.initgroups(passwd.pw_name, passwd.pw_gid)
else:
import ctypes
if ctypes.CDLL(None).initgroups(passwd.pw_name, passwd.pw_gid) < 0:
err = ctypes.c_int.in_dll(ctypes.pythonapi,"errno").value
raise OSError(err, os.strerror(err), 'initgroups')
os.setgid(passwd.pw_gid)
os.setuid(passwd.pw_uid)
os.environ['HOME'] = passwd.pw_dir
def setugid(user):
"""Change process user and group ID
Argument is a numeric user id or a user name"""
try:
from pwd import getpwuid
passwd = getpwuid(int(user))
except ValueError:
from pwd import getpwnam
passwd = getpwnam(user)
if hasattr(os, 'initgroups'): # python >= 2.7
os.initgroups(passwd.pw_name, passwd.pw_gid)
else:
import ctypes
if ctypes.CDLL(None).initgroups(passwd.pw_name, passwd.pw_gid) < 0:
err = ctypes.c_int.in_dll(ctypes.pythonapi,"errno").value
raise OSError(err, os.strerror(err), 'initgroups')
os.setgid(passwd.pw_gid)
os.setuid(passwd.pw_uid)
os.environ['HOME'] = passwd.pw_dir
def drop_privileges(uid_name="nobody", gid_name="nogroup"):
if os.getuid() != 0:
# Already not root, take no action
return
# Get the uid/gid from the name
running_uid = pwd.getpwnam(uid_name).pw_uid
running_gid = grp.getgrnam(gid_name).gr_gid
# Remove group privileges
os.setgroups([])
# Try setting the new uid/gid
os.setgid(running_gid)
os.setuid(running_uid)
# Ensure a very conservative umask
old_umask = os.umask(077)
def drop_privileges_Arch(uid_name="nobody", gid_name="nobody"):
if os.getuid() != 0:
# Already not root, take no action
return
# Get the uid/gid from the name
running_uid = pwd.getpwnam(uid_name).pw_uid
running_gid = grp.getgrnam(gid_name).gr_gid
# Remove group privileges
os.setgroups([])
# Try setting the new uid/gid
os.setgid(running_gid)
os.setuid(running_uid)
# Ensure a very conservative umask
old_umask = os.umask(077)
run_server.py 文件源码
项目:almond-nnparser
作者: Stanford-Mobisocial-IoT-Lab
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def run():
np.random.seed(42)
config = ServerConfig.load(('./server.conf',))
if sys.version_info[2] >= 6:
thread_pool = ThreadPoolExecutor(thread_name_prefix='query-thread-')
else:
thread_pool = ThreadPoolExecutor(max_workers=32)
app = Application(config, thread_pool)
if config.ssl_key:
ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_ctx.load_cert_chain(config.ssl_chain, config.ssl_key)
app.listen(config.port, ssl_options=ssl_ctx)
else:
app.listen(config.port)
if config.user:
os.setgid(grp.getgrnam(config.user)[2])
os.setuid(pwd.getpwnam(config.user)[2])
if sd:
sd.notify('READY=1')
tokenizer_service = TokenizerService()
tokenizer_service.run()
for language in config.languages:
load_language(app, tokenizer_service, language, config.get_model_directory(language))
sys.stdout.flush()
tornado.ioloop.IOLoop.current().start()
def drop_privileges(user):
'''If running as root, drop process privileges to the given user and user's main group.'''
if os.getuid() == 0:
pwnam = pwd.getpwnam(user)
running_uid, running_gid = (pwnam[2], pwnam[3])
if running_gid != os.getgid():
os.setgid(running_gid)
if running_uid != os.getuid():
os.setuid(running_uid)
def set_user(username):
if username is None:
return
import pwd
import grp
try:
pwrec = pwd.getpwnam(username)
except KeyError:
logging.error('user not found: %s' % username)
raise
user = pwrec[0]
uid = pwrec[2]
gid = pwrec[3]
cur_uid = os.getuid()
if uid == cur_uid:
return
if cur_uid != 0:
logging.error('can not set user as nonroot user')
# will raise later
# inspired by supervisor
if hasattr(os, 'setgroups'):
groups = [grprec[2] for grprec in grp.getgrall() if user in grprec[3]]
groups.insert(0, gid)
os.setgroups(groups)
os.setgid(gid)
os.setuid(uid)
def _run_as_user(user, gid=None):
try:
user = pwd.getpwnam(user)
except KeyError:
log('Invalid user: %s' % user)
raise Exception
uid = user.pw_uid
gid = gid or user.pw_gid
os.environ['HOME'] = user.pw_dir
def _inner():
os.setgid(gid)
os.setuid(uid)
return _inner
def _run_as_user(user, gid=None):
try:
user = pwd.getpwnam(user)
except KeyError:
log('Invalid user: %s' % user)
raise Exception
uid = user.pw_uid
gid = gid or user.pw_gid
os.environ['HOME'] = user.pw_dir
def _inner():
os.setgid(gid)
os.setuid(uid)
return _inner
def _run_as_user(user, gid=None):
try:
user = pwd.getpwnam(user)
except KeyError:
log('Invalid user: %s' % user)
raise Exception
uid = user.pw_uid
gid = gid or user.pw_gid
os.environ['HOME'] = user.pw_dir
def _inner():
os.setgid(gid)
os.setuid(uid)
return _inner
def _run_as_user(user, gid=None):
try:
user = pwd.getpwnam(user)
except KeyError:
log('Invalid user: %s' % user)
raise Exception
uid = user.pw_uid
gid = gid or user.pw_gid
os.environ['HOME'] = user.pw_dir
def _inner():
os.setgid(gid)
os.setuid(uid)
return _inner
def become_user(name):
'''
Change the current process' effective UID to that of the given user name.
Can only be called by super user 0. This function is only intended for use
from the ``init`` process during system boot.
:arg name: An OS user name. Must be found in the ``password`` database, or
a replacement authentication system.
:returns: The user's home directory.
'''
uid = ave.pwd.getpwnam_uid(name)
gid = ave.pwd.getpwnam_gid(name)
if os.geteuid() == uid:
return
if os.geteuid() != 0:
raise Exception('only root can execute with modified privileges')
try:
os.setgid(gid) # must be done before changing euid
os.setuid(uid)
except OSError, e:
if e.errno == errno.EPERM:
raise Exception(
'could not execute with modified privileges: %s' % str(e)
)
return ave.pwd.getpwnam_dir(name)
def become_user(name):
'''
Change the current process' effective UID to that of the given user name.
Can only be called by super user 0. This function is only intended for use
from the ``init`` process during system boot.
:arg name: An OS user name. Must be found in the ``password`` database, or
a replacement authentication system.
:returns: The user's home directory.
'''
uid = ave.pwd.getpwnam_uid(name)
gid = ave.pwd.getpwnam_gid(name)
if os.geteuid() == uid:
return
if os.geteuid() != 0:
raise Exception('only root can execute with modified privileges')
try:
os.setgid(gid) # must be done before changing euid
os.setuid(uid)
except OSError, e:
if e.errno == errno.EPERM:
raise Exception(
'could not execute with modified privileges: %s' % str(e)
)
return ave.pwd.getpwnam_dir(name)
def initgroups(uid, primaryGid):
"""Initializes the group access list.
This is done by reading the group database /etc/group and using all
groups of which C{uid} is a member. The additional group
C{primaryGid} is also added to the list.
If the given user is a member of more than C{NGROUPS}, arbitrary
groups will be silently discarded to bring the number below that
limit.
"""
try:
# Try to get the maximum number of groups
max_groups = os.sysconf("SC_NGROUPS_MAX")
except:
# No predefined limit
max_groups = 0
username = pwd.getpwuid(uid)[0]
l = []
if primaryGid is not None:
l.append(primaryGid)
for groupname, password, gid, userlist in grp.getgrall():
if username in userlist:
l.append(gid)
if len(l) == max_groups:
break # No more groups, ignore any more
try:
_setgroups_until_success(l)
except OSError, e:
# We might be able to remove this code now that we
# don't try to setgid/setuid even when not asked to.
if e.errno == errno.EPERM:
for g in getgroups():
if g not in l:
raise
else:
raise
def switchUID(uid, gid, euid=False):
if euid:
setuid = os.seteuid
setgid = os.setegid
else:
setuid = os.setuid
setgid = os.setgid
if gid is not None:
setgid(gid)
if uid is not None:
initgroups(uid, gid)
setuid(uid)
def contain(command, image_name, image_dir, container_id, container_dir,
cpu_shares, memory, memory_swap, user):
_setup_cpu_cgroup(container_id, cpu_shares)
_setup_memory_cgroup(container_id, memory, memory_swap)
linux.sethostname(container_id) # change hostname to container_id
linux.mount(None, '/', None, linux.MS_PRIVATE | linux.MS_REC, None)
new_root = create_container_root(
image_name, image_dir, container_id, container_dir)
print('Created a new root fs for our container: {}'.format(new_root))
_create_mounts(new_root)
old_root = os.path.join(new_root, 'old_root')
os.makedirs(old_root)
linux.pivot_root(new_root, old_root)
os.chdir('/')
linux.umount2('/old_root', linux.MNT_DETACH) # umount old root
os.rmdir('/old_root') # rmdir the old_root dir
# TODO: if user is set, drop privileges using os.setuid()
# (and optionally os.setgid()).
os.execvp(command[0], command)
def set_user(username):
if username is None:
return
import pwd
import grp
try:
pwrec = pwd.getpwnam(username)
except KeyError:
logging.error('user not found: %s' % username)
raise
user = pwrec[0]
uid = pwrec[2]
gid = pwrec[3]
cur_uid = os.getuid()
if uid == cur_uid:
return
if cur_uid != 0:
logging.error('can not set user as nonroot user')
# will raise later
# inspired by supervisor
if hasattr(os, 'setgroups'):
groups = [grprec[2] for grprec in grp.getgrall() if user in grprec[3]]
groups.insert(0, gid)
os.setgroups(groups)
os.setgid(gid)
os.setuid(uid)