def change_root_directory(directory):
    """ Make `directory` the root directory of this process.

        :param directory: The target directory path.
        :return: ``None``.
        :raises DaemonOSEnvironmentError: If changing the working
            directory or the root directory fails for any reason; the
            message embeds the underlying exception.

        The current working directory is switched to `directory` first
        and the process root directory is changed afterwards. Requires
        appropriate OS privileges for this process.

        """
    try:
        os.chdir(directory)
        os.chroot(directory)
    except Exception as exc:
        message = "Unable to change root directory ({exc})".format(exc=exc)
        raise DaemonOSEnvironmentError(message)
python类chroot()的实例源码
def orphansKill(rootToKill, killsig=signal.SIGTERM):
    """Kill off anything that is still running inside `rootToKill`.

    When USE_NSPAWN is anything other than ``False`` the machine returned
    by get_machinectl_uuid() is terminated through ``machinectl``.
    Otherwise every numeric entry of ``/proc`` whose ``root`` link resolves
    to `rootToKill` is sent `killsig` and then waited for.
    """
    getLog().debug("kill orphans")

    if USE_NSPAWN is not False:
        machine = get_machinectl_uuid(rootToKill)
        if machine:
            getLog().warning("Machine %s still running. Killing...", machine)
            os.system("/usr/bin/machinectl terminate %s" % machine)
        return

    for entry in os.listdir("/proc"):
        if not entry.isdigit():
            continue
        try:
            proc_root = os.readlink("/proc/%s/root" % entry)
            if os.path.realpath(proc_root) != os.path.realpath(rootToKill):
                continue
            getLog().warning("Process ID %s still running in chroot. Killing...", entry)
            victim = int(entry, 10)
            os.kill(victim, killsig)
            os.waitpid(victim, 0)
        except OSError:
            # Any OS-level failure for this entry is ignored and the scan
            # moves on to the next process.
            pass
def create_patched_packages(queue):
    """
    Runs the patching jobs that were prepared inside a patching root.

    The root path is taken from the queue. The process chroots into it and
    runs ``make`` there when a ``/Makefile`` exists; otherwise there is
    nothing to do.

    @param queue    The queue that supplies the root to be used. Its task is
                    marked as done when the work is finished or skipped.
    """
    root = queue.get()

    logging.debug("Chrooting to {0}".format(root))
    os.chroot(root)
    logging.debug("Chrooting to {0} done.".format(root))
    os.chdir("/")

    if os.path.isfile("/Makefile"):
        if hidden_subprocess.visible_mode:
            make_command = ["make"]
        else:
            make_command = ["make", "--silent"]
        subprocess.call(make_command)
        logging.debug("Exiting from {0}".format(root))
    else:
        logging.info("Chroot has no jobs to perform.")

    queue.task_done()
def __unpack_qemu_packages(self):
    """
    Unpacks qemu packages to the patching root.

    When ``self.qemu_path`` is set, only that package is unpacked.
    Otherwise every package in ``self.repositories`` whose name matches
    ``qemu*.<architecture>.rpm`` is unpacked, and the list of matches is
    logged as a warning first.
    """
    qemu_package = self.qemu_path
    if qemu_package is not None:
        qemu_packages = [qemu_package]
    else:
        # Raw string: "\." is a regular-expression escape, not a Python
        # string escape, and must reach the matcher unchanged.
        expression = r"^qemu.*\.{0}\.rpm$".format(self.architecture)
        qemu_packages = []
        for repository in self.repositories:
            qemu_packages.extend(files.find_fast(repository, expression))
        logging.warning("The following qemu packages will be unpacked in "
                        "chroot:")
        for package in qemu_packages:
            logging.warning(" * {0}".format(package))

    for package in qemu_packages:
        files.unrpm(package, self.patching_root)
def setupEnvironment(config):
    """Apply the chroot, working directory, daemon and pidfile settings.

    Keys read from ``config``:

    - ``chroot``: if not ``None``, the path passed to ``os.chroot``.
    - ``rundir``: the directory to change into. When a chroot is applied
      and this is ``'.'``, it is rewritten to ``'/'`` in ``config``.
    - ``nodaemon``: when false, ``daemonize()`` is called.
    - ``pidfile``: when truthy, the path that receives the PID of this
      process.
    """
    if config['chroot'] is not None:
        os.chroot(config['chroot'])
        if config['rundir'] == '.':
            config['rundir'] = '/'
    os.chdir(config['rundir'])
    if not config['nodaemon']:
        daemonize()
    if config['pidfile']:
        # The file is opened in binary mode, so the PID is written as bytes;
        # the context manager closes the handle instead of leaving it to the
        # garbage collector.
        with open(config['pidfile'], 'wb') as pid_file:
            pid_file.write(str(os.getpid()).encode('ascii'))
def contain(command, image_name, image_dir, container_id, container_dir):
    """Build a new root in a private mount namespace and exec `command` in it.

    A new mount namespace is unshared and ``/`` is remounted private before
    the container root is created. ``proc``, ``sysfs``, a ``tmpfs`` for
    ``dev`` and ``devpts`` are mounted below the new root, devices are
    created through makedev(), and the process chroots into the new root
    before replacing itself with `command`.
    """
    linux.unshare(linux.CLONE_NEWNS)  # create a new mount namespace
    # TODO: we added MS_REC here. wanna guess why?
    linux.mount(None, '/', None, linux.MS_PRIVATE | linux.MS_REC, None)

    new_root = create_container_root(
        image_name, image_dir, container_id, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))

    # Create mounts (/proc, /sys, /dev) under new_root
    pseudo_filesystems = (
        ('proc', 'proc', 'proc', 0, ''),
        ('sysfs', 'sys', 'sysfs', 0, ''),
        ('tmpfs', 'dev', 'tmpfs',
         linux.MS_NOSUID | linux.MS_STRICTATIME, 'mode=755'),
    )
    for source, subdir, fs_type, flags, options in pseudo_filesystems:
        linux.mount(source, os.path.join(new_root, subdir),
                    fs_type, flags, options)

    # Add some basic devices
    devpts_path = os.path.join(new_root, 'dev', 'pts')
    if not os.path.exists(devpts_path):
        os.makedirs(devpts_path)
        linux.mount('devpts', devpts_path, 'devpts', 0, '')
    makedev(os.path.join(new_root, 'dev'))

    os.chroot(new_root)  # TODO: replace with pivot_root
    os.chdir('/')

    # TODO: umount2 old root (HINT: see MNT_DETACH in man 2 umount)
    os.execvp(command[0], command)
def contain(command, image_name, image_dir, container_id, container_dir):
    """Create a container root, populate it and exec `command` inside it.

    ``proc``, ``sysfs``, a ``tmpfs`` for ``dev`` and ``devpts`` are mounted
    below the new root, ``stdin``/``stdout``/``stderr`` symlinks are created
    in its ``dev`` directory, and the process chroots into the new root
    before replacing itself with `command`.
    """
    new_root = create_container_root(
        image_name, image_dir, container_id, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))

    # TODO: time to say goodbye to the old mount namespace,
    #       see "man 2 unshare" to get some help
    #   HINT 1: there is no os.unshare(), time to use the linux module we made
    #           just for you!
    #   HINT 2: the linux module includes both functions and constants!
    #           e.g. linux.CLONE_NEWNS

    # TODO: remember shared subtrees?
    # (https://www.kernel.org/doc/Documentation/filesystems/sharedsubtree.txt)
    # Make / a private mount to avoid littering our host mount table.

    # Create mounts (/proc, /sys, /dev) under new_root
    pseudo_filesystems = (
        ('proc', 'proc', 'proc', 0, ''),
        ('sysfs', 'sys', 'sysfs', 0, ''),
        ('tmpfs', 'dev', 'tmpfs',
         linux.MS_NOSUID | linux.MS_STRICTATIME, 'mode=755'),
    )
    for source, subdir, fs_type, flags, options in pseudo_filesystems:
        linux.mount(source, os.path.join(new_root, subdir),
                    fs_type, flags, options)

    # Add some basic devices
    devpts_path = os.path.join(new_root, 'dev', 'pts')
    if not os.path.exists(devpts_path):
        os.makedirs(devpts_path)
        linux.mount('devpts', devpts_path, 'devpts', 0, '')
    for fd_number, stream in enumerate(('stdin', 'stdout', 'stderr')):
        link_path = os.path.join(new_root, 'dev', stream)
        os.symlink('/proc/self/fd/%d' % fd_number, link_path)

    # TODO: add more devices (e.g. null, zero, random, urandom) using os.mknod.

    os.chroot(new_root)
    os.chdir('/')

    os.execvp(command[0], command)
test_serialize_safe.py 文件源码
项目:Software-Architecture-with-Python
作者: PacktPublishing
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def system_jail():
    """ A simple chroot jail

    Generator that chroots into ``safe_root/``, yields once, and then
    restores the original root directory and working directory.

    NOTE(review): presumably wrapped with ``contextlib.contextmanager`` at
    the definition site - confirm against the original module.

    NOTE(review): the working directory is not moved into the jail while
    the body runs, so relative paths still resolve outside it - confirm
    whether that is intended.
    """
    # Once chrooted, '/' names the jail itself, so os.chroot('/') cannot
    # leave it. Descriptors opened beforehand still refer to the real root
    # and the real working directory and are used to get back out.
    real_root = os.open('/', os.O_RDONLY)
    try:
        saved_cwd = os.open('.', os.O_RDONLY)
        try:
            os.chroot('safe_root/')
            try:
                yield
            finally:
                # Step outside the jail through the saved descriptor, make
                # that directory the root again, then restore the cwd.
                os.fchdir(real_root)
                os.chroot('.')
                os.fchdir(saved_cwd)
        finally:
            os.close(saved_cwd)
    finally:
        os.close(real_root)
def chroot(self):
    """ Make this path the root directory of the current process.

    .. seealso:: :func:`os.chroot`
    """
    new_root = self
    os.chroot(new_root)
def condChroot(chrootPath):
    """Chroot into `chrootPath`, or do nothing when it is ``None``.

    The real and effective user ids are recorded, setresuid(0, 0, 0) is
    called, the process changes into and chroots to `chrootPath`, and the
    recorded ids are then passed back to setresuid().
    """
    if chrootPath is None:
        return
    real_uid, effective_uid = os.getuid(), os.geteuid()
    setresuid(0, 0, 0)
    os.chdir(chrootPath)
    os.chroot(chrootPath)
    setresuid(real_uid, effective_uid)
def _prepare_nspawn_command(chrootPath, user, cmd, nspawn_args=None, env=None, cwd=None):
    """Prefix `cmd` with a ``systemd-nspawn`` invocation for `chrootPath`.

    :param chrootPath: directory passed to nspawn with ``-D``.
    :param user: when truthy, added as ``-u <user>`` in front of `cmd`;
        `cmd` must then be a list.
    :param cmd: command as a list, or a single string.
    :param nspawn_args: extra nspawn arguments, appended after the
        built-in ones.
    :param env: mapping turned into ``--setenv`` options. It is updated in
        place with ``PROMPT_COMMAND`` and ``PS1`` before being serialized.
    :param cwd: when truthy, passed as ``--chdir``.
    :return: a list when `cmd` was a list, otherwise the arguments joined
        with single spaces.
    :raises exception.Error: when `user` is set and `cmd` is not a list.
    """
    cmd_is_list = isinstance(cmd, list)
    if nspawn_args is None:
        nspawn_args = []

    if user:
        if not cmd_is_list:
            raise exception.Error('Internal Error: command must be list or shell=True.')
        # user can be either id or name
        cmd = ['-u', str(user)] + cmd
    elif not cmd_is_list:
        cmd = [cmd]

    nspawn_argv = ['/usr/bin/systemd-nspawn', '-q', '-M', uuid.uuid4().hex, '-D', chrootPath]
    distro_label = distro.linux_distribution(full_distribution_name=False)[0]
    if distro_label not in ('centos', 'ol', 'rhel', 'deskos'):
        # EL7 does not support it (yet). See BZ 1417387
        nspawn_argv += ['-a']
    nspawn_argv.extend(nspawn_args)

    if cwd:
        nspawn_argv.append('--chdir={0}'.format(cwd))
    if env:
        # BZ 1312384 workaround
        env['PROMPT_COMMAND'] = r'printf "\033]0;<mock-chroot>\007"'
        env['PS1'] = r'<mock-chroot> \s-\v\$ '
        nspawn_argv.extend(
            '--setenv={0}={1}'.format(key, value) for key, value in env.items())

    full_command = nspawn_argv + cmd
    return full_command if cmd_is_list else " ".join(full_command)
def doshell(chrootPath=None, environ=None, uid=None, gid=None, cmd=None,
            nspawn_args=None,
            unshare_ipc=True,
            unshare_net=False):
    """Run a shell, or `cmd` through ``/bin/sh -c``, for `chrootPath`.

    :param chrootPath: chroot directory handed to ChildPreExec and, when
        USE_NSPAWN is set, to _prepare_nspawn_command().
    :param environ: environment mapping; clean_env() is used when ``None``.
        ``PROMPT_COMMAND``, ``PS1`` and ``SHELL`` are filled in when absent.
    :param uid: user id handed to ChildPreExec / nspawn.
    :param gid: group id handed to ChildPreExec.
    :param cmd: command (string or list) run as ``/bin/sh -c``; when falsy
        an interactive login shell is started instead.
    :param nspawn_args: extra arguments for nspawn.
    :param unshare_ipc: forwarded to ChildPreExec.
    :param unshare_net: forwarded to ChildPreExec.
    :return: the value returned by ``subprocess.call``.
    """
    log = getLog()
    # %s rather than %d: uid and gid default to None, which %d cannot format,
    # so the debug record would fail to render.
    log.debug("doshell: chrootPath:%s, uid:%s, gid:%s", chrootPath, uid, gid)
    if environ is None:
        environ = clean_env()
    environ.setdefault('PROMPT_COMMAND', r'printf "\033]0;<mock-chroot>\007"')
    environ.setdefault('PS1', r'<mock-chroot> \s-\v\$ ')
    environ.setdefault('SHELL', '/bin/sh')
    log.debug("doshell environment: %s", environ)

    if cmd:
        if not isinstance(cmd, list):
            cmd = [cmd]
        cmd = ['/bin/sh', '-c'] + cmd
    else:
        cmd = ["/bin/sh", "-i", "-l"]

    if USE_NSPAWN:
        # nspawn cannot set gid
        cmd = _prepare_nspawn_command(chrootPath, uid, cmd, nspawn_args=nspawn_args, env=environ)

    preexec = ChildPreExec(personality=None, chrootPath=chrootPath, cwd=None,
                           uid=uid, gid=gid, env=environ, shell=True,
                           unshare_ipc=unshare_ipc, unshare_net=unshare_net)
    log.debug("doshell: command: %s", cmd)
    return subprocess.call(cmd, preexec_fn=preexec, env=environ, shell=False)
def do_update_config(log, config_opts, cfg, uidManager, name, skipError=True):
    """Load the config file `cfg` into `config_opts` when it exists.

    An existing file is recorded in ``config_opts['config_paths']``, merged
    through update_config_from_file() and followed by
    check_macro_definition(). A missing file is ignored when `skipError` is
    true; otherwise errors are logged through `log` and the process exits
    with status 1.
    """
    if os.path.exists(cfg):
        config_opts['config_paths'].append(cfg)
        update_config_from_file(config_opts, cfg, uidManager)
        check_macro_definition(config_opts)
        return

    if skipError:
        return

    log.error("Could not find required config file: %s", cfg)
    if name == "default":
        log.error(" Did you forget to specify the chroot to use with '-r'?")
    if "/" in cfg:
        log.error(" If you're trying to specify a path, include the .cfg extension, e.g. -r ./target.cfg")
    sys.exit(1)
def setupEnvironment(config):
    """Apply the chroot, working directory, daemon and pidfile settings.

    Keys read from ``config``:

    - ``chroot``: if not ``None``, the path passed to ``os.chroot``.
    - ``rundir``: the directory to change into. When a chroot is applied
      and this is ``'.'``, it is rewritten to ``'/'`` in ``config``.
    - ``nodaemon``: when false, ``daemonize()`` is called.
    - ``pidfile``: when truthy, the path that receives the PID of this
      process.
    """
    if config['chroot'] is not None:
        os.chroot(config['chroot'])
        if config['rundir'] == '.':
            config['rundir'] = '/'
    os.chdir(config['rundir'])
    if not config['nodaemon']:
        daemonize()
    if config['pidfile']:
        # The file is opened in binary mode, so the PID is written as bytes;
        # the context manager closes the handle instead of leaving it to the
        # garbage collector.
        with open(config['pidfile'], 'wb') as pid_file:
            pid_file.write(str(os.getpid()).encode('ascii'))
def umount(self):
    """Unmount ``proc`` below the chroot directory.

    A non-zero exit status from ``umount`` is ignored.
    """
    proc_dir = os.path.join(self.chroot, 'proc')
    try:
        subprocess.check_call(['umount', proc_dir])
    except subprocess.CalledProcessError:
        pass
def mount(self):
    """Mount a ``proc`` filesystem below the chroot directory.

    Nothing is done when the target is already a mount point, and a
    non-zero exit status from ``mount`` is ignored.
    """
    proc_dir = os.path.join(self.chroot, 'proc')
    if os.path.ismount(proc_dir):
        return
    try:
        subprocess.check_call(['mount', '-t', 'proc', 'none', proc_dir])
    except subprocess.CalledProcessError:
        pass
def __init__(self, path):
    """Remember the chroot directory and open the descriptors used later.

    ``orig_fd`` refers to the current root directory and ``chroot_fd`` to
    `path`; both are opened read-only.

    :param path: directory that will become the new root.
    :raises OSError: if either directory cannot be opened.
    """
    self.chroot = path
    self.orig_fd = os.open("/", os.O_RDONLY)
    try:
        self.chroot_fd = os.open(self.chroot, os.O_RDONLY)
    except OSError:
        # Do not leak the first descriptor when the second open fails.
        os.close(self.orig_fd)
        raise
def __enter__(self):
    """Mount ``proc``, chroot into the target and change into it.

    :return: this object.
    """
    self.mount()
    new_root = self.chroot
    os.chroot(new_root)
    # chroot_fd refers to the chroot directory, so this makes the new root
    # the working directory.
    os.fchdir(self.chroot_fd)
    return self
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the chroot, close both descriptors and unmount ``proc``.

    Returns ``None``, so an exception raised inside the ``with`` body is
    not suppressed.
    """
    try:
        # orig_fd refers to the root that was current when this object was
        # created; changing into it and chrooting to "." restores that root.
        os.fchdir(self.orig_fd)
        os.chroot(".")
    finally:
        # Close the descriptors even when leaving the chroot fails.
        os.close(self.orig_fd)
        os.close(self.chroot_fd)
    self.umount()
def get_inside_path(self, path):
    """Translate `path` into the path seen from inside the chroot.

    Only a leading ``self.chroot`` prefix is removed; later occurrences of
    the same text within `path` are kept.

    :param path: path as seen from outside the chroot.
    :return: `path` without the leading chroot prefix, or `path` unchanged
        when it does not start with the chroot directory.
    """
    root = self.chroot
    if path.startswith(root):
        return path[len(root):]
    return path
def chroot(self):
    """ Make this path the root directory of the current process.

    .. seealso:: :func:`os.chroot`
    """
    new_root = self
    os.chroot(new_root)
def enterChroot(path):
    """Change into `path` and then make it the process root directory."""
    target = path
    os.chdir(target)
    os.chroot(target)
def executeCommand(command):
    """Run `command` through ``os.system`` inside the environment's root.

    The root to enter is ``getRootDir.getEnvsDir() + getEnvName()``.
    Afterwards the original root directory and working directory are
    restored, and the descriptor used to get back is closed.

    :param command: command line handed to ``os.system``.
    :return: ``None``.
    """
    cwd = os.getcwd()
    # Descriptor on the real root: the only way back out after chroot().
    root_fd = os.open("/", os.O_RDONLY)
    try:
        os.chroot(getRootDir.getEnvsDir() + getEnvName())
        try:
            os.chdir("/")
            os.system(command)
        finally:
            # Restore the original root and cwd even when a step above
            # raised.
            os.fchdir(root_fd)
            os.chroot(".")
            os.chdir(cwd)
    finally:
        os.close(root_fd)
def __install_rpmrebuild(self, queue):
    """
    Chroots to the patching root, then builds and installs rpmrebuild from
    ``/rpmrebuild/src`` inside it.

    The process exits with an error message when the ``rpmrebuild`` command
    is not available afterwards.

    @param queue    The queue where the result (True) will be put.
    """
    os.chroot(self.patching_root)
    for directory in ("/", "/rpmrebuild/src"):
        os.chdir(directory)

    build_steps = (
        ("Making the rpmrebuild.", ["make"]),
        ("Installing the rpmrebuild.", ["make", "install"]),
    )
    for message, command in build_steps:
        hidden_subprocess.call(message, command)

    if check.command_exists("rpmrebuild"):
        queue.put(True)
    else:
        sys.exit("Error.")
def __deploy_packages(self):
    """
    Deploys packages to chroot clones and generates makefiles for them.

    Tasks are sorted by the size of their package file and dealt out to the
    clones round-robin: clone ``i`` gets every ``jobs_number``-th task
    starting at index ``i``. A makefile is generated for each clone that
    received tasks, and all packages are copied at the end.
    """
    self._tasks.sort(key=lambda task: os.stat(task[1]).st_size)

    copy_tasks = []
    for i in range(repository_combiner.jobs_number):
        tasks = self._tasks[i::repository_combiner.jobs_number]
        if not tasks:
            continue
        clone = self.patching_root_clones[i]
        for package_name, package_path, target, _, _ in tasks:
            copy_tasks.append((package_name, package_path, clone))
            self._targets[package_name] = target
            self._package_names[os.path.basename(target)] = package_name
        self._generate_makefile(clone, tasks)

    hidden_subprocess.function_call_list(
        "Copying to patcher", shutil.copy, copy_tasks)
def __use_cached_root_or_prepare(self):
    """
    Tries to find cached root and uses it in case it exists and prepares
    it otherwise.

    A cached root matches when the first line of its ``.info.txt`` file
    equals the description built from the names, repositories,
    architecture and kickstart file name. A freshly prepared root is
    copied to the cache together with such an info file.
    """
    image_info = "{0}".format((self.names, self.repositories,
                               self.architecture,
                               os.path.basename(self.kickstart_file_path)))
    cached_images_info_paths = files.find_fast(
        patching_cache_path, ".*preliminary_image.info.txt")

    matching_images_path = None
    for info_path in cached_images_info_paths:
        cached_images_path = info_path.replace(".info.txt", "")
        if not os.path.isdir(cached_images_path):
            logging.error("Directory {0} not "
                          "found!".format(cached_images_path))
            continue
        # Only the first line is compared. readline() returns "" for an
        # empty file, so a truncated info file is skipped, not a crash.
        with open(info_path, "r") as info_file:
            first_line = info_file.readline()
        if first_line == image_info:
            matching_images_path = cached_images_path
            break

    if matching_images_path is not None:
        self.patching_root = matching_images_path
        logging.info("Found already prepared patching root: "
                     "{0}".format(matching_images_path))
        return

    self.__prepare()
    cached_chroot_path = os.path.join(
        patching_cache_path, os.path.basename(
            self.patching_root) + "preliminary_image")
    hidden_subprocess.call(
        "Saving chroot to cache",
        ["cp", "-a", self.patching_root, cached_chroot_path])
    info_path = cached_chroot_path + ".info.txt"
    # Text mode matches the str payload and the text-mode read above.
    with open(info_path, "w") as info_file:
        info_file.write(image_info)
def copy_boot(self):
    """Copy the image's initramfs and kernel into the cluster boot directory.

    The files are read from ``<image path>/boot`` and stored below
    ``<cluster path>/boot`` under names prefixed with this object's name.
    The copies are owned by the configured cluster user, get mode 0644, and
    their names are recorded as ``kernfile`` and ``initrdfile``.

    :return: ``True`` on success, ``None`` when the cluster path or user is
        not configured.
    """
    cluster = Cluster(mongo_db = self._mongo_db)
    image_path = str(self.get('path'))
    kernver = str(self.get('kernver'))
    initrdfile = str(self.name) + '-initramfs-' + kernver
    kernfile = str(self.name) + '-vmlinuz-' + kernver

    path = cluster.get('path')
    if not path:
        self._logger.error("Path needs to be configured.")
        return None
    path = str(path)
    user = cluster.get('user')
    if not user:
        self._logger.error("User needs to be configured.")
        return None

    path_to_store = path + "/boot"
    # One passwd lookup provides both ids.
    account = pwd.getpwnam(user)
    user_id = account.pw_uid
    grp_id = account.pw_gid
    if not os.path.exists(path_to_store):
        os.makedirs(path_to_store)
        os.chown(path_to_store, user_id, grp_id)

    initrd_target = path_to_store + '/' + initrdfile
    kernel_target = path_to_store + '/' + kernfile
    shutil.copy(image_path + '/boot/initramfs-' + kernver + '.img', initrd_target)
    shutil.copy(image_path + '/boot/vmlinuz-' + kernver, kernel_target)
    for target in (initrd_target, kernel_target):
        os.chown(target, user_id, grp_id)
        # 0o644 is accepted by Python 2.6+ and Python 3; the bare 0644 form
        # is a syntax error on Python 3.
        os.chmod(target, 0o644)

    self.set('kernfile', kernfile)
    self.set('initrdfile', initrdfile)
    self._logger.warning("Boot files was copied, but luna module might not being added to initrd. Please check /etc/dracut.conf.d in image")
    return True
def setupEnvironment(self, chroot, rundir, nodaemon, umask, pidfile):
    """
    Apply the filesystem root, working directory, umask, daemonization and
    PID file settings, in that order.

    @type chroot: C{str} or L{None}
    @param chroot: If not None, a path to use as the filesystem root (using
        L{os.chroot}). In that case a C{rundir} of C{'.'} is replaced by
        C{'/'}.

    @type rundir: C{str}
    @param rundir: The path to set as the working directory.

    @type nodaemon: C{bool}
    @param nodaemon: A flag which, if set, indicates that daemonization
        should not be done.

    @type umask: C{int} or L{None}
    @param umask: The value to which to change the process umask. When it
        is L{None} and the process daemonizes, C{0o077} is used; when it is
        L{None} otherwise, the umask is left alone.

    @type pidfile: C{str} or L{None}
    @param pidfile: If not L{None}, the path to a file into which to put
        the PID of this process.
    """
    if chroot is not None:
        os.chroot(chroot)
        if rundir == '.':
            rundir = '/'
    os.chdir(rundir)

    daemon = not nodaemon
    if umask is None and daemon:
        umask = 0o077
    if umask is not None:
        os.umask(umask)

    if daemon:
        from twisted.internet import reactor
        self.config["statusPipe"] = self.daemonize(reactor)

    if pidfile:
        with open(pidfile, 'wb') as pid_handle:
            pid_handle.write(intToBytes(os.getpid()))
def startApplication(self, application):
    """
    Configure global process state based on the given application and run
    the application.

    The environment is set up from the configuration, the application's
    privileged start hook runs, privileges are shed, and the application
    is then started. A configured C{uid} or C{gid} of L{None} falls back
    to the value carried by the process object.

    @param application: An object which can be adapted to
        L{service.IProcess} and L{service.IService}.
    """
    process = service.IProcess(application)
    if not self.config['originalname']:
        launchWithName(process.processName)

    self.setupEnvironment(
        self.config['chroot'],
        self.config['rundir'],
        self.config['nodaemon'],
        self.config['umask'],
        self.config['pidfile'])
    service.IService(application).privilegedStartService()

    uid, gid = self.config['uid'], self.config['gid']
    uid = process.uid if uid is None else uid
    gid = process.gid if gid is None else gid

    self.shedPrivileges(self.config['euid'], uid, gid)
    app.startApplication(application, not self.config['no_save'])
def setUp(self):
# Reset the values this test case records and stub out the os calls that
# would change process-wide state.
self.root = self.unset
self.cwd = self.unset
self.mask = self.unset
self.daemon = False
self.pid = os.getpid()
# Each patched function stores its argument on the test case instead of
# calling the real os function.
self.patch(os, 'chroot', lambda path: setattr(self, 'root', path))
self.patch(os, 'chdir', lambda path: setattr(self, 'cwd', path))
self.patch(os, 'umask', lambda mask: setattr(self, 'mask', mask))
self.runner = UnixApplicationRunner(twistd.ServerOptions())
# The runner's daemonize is replaced by this test case's own daemonize
# attribute.
self.runner.daemonize = self.daemonize