def _dropPrivileges(self, user, group):
import pwd, grp
# Get the uid/gid from the name
runningUid = pwd.getpwnam(user).pw_uid
runningGid = grp.getgrnam(group).gr_gid
# Remove group privileges
os.setgroups([])
# Try setting the new uid/gid
os.setgid(runningGid)
os.setuid(runningUid)
# Reset logging
self.resetLogging()
python类setuid()的实例源码
def drop_privileges(self, uid_name, gid_name):
if os.getuid() != 0:
# We're not root so, like, whatever dude
self.logger.info("Not running as root. Cannot drop permissions.")
return
# Get the uid/gid from the name
running_uid = pwd.getpwnam(uid_name).pw_uid
running_gid = grp.getgrnam(gid_name).gr_gid
# Remove group privileges
os.setgroups([])
# Try setting the new uid/gid
os.setgid(running_gid)
os.setuid(running_uid)
# Ensure a very conservative umask
old_umask = os.umask(0o077)
self.logger.info("Changed permissions to: %s: %i, %s, %i"%(uid_name, running_uid, gid_name, running_gid))
def change_process_owner(uid, gid):
""" Change the owning UID and GID of this process.
:param uid: The target UID for the daemon process.
:param gid: The target GID for the daemon process.
:return: ``None``.
Set the GID then the UID of the process (in that order, to avoid
permission errors) to the specified `gid` and `uid` values.
Requires appropriate OS privileges for this process.
"""
try:
os.setgid(gid)
os.setuid(uid)
except Exception as exc:
error = DaemonOSEnvironmentError(
"Unable to change process owner ({exc})".format(exc=exc))
raise error
def sudo(user):
raise NotImplemented
"""
Run your function as the given user
Please note that this *permanently* changes user, you won't be able to change back unless you have
sudo privileges.
Best used inside @background.
"""
user = pwd.getpwnam(user)
print(user)
def decorator(func):
def func_wrapper(*args,**kwargs):
os.setuid(user.pw_uid)
os.setgid(user.pw_gid)
p = func(*args,**kwargs)
return p
return func_wrapper
return decorator
def set_owner_process(uid, gid, initgroups=False):
""" set user and group of workers processes """
if gid:
if uid:
try:
username = get_username(uid)
except KeyError:
initgroups = False
# versions of python < 2.6.2 don't manage unsigned int for
# groups like on osx or fedora
gid = abs(gid) & 0x7FFFFFFF
if initgroups:
os.initgroups(username, gid)
else:
os.setgid(gid)
if uid:
os.setuid(uid)
def _setuser(user):
''' Normalizes user to a uid and sets the current uid, or does
nothing if user is None.
'''
if user is None:
return
# Normalize group to gid
elif isinstance(user, str):
uid = pwd.getpwnam(user).pw_uid
# The group is already a gid.
else:
uid = user
try:
os.setuid(uid)
except OSError:
self.logger.error('Unable to change user.')
sys.exit(1)
def drop_privileges(uid_name='nobody'):
"""Drop root privileges."""
if os.getuid() != 0:
# We're not root, nothing to do.
return
# Get the uid/gid from the name
running_uid = pwd.getpwnam(uid_name).pw_uid
# Remove group privileges
os.setgroups([])
# Try setting the new uid/gid
os.setuid(running_uid)
# Ensure a very conservative umask
os.umask(0o77)
# TODO: probably redundant, as it will not have access to the
# cred cache anyway.
os.environ['KRB5CCNAME'] = 'FILE:/no_such_krbcc'
def setugid(user):
"""Change process user and group ID
Argument is a numeric user id or a user name"""
try:
from pwd import getpwuid
passwd = getpwuid(int(user))
except ValueError:
from pwd import getpwnam
passwd = getpwnam(user)
if hasattr(os, 'initgroups'): # python >= 2.7
os.initgroups(passwd.pw_name, passwd.pw_gid)
else:
import ctypes
if ctypes.CDLL(None).initgroups(passwd.pw_name, passwd.pw_gid) < 0:
err = ctypes.c_int.in_dll(ctypes.pythonapi,"errno").value
raise OSError(err, os.strerror(err), 'initgroups')
os.setgid(passwd.pw_gid)
os.setuid(passwd.pw_uid)
os.environ['HOME'] = passwd.pw_dir
def drop_privileges(uid_name='nobody', gid_name='nogroup'):
if os.getuid() != 0:
# We're not root so, like, whatever dude
return
# Get the uid/gid from the name
running_uid = pwd.getpwnam(uid_name).pw_uid
running_gid = grp.getgrnam(gid_name).gr_gid
# Remove group privileges
os.setgroups([])
# Try setting the new uid/gid
os.setgid(running_gid)
os.setuid(running_uid)
# Ensure a very conservative umask
old_umask = os.umask(077)
def setugid(user):
"""Change process user and group ID
Argument is a numeric user id or a user name"""
try:
from pwd import getpwuid
passwd = getpwuid(int(user))
except ValueError:
from pwd import getpwnam
passwd = getpwnam(user)
if hasattr(os, 'initgroups'): # python >= 2.7
os.initgroups(passwd.pw_name, passwd.pw_gid)
else:
import ctypes
if ctypes.CDLL(None).initgroups(passwd.pw_name, passwd.pw_gid) < 0:
err = ctypes.c_int.in_dll(ctypes.pythonapi,"errno").value
raise OSError(err, os.strerror(err), 'initgroups')
os.setgid(passwd.pw_gid)
os.setuid(passwd.pw_uid)
os.environ['HOME'] = passwd.pw_dir
def drop_privileges(uid_name='nobody', gid_name='nobody'):
import os, pwd, grp
if os.getuid() != 0:
# We're not root so, like, whatever dude
return
# Get the uid/gid from the name
running_uid = pwd.getpwnam(uid_name).pw_uid
running_gid = grp.getgrnam(gid_name).gr_gid
# Remove group privileges
os.setgroups([])
# Try setting the new uid/gid
os.setgid(running_gid)
os.setuid(running_uid)
# Ensure a very conservative umask
old_umask = os.umask(0o77)
def set_owner_process(uid, gid, initgroups=False):
""" set user and group of workers processes """
if gid:
if uid:
try:
username = get_username(uid)
except KeyError:
initgroups = False
# versions of python < 2.6.2 don't manage unsigned int for
# groups like on osx or fedora
gid = abs(gid) & 0x7FFFFFFF
if initgroups:
os.initgroups(username, gid)
else:
os.setgid(gid)
if uid:
os.setuid(uid)
def _execChild(self, path, uid, gid, executable, args, environment):
"""
The exec() which is done in the forked child.
"""
if path:
os.chdir(path)
if uid is not None or gid is not None:
if uid is None:
uid = os.geteuid()
if gid is None:
gid = os.getegid()
# set the UID before I actually exec the process
os.setuid(0)
os.setgid(0)
switchUID(uid, gid)
os.execvpe(executable, args, environment)
def test_mockSetUid(self):
"""
Try creating a process with setting its uid: it's almost the same path
as the standard path, but with a C{switchUID} call before the exec.
"""
cmd = b'/mock/ouch'
d = defer.Deferred()
p = TrivialProcessProtocol(d)
try:
reactor.spawnProcess(p, cmd, [b'ouch'], env=None,
usePTY=False, uid=8080)
except SystemError:
self.assertTrue(self.mockos.exited)
self.assertEqual(
self.mockos.actions,
[('fork', False), ('setuid', 0), ('setgid', 0),
('switchuid', 8080, 1234), 'exec', ('exit', 1)])
else:
self.fail("Should not be here")
def setugid(user):
"""Change process user and group ID
Argument is a numeric user id or a user name"""
try:
from pwd import getpwuid
passwd = getpwuid(int(user))
except ValueError:
from pwd import getpwnam
passwd = getpwnam(user)
if hasattr(os, 'initgroups'): # python >= 2.7
os.initgroups(passwd.pw_name, passwd.pw_gid)
else:
import ctypes
if ctypes.CDLL(None).initgroups(passwd.pw_name, passwd.pw_gid) < 0:
err = ctypes.c_int.in_dll(ctypes.pythonapi,"errno").value
raise OSError(err, os.strerror(err), 'initgroups')
os.setgid(passwd.pw_gid)
os.setuid(passwd.pw_uid)
os.environ['HOME'] = passwd.pw_dir
def setugid(user):
"""Change process user and group ID
Argument is a numeric user id or a user name"""
try:
from pwd import getpwuid
passwd = getpwuid(int(user))
except ValueError:
from pwd import getpwnam
passwd = getpwnam(user)
if hasattr(os, 'initgroups'): # python >= 2.7
os.initgroups(passwd.pw_name, passwd.pw_gid)
else:
import ctypes
if ctypes.CDLL(None).initgroups(passwd.pw_name, passwd.pw_gid) < 0:
err = ctypes.c_int.in_dll(ctypes.pythonapi,"errno").value
raise OSError(err, os.strerror(err), 'initgroups')
os.setgid(passwd.pw_gid)
os.setuid(passwd.pw_uid)
os.environ['HOME'] = passwd.pw_dir
def drop_privileges(uid_name="nobody", gid_name="nogroup"):
if os.getuid() != 0:
# Already not root, take no action
return
# Get the uid/gid from the name
running_uid = pwd.getpwnam(uid_name).pw_uid
running_gid = grp.getgrnam(gid_name).gr_gid
# Remove group privileges
os.setgroups([])
# Try setting the new uid/gid
os.setgid(running_gid)
os.setuid(running_uid)
# Ensure a very conservative umask
old_umask = os.umask(077)
def drop_privileges_Arch(uid_name="nobody", gid_name="nobody"):
if os.getuid() != 0:
# Already not root, take no action
return
# Get the uid/gid from the name
running_uid = pwd.getpwnam(uid_name).pw_uid
running_gid = grp.getgrnam(gid_name).gr_gid
# Remove group privileges
os.setgroups([])
# Try setting the new uid/gid
os.setgid(running_gid)
os.setuid(running_uid)
# Ensure a very conservative umask
old_umask = os.umask(077)
run_server.py 文件源码
项目:almond-nnparser
作者: Stanford-Mobisocial-IoT-Lab
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def run():
np.random.seed(42)
config = ServerConfig.load(('./server.conf',))
if sys.version_info[2] >= 6:
thread_pool = ThreadPoolExecutor(thread_name_prefix='query-thread-')
else:
thread_pool = ThreadPoolExecutor(max_workers=32)
app = Application(config, thread_pool)
if config.ssl_key:
ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_ctx.load_cert_chain(config.ssl_chain, config.ssl_key)
app.listen(config.port, ssl_options=ssl_ctx)
else:
app.listen(config.port)
if config.user:
os.setgid(grp.getgrnam(config.user)[2])
os.setuid(pwd.getpwnam(config.user)[2])
if sd:
sd.notify('READY=1')
tokenizer_service = TokenizerService()
tokenizer_service.run()
for language in config.languages:
load_language(app, tokenizer_service, language, config.get_model_directory(language))
sys.stdout.flush()
tornado.ioloop.IOLoop.current().start()
def drop_privileges(user):
'''If running as root, drop process privileges to the given user and user's main group.'''
if os.getuid() == 0:
pwnam = pwd.getpwnam(user)
running_uid, running_gid = (pwnam[2], pwnam[3])
if running_gid != os.getgid():
os.setgid(running_gid)
if running_uid != os.getuid():
os.setuid(running_uid)
def _demo_posix():
#
# Example 1: Simple redirection: Get process list
#
plist = Popen(["ps"], stdout=PIPE).communicate()[0]
print "Process list:"
print plist
#
# Example 2: Change uid before executing child
#
if os.getuid() == 0:
p = Popen(["id"], preexec_fn=lambda: os.setuid(100))
p.wait()
#
# Example 3: Connecting several subprocesses
#
print "Looking for 'hda'..."
p1 = Popen(["dmesg"], stdout=PIPE)
p2 = Popen(["grep", "hda"], stdin=p1.stdout, stdout=PIPE)
print repr(p2.communicate()[0])
#
# Example 4: Catch execution error
#
print
print "Trying a weird file..."
try:
print Popen(["/this/path/does/not/exist"]).communicate()
except OSError, e:
if e.errno == errno.ENOENT:
print "The file didn't exist. I thought so..."
print "Child traceback:"
print e.child_traceback
else:
print "Error", e.errno
else:
print >>sys.stderr, "Gosh. No error."
def set_user(username):
if username is None:
return
import pwd
import grp
try:
pwrec = pwd.getpwnam(username)
except KeyError:
logging.error('user not found: %s' % username)
raise
user = pwrec[0]
uid = pwrec[2]
gid = pwrec[3]
cur_uid = os.getuid()
if uid == cur_uid:
return
if cur_uid != 0:
logging.error('can not set user as nonroot user')
# will raise later
# inspired by supervisor
if hasattr(os, 'setgroups'):
groups = [grprec[2] for grprec in grp.getgrall() if user in grprec[3]]
groups.insert(0, gid)
os.setgroups(groups)
os.setgid(gid)
os.setuid(uid)
def _run_as_user(user, gid=None):
try:
user = pwd.getpwnam(user)
except KeyError:
log('Invalid user: %s' % user)
raise Exception
uid = user.pw_uid
gid = gid or user.pw_gid
os.environ['HOME'] = user.pw_dir
def _inner():
os.setgid(gid)
os.setuid(uid)
return _inner
def _run_as_user(user, gid=None):
try:
user = pwd.getpwnam(user)
except KeyError:
log('Invalid user: %s' % user)
raise Exception
uid = user.pw_uid
gid = gid or user.pw_gid
os.environ['HOME'] = user.pw_dir
def _inner():
os.setgid(gid)
os.setuid(uid)
return _inner
def _run_as_user(user, gid=None):
try:
user = pwd.getpwnam(user)
except KeyError:
log('Invalid user: %s' % user)
raise Exception
uid = user.pw_uid
gid = gid or user.pw_gid
os.environ['HOME'] = user.pw_dir
def _inner():
os.setgid(gid)
os.setuid(uid)
return _inner
def _run_as_user(user, gid=None):
try:
user = pwd.getpwnam(user)
except KeyError:
log('Invalid user: %s' % user)
raise Exception
uid = user.pw_uid
gid = gid or user.pw_gid
os.environ['HOME'] = user.pw_dir
def _inner():
os.setgid(gid)
os.setuid(uid)
return _inner
def become_user(name):
'''
Change the current process' effective UID to that of the given user name.
Can only be called by super user 0. This function is only intended for use
from the ``init`` process during system boot.
:arg name: An OS user name. Must be found in the ``password`` database, or
a replacement authentication system.
:returns: The user's home directory.
'''
uid = ave.pwd.getpwnam_uid(name)
gid = ave.pwd.getpwnam_gid(name)
if os.geteuid() == uid:
return
if os.geteuid() != 0:
raise Exception('only root can execute with modified privileges')
try:
os.setgid(gid) # must be done before changing euid
os.setuid(uid)
except OSError, e:
if e.errno == errno.EPERM:
raise Exception(
'could not execute with modified privileges: %s' % str(e)
)
return ave.pwd.getpwnam_dir(name)
def become_user(name):
'''
Change the current process' effective UID to that of the given user name.
Can only be called by super user 0. This function is only intended for use
from the ``init`` process during system boot.
:arg name: An OS user name. Must be found in the ``password`` database, or
a replacement authentication system.
:returns: The user's home directory.
'''
uid = ave.pwd.getpwnam_uid(name)
gid = ave.pwd.getpwnam_gid(name)
if os.geteuid() == uid:
return
if os.geteuid() != 0:
raise Exception('only root can execute with modified privileges')
try:
os.setgid(gid) # must be done before changing euid
os.setuid(uid)
except OSError, e:
if e.errno == errno.EPERM:
raise Exception(
'could not execute with modified privileges: %s' % str(e)
)
return ave.pwd.getpwnam_dir(name)
def _check_pid(self):
# Lame fork detection to remind developers to invoke Random.atfork()
# after every call to os.fork(). Note that this check is not reliable,
# since process IDs can be reused on most operating systems.
#
# You need to do Random.atfork() in the child process after every call
# to os.fork() to avoid reusing PRNG state. If you want to avoid
# leaking PRNG state to child processes (for example, if you are using
# os.setuid()) then you should also invoke Random.atfork() in the
# *parent* process.
if os.getpid() != self._pid:
raise AssertionError("PID check failed. RNG must be re-initialized after fork(). Hint: Try Random.atfork()")
def initgroups(uid, primaryGid):
"""Initializes the group access list.
This is done by reading the group database /etc/group and using all
groups of which C{uid} is a member. The additional group
C{primaryGid} is also added to the list.
If the given user is a member of more than C{NGROUPS}, arbitrary
groups will be silently discarded to bring the number below that
limit.
"""
try:
# Try to get the maximum number of groups
max_groups = os.sysconf("SC_NGROUPS_MAX")
except:
# No predefined limit
max_groups = 0
username = pwd.getpwuid(uid)[0]
l = []
if primaryGid is not None:
l.append(primaryGid)
for groupname, password, gid, userlist in grp.getgrall():
if username in userlist:
l.append(gid)
if len(l) == max_groups:
break # No more groups, ignore any more
try:
_setgroups_until_success(l)
except OSError, e:
# We might be able to remove this code now that we
# don't try to setgid/setuid even when not asked to.
if e.errno == errno.EPERM:
for g in getgroups():
if g not in l:
raise
else:
raise