python类chroot()的实例源码

daemon.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def change_root_directory(directory):
    """ Change the root directory of this process.

        :param directory: The target directory path.
        :return: ``None``.

        Set the current working directory, then the process root directory,
        to the specified `directory`. Requires appropriate OS privileges
        for this process.

        """
    try:
        os.chdir(directory)
        os.chroot(directory)
    except Exception as exc:
        error = DaemonOSEnvironmentError(
            "Unable to change root directory ({exc})".format(exc=exc))
        raise error
util.py 文件源码 项目:mock 作者: rpm-software-management 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def orphansKill(rootToKill, killsig=signal.SIGTERM):
    """kill off anything that is still chrooted."""
    getLog().debug("kill orphans")
    if USE_NSPAWN is False:
        for fn in [d for d in os.listdir("/proc") if d.isdigit()]:
            try:
                root = os.readlink("/proc/%s/root" % fn)
                if os.path.realpath(root) == os.path.realpath(rootToKill):
                    getLog().warning("Process ID %s still running in chroot. Killing...", fn)
                    pid = int(fn, 10)
                    os.kill(pid, killsig)
                    os.waitpid(pid, 0)
            except OSError:
                pass
    else:
        m_uuid = get_machinectl_uuid(rootToKill)
        if m_uuid:
            getLog().warning("Machine %s still running. Killing...", m_uuid)
            os.system("/usr/bin/machinectl terminate %s" % m_uuid)
rpm_patcher.py 文件源码 项目:combirepo 作者: Samsung 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def create_patched_packages(queue):
    """
    Patches the given package using rpmrebuild and the patching root.

    @param root             The root to be used.
    """
    root = queue.get()
    logging.debug("Chrooting to {0}".format(root))
    os.chroot(root)
    logging.debug("Chrooting to {0} done.".format(root))
    os.chdir("/")
    if not os.path.isfile("/Makefile"):
        logging.info("Chroot has no jobs to perform.")
        queue.task_done()
        return

    make_command = ["make"]
    if not hidden_subprocess.visible_mode:
        make_command.append("--silent")
    subprocess.call(make_command)
    logging.debug("Exiting from {0}".format(root))
    queue.task_done()
rpm_patcher.py 文件源码 项目:combirepo 作者: Samsung 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def __unpack_qemu_packages(self):
        """
        Looks for all qemu packages in the given list of repositories and
        unpacks them to the given directory.
        """
        initial_directory = os.getcwd()
        qemu_packages = []

        qemu_package = self.qemu_path
        if qemu_package is None:
            expression = "^qemu.*\.{0}\.rpm$".format(self.architecture)
            for repository in self.repositories:
                qemu_packages_portion = files.find_fast(repository, expression)
                qemu_packages.extend(qemu_packages_portion)
            logging.warning("The following qemu packages will be unpacked in "
                            "chroot:")
            for package in qemu_packages:
                logging.warning(" * {0}".format(package))
        else:
            qemu_packages.append(qemu_package)

        for package in qemu_packages:
            files.unrpm(package, self.patching_root)
_twistd_unix.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def setupEnvironment(config):
    if config['chroot'] is not None:
        os.chroot(config['chroot'])
        if config['rundir'] == '.':
            config['rundir'] = '/'
    os.chdir(config['rundir'])
    if not config['nodaemon']:
        daemonize()
    if config['pidfile']:
        open(config['pidfile'],'wb').write(str(os.getpid()))
rd.py 文件源码 项目:rubber-docker 作者: Fewbytes 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def contain(command, image_name, image_dir, container_id, container_dir):
    linux.unshare(linux.CLONE_NEWNS)  # create a new mount namespace

    # TODO: we added MS_REC here. wanna guess why?
    linux.mount(None, '/', None, linux.MS_PRIVATE | linux.MS_REC, None)

    new_root = create_container_root(
        image_name, image_dir, container_id, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))

    # Create mounts (/proc, /sys, /dev) under new_root
    linux.mount('proc', os.path.join(new_root, 'proc'), 'proc', 0, '')
    linux.mount('sysfs', os.path.join(new_root, 'sys'), 'sysfs', 0, '')
    linux.mount('tmpfs', os.path.join(new_root, 'dev'), 'tmpfs',
                linux.MS_NOSUID | linux.MS_STRICTATIME, 'mode=755')

    # Add some basic devices
    devpts_path = os.path.join(new_root, 'dev', 'pts')
    if not os.path.exists(devpts_path):
        os.makedirs(devpts_path)
        linux.mount('devpts', devpts_path, 'devpts', 0, '')

    makedev(os.path.join(new_root, 'dev'))

    os.chroot(new_root)  # TODO: replace with pivot_root

    os.chdir('/')

    # TODO: umount2 old root (HINT: see MNT_DETACH in man 2 umount)

    os.execvp(command[0], command)
rd.py 文件源码 项目:rubber-docker 作者: Fewbytes 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def contain(command, image_name, image_dir, container_id, container_dir):
    new_root = create_container_root(
        image_name, image_dir, container_id, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))

    # TODO: time to say goodbye to the old mount namespace,
    #       see "man 2 unshare" to get some help
    #   HINT 1: there is no os.unshare(), time to use the linux module we made
    #           just for you!
    #   HINT 2: the linux module includes both functions and constants!
    #           e.g. linux.CLONE_NEWNS

    # TODO: remember shared subtrees?
    # (https://www.kernel.org/doc/Documentation/filesystems/sharedsubtree.txt)
    # Make / a private mount to avoid littering our host mount table.

    # Create mounts (/proc, /sys, /dev) under new_root
    linux.mount('proc', os.path.join(new_root, 'proc'), 'proc', 0, '')
    linux.mount('sysfs', os.path.join(new_root, 'sys'), 'sysfs', 0, '')
    linux.mount('tmpfs', os.path.join(new_root, 'dev'), 'tmpfs',
                linux.MS_NOSUID | linux.MS_STRICTATIME, 'mode=755')
    # Add some basic devices
    devpts_path = os.path.join(new_root, 'dev', 'pts')
    if not os.path.exists(devpts_path):
        os.makedirs(devpts_path)
        linux.mount('devpts', devpts_path, 'devpts', 0, '')
    for i, dev in enumerate(['stdin', 'stdout', 'stderr']):
        os.symlink('/proc/self/fd/%d' % i, os.path.join(new_root, 'dev', dev))

    # TODO: add more devices (e.g. null, zero, random, urandom) using os.mknod.

    os.chroot(new_root)

    os.chdir('/')

    os.execvp(command[0], command)
test_serialize_safe.py 文件源码 项目:Software-Architecture-with-Python 作者: PacktPublishing 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def system_jail():
    """ A simple chroot jail """

    os.chroot('safe_root/')
    yield
    os.chroot('/')
path.py 文件源码 项目:click-configfile 作者: click-contrib 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def chroot(self):
            """ .. seealso:: :func:`os.chroot` """
            os.chroot(self)
util.py 文件源码 项目:mock 作者: rpm-software-management 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def condChroot(chrootPath):
    if chrootPath is not None:
        saved = {"ruid": os.getuid(), "euid": os.geteuid()}
        setresuid(0, 0, 0)
        os.chdir(chrootPath)
        os.chroot(chrootPath)
        setresuid(saved['ruid'], saved['euid'])
util.py 文件源码 项目:mock 作者: rpm-software-management 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _prepare_nspawn_command(chrootPath, user, cmd, nspawn_args=None, env=None, cwd=None):
    cmd_is_list = isinstance(cmd, list)
    if nspawn_args is None:
        nspawn_args = []
    if user:
        # user can be either id or name
        if cmd_is_list:
            cmd = ['-u', str(user)] + cmd
        else:
            raise exception.Error('Internal Error: command must be list or shell=True.')
    elif not cmd_is_list:
        cmd = [cmd]
    nspawn_argv = ['/usr/bin/systemd-nspawn', '-q', '-M', uuid.uuid4().hex, '-D', chrootPath]
    distro_label = distro.linux_distribution(full_distribution_name=False)[0]
    if (distro_label != 'centos') and (distro_label != 'ol') and (distro_label != 'rhel') and (distro_label != 'deskos'):
        # EL7 does not support it (yet). See BZ 1417387
        nspawn_argv += ['-a']
    nspawn_argv.extend(nspawn_args)
    if cwd:
        nspawn_argv.append('--chdir={0}'.format(cwd))
    if env:
        # BZ 1312384 workaround
        env['PROMPT_COMMAND'] = r'printf "\033]0;<mock-chroot>\007"'
        env['PS1'] = r'<mock-chroot> \s-\v\$ '
        for k, v in env.items():
            nspawn_argv.append('--setenv={0}={1}'.format(k, v))
    cmd = nspawn_argv + cmd
    if cmd_is_list:
        return cmd
    else:
        return " ".join(cmd)
util.py 文件源码 项目:mock 作者: rpm-software-management 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def doshell(chrootPath=None, environ=None, uid=None, gid=None, cmd=None,
            nspawn_args=None,
            unshare_ipc=True,
            unshare_net=False):
    log = getLog()
    log.debug("doshell: chrootPath:%s, uid:%d, gid:%d", chrootPath, uid, gid)
    if environ is None:
        environ = clean_env()
    if 'PROMPT_COMMAND' not in environ:
        environ['PROMPT_COMMAND'] = r'printf "\033]0;<mock-chroot>\007"'
    if 'PS1' not in environ:
        environ['PS1'] = r'<mock-chroot> \s-\v\$ '
    if 'SHELL' not in environ:
        environ['SHELL'] = '/bin/sh'
    log.debug("doshell environment: %s", environ)
    if cmd:
        if not isinstance(cmd, list):
            cmd = [cmd]
        cmd = ['/bin/sh', '-c'] + cmd
    else:
        cmd = ["/bin/sh", "-i", "-l"]
    if USE_NSPAWN:
        # nspawn cannot set gid
        cmd = _prepare_nspawn_command(chrootPath, uid, cmd, nspawn_args=nspawn_args, env=environ)
    preexec = ChildPreExec(personality=None, chrootPath=chrootPath, cwd=None,
                           uid=uid, gid=gid, env=environ, shell=True,
                           unshare_ipc=unshare_ipc, unshare_net=unshare_net)
    log.debug("doshell: command: %s", cmd)
    return subprocess.call(cmd, preexec_fn=preexec, env=environ, shell=False)
util.py 文件源码 项目:mock 作者: rpm-software-management 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def do_update_config(log, config_opts, cfg, uidManager, name, skipError=True):
    if os.path.exists(cfg):
        config_opts['config_paths'].append(cfg)
        update_config_from_file(config_opts, cfg, uidManager)
        check_macro_definition(config_opts)
    elif not skipError:
        log.error("Could not find required config file: %s", cfg)
        if name == "default":
            log.error("  Did you forget to specify the chroot to use with '-r'?")
        if "/" in cfg:
            log.error("  If you're trying to specify a path, include the .cfg extension, e.g. -r ./target.cfg")
        sys.exit(1)
_twistd_unix.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def setupEnvironment(config):
    if config['chroot'] is not None:
        os.chroot(config['chroot'])
        if config['rundir'] == '.':
            config['rundir'] = '/'
    os.chdir(config['rundir'])
    if not config['nodaemon']:
        daemonize()
    if config['pidfile']:
        open(config['pidfile'],'wb').write(str(os.getpid()))
chroot.py 文件源码 项目:pkgscripts-ng 作者: SynologyOpenSource 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def umount(self):
        try:
            subprocess.check_call(['umount', os.path.join(self.chroot, 'proc')])
        except subprocess.CalledProcessError:
            pass
chroot.py 文件源码 项目:pkgscripts-ng 作者: SynologyOpenSource 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def mount(self):
        try:
            mount_point = os.path.join(self.chroot, 'proc')
            if not os.path.ismount(mount_point):
                subprocess.check_call(['mount', '-t', 'proc', 'none', mount_point])
        except subprocess.CalledProcessError:
            pass
chroot.py 文件源码 项目:pkgscripts-ng 作者: SynologyOpenSource 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, path):
        self.chroot = path
        self.orig_fd = os.open("/", os.O_RDONLY)
        self.chroot_fd = os.open(self.chroot, os.O_RDONLY)
chroot.py 文件源码 项目:pkgscripts-ng 作者: SynologyOpenSource 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __enter__(self):
        self.mount()
        os.chroot(self.chroot)
        os.fchdir(self.chroot_fd)
        return self
chroot.py 文件源码 项目:pkgscripts-ng 作者: SynologyOpenSource 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type, exc_val, exc_tb):
        os.fchdir(self.orig_fd)
        os.chroot(".")
        os.close(self.orig_fd)
        os.close(self.chroot_fd)
        self.umount()
chroot.py 文件源码 项目:pkgscripts-ng 作者: SynologyOpenSource 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_inside_path(self, path):
        return path.replace(self.chroot, "")
path.py 文件源码 项目:click-configfile 作者: jenisys 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def chroot(self):
            """ .. seealso:: :func:`os.chroot` """
            os.chroot(self)
enter.py 文件源码 项目:Coffer 作者: Max00355 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def enterChroot(path):
    os.chdir(path)
    os.chroot(path)
templateUtils.py 文件源码 项目:Coffer 作者: Max00355 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def executeCommand(command):
    cwd = os.getcwd()
    rr = os.open("/", os.O_RDONLY)
    os.chroot(getRootDir.getEnvsDir() + getEnvName())
    os.chdir("/")
    os.system(command)
    os.fchdir(rr)
    os.chroot(".")
    os.chdir(cwd)
rpm_patcher.py 文件源码 项目:combirepo 作者: Samsung 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def __install_rpmrebuild(self, queue):
        """
        Chroots to the given path and installs rpmrebuild in it.

        @param queue    The queue where the result will be put.
        """
        os.chroot(self.patching_root)
        os.chdir("/")
        os.chdir("/rpmrebuild/src")
        hidden_subprocess.call("Making the rpmrebuild.", ["make"])
        hidden_subprocess.call("Installing the rpmrebuild.",
                               ["make", "install"])
        if not check.command_exists("rpmrebuild"):
            sys.exit("Error.")
        queue.put(True)
rpm_patcher.py 文件源码 项目:combirepo 作者: Samsung 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __deploy_packages(self):
        """
        Deploys packages to chroot clones and generates makefiles for them.
        """
        self._tasks.sort(key=lambda task: os.stat(task[1]).st_size)
        copy_tasks = []
        for i in range(repository_combiner.jobs_number):
            tasks = []
            i_task = i
            while i_task < len(self._tasks):
                tasks.append(self._tasks[i_task])
                i_task += repository_combiner.jobs_number

            if len(tasks) == 0:
                continue

            directories = {}
            for task in tasks:
                package_name, package_path, target, _, _ = task
                copy_tasks.append((package_name, package_path,
                                   self.patching_root_clones[i]))
                self._targets[package_name] = target
                basename = os.path.basename(target)
                self._package_names[basename] = package_name
            self._generate_makefile(self.patching_root_clones[i], tasks)
        hidden_subprocess.function_call_list(
            "Copying to patcher", shutil.copy, copy_tasks)
rpm_patcher.py 文件源码 项目:combirepo 作者: Samsung 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __use_cached_root_or_prepare(self):
        """
        Tries to find cached root and uses it in case it exists and prepares
        it otherwise.
        """
        image_info = "{0}".format((self.names, self.repositories,
                                   self.architecture,
                                   os.path.basename(self.kickstart_file_path)))
        cached_images_info_paths = files.find_fast(
            patching_cache_path, ".*preliminary_image.info.txt")
        matching_images_path = None
        for info_path in cached_images_info_paths:
            cached_images_path = info_path.replace(".info.txt", "")
            if not os.path.isdir(cached_images_path):
                logging.error("Directory {0} not "
                              "found!".format(cached_images_path))
                continue
            lines = []
            with open(info_path, "r") as info_file:
                for line in info_file:
                    lines.append(line)
            if lines[0] == image_info:
                matching_images_path = cached_images_path
                break
        if matching_images_path is not None:
            self.patching_root = matching_images_path
            logging.info("Found already prepared patching root: "
                         "{0}".format(matching_images_path))
        else:
            self.__prepare()
            cached_chroot_path = os.path.join(
                patching_cache_path, os.path.basename(
                    self.patching_root) + "preliminary_image")
            hidden_subprocess.call(
                "Saving chroot to cache",
                ["cp", "-a", self.patching_root, cached_chroot_path])
            info_path = cached_chroot_path + ".info.txt"
            with open(info_path, "wb") as info_file:
                info_file.write(image_info)
osimage.py 文件源码 项目:luna 作者: dchirikov 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def copy_boot(self):
        cluster = Cluster(mongo_db = self._mongo_db)
        image_path = str(self.get('path'))
        kernver = str(self.get('kernver'))
        tmp_path = '/tmp' # in chroot env
        initrdfile = str(self.name) + '-initramfs-' + kernver
        kernfile = str(self.name) + '-vmlinuz-' + kernver
        path = cluster.get('path')
        if not path:
            self._logger.error("Path needs to be configured.")
            return None
        path = str(path)
        user = cluster.get('user')
        if not user:
            self._logger.error("User needs to be configured.")
            return None
        path_to_store = path + "/boot"
        user_id = pwd.getpwnam(user).pw_uid
        grp_id = pwd.getpwnam(user).pw_gid
        if not os.path.exists(path_to_store):
            os.makedirs(path_to_store)
            os.chown(path_to_store, user_id, grp_id)
        shutil.copy(image_path + '/boot/initramfs-' + kernver + '.img', path_to_store + '/' + initrdfile)
        shutil.copy(image_path + '/boot/vmlinuz-' + kernver, path_to_store + '/' + kernfile)
        os.chown(path_to_store + '/' + initrdfile, user_id, grp_id)
        os.chmod(path_to_store + '/' + initrdfile, 0644)
        os.chown(path_to_store + '/' + kernfile, user_id, grp_id)
        os.chmod(path_to_store + '/' + kernfile, 0644)
        self.set('kernfile', kernfile)
        self.set('initrdfile', initrdfile)
        self._logger.warning("Boot files was copied, but luna module might not being added to initrd. Please check /etc/dracut.conf.d in image")
        return True
_twistd_unix.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def setupEnvironment(self, chroot, rundir, nodaemon, umask, pidfile):
        """
        Set the filesystem root, the working directory, and daemonize.

        @type chroot: C{str} or L{None}
        @param chroot: If not None, a path to use as the filesystem root (using
            L{os.chroot}).

        @type rundir: C{str}
        @param rundir: The path to set as the working directory.

        @type nodaemon: C{bool}
        @param nodaemon: A flag which, if set, indicates that daemonization
            should not be done.

        @type umask: C{int} or L{None}
        @param umask: The value to which to change the process umask.

        @type pidfile: C{str} or L{None}
        @param pidfile: If not L{None}, the path to a file into which to put
            the PID of this process.
        """
        daemon = not nodaemon

        if chroot is not None:
            os.chroot(chroot)
            if rundir == '.':
                rundir = '/'
        os.chdir(rundir)
        if daemon and umask is None:
            umask = 0o077
        if umask is not None:
            os.umask(umask)
        if daemon:
            from twisted.internet import reactor
            self.config["statusPipe"] = self.daemonize(reactor)
        if pidfile:
            with open(pidfile, 'wb') as f:
                f.write(intToBytes(os.getpid()))
_twistd_unix.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def startApplication(self, application):
        """
        Configure global process state based on the given application and run
        the application.

        @param application: An object which can be adapted to
            L{service.IProcess} and L{service.IService}.
        """
        process = service.IProcess(application)
        if not self.config['originalname']:
            launchWithName(process.processName)
        self.setupEnvironment(
            self.config['chroot'], self.config['rundir'],
            self.config['nodaemon'], self.config['umask'],
            self.config['pidfile'])

        service.IService(application).privilegedStartService()

        uid, gid = self.config['uid'], self.config['gid']
        if uid is None:
            uid = process.uid
        if gid is None:
            gid = process.gid

        self.shedPrivileges(self.config['euid'], uid, gid)
        app.startApplication(application, not self.config['no_save'])
test_twistd.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def setUp(self):
        self.root = self.unset
        self.cwd = self.unset
        self.mask = self.unset
        self.daemon = False
        self.pid = os.getpid()
        self.patch(os, 'chroot', lambda path: setattr(self, 'root', path))
        self.patch(os, 'chdir', lambda path: setattr(self, 'cwd', path))
        self.patch(os, 'umask', lambda mask: setattr(self, 'mask', mask))
        self.runner = UnixApplicationRunner(twistd.ServerOptions())
        self.runner.daemonize = self.daemonize


问题


面经


文章

微信
公众号

扫码关注公众号