python类chroot()的实例源码

test_twistd.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_chroot(self):
        """
        L{UnixApplicationRunner.setupEnvironment} changes the root of the
        filesystem if passed a non-L{None} value for the C{chroot} parameter.
        """
        # "/foo/bar" is the value under test; per the docstring above it is
        # the C{chroot} argument.  The meaning of the remaining positional
        # arguments is not visible in this snippet.
        self.runner.setupEnvironment("/foo/bar", ".", True, None, None)
        # NOTE(review): self.root is presumably recorded by a patched
        # os.chroot installed in the test fixture -- confirm against setUp.
        self.assertEqual(self.root, "/foo/bar")
test_twistd.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_noChroot(self):
        """
        L{UnixApplicationRunner.setupEnvironment} does not change the root of
        the filesystem if passed L{None} for the C{chroot} parameter.
        """
        # None is passed where the sibling test passes a chroot path.
        self.runner.setupEnvironment(None, ".", True, None, None)
        # Identity check against the fixture's sentinel: self.root must still
        # be the very object stored in self.unset.
        # NOTE(review): both attributes are presumably initialised in setUp,
        # which is not visible here -- confirm.
        self.assertIs(self.root, self.unset)
path.py 文件源码 项目:tissuelab 作者: VirtualPlants 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def chroot(self):
            """Change the root directory of the current process to this path.

            The object itself is handed to os.chroot.
            NOTE(review): presumably the enclosing class is a str-like path
            type; the class header is not visible here -- confirm.
            """
            os.chroot(self)
miniircd.py 文件源码 项目:luftwaffel 作者: exp-publishing 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, options):
        """Initialise the server from an options object.

        Copies the option values onto the instance, imports the ``ssl``
        module only when a PEM file is configured, resolves the listen
        address and server name, creates empty channel/client/nickname
        tables, and creates the log and state directories when they are set.
        """
        # Straight copies of the option values.
        self.ports = options.ports
        self.password = options.password
        self.ssl_pem_file = options.ssl_pem_file
        self.motdfile = options.motd
        self.verbose = options.verbose
        self.debug = options.debug
        self.logdir = options.logdir
        self.chroot = options.chroot
        self.setuid = options.setuid
        self.statedir = options.statedir

        if self.ssl_pem_file:
            # Imported on demand so the module is only needed with SSL.
            self.ssl = __import__("ssl")
            # Pin a relative certificate path now so it can still be found
            # after daemonization.  If it does not exist yet it might live
            # inside the chroot jail, so the value is left untouched.
            if os.path.exists(self.ssl_pem_file):
                self.ssl_pem_file = os.path.abspath(self.ssl_pem_file)

        # An empty address is used when no listen host was given.
        self.address = (
            socket.gethostbyname(options.listen) if options.listen else "")

        max_name_length = 63  # From the RFC.
        full_name = socket.getfqdn(self.address)
        self.name = full_name[:max_name_length]

        self.channels = {}  # irc_lower(Channel name) --> Channel instance.
        self.clients = {}  # Socket --> Client instance.
        self.nicknames = {}  # irc_lower(Nickname) --> Client instance.

        # Log directory first, then state directory; unset ones are skipped.
        for directory in (self.logdir, self.statedir):
            if directory:
                create_directory(directory)
ksace.py 文件源码 项目:stacki-ace 作者: Teradata 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def doScript(section, script, scriptnum):
    """Write one kickstart script to disk and run it in a forked child.

    The text of ``script`` (its string form) is written to
    ``/run/ks-script-<section>-<scriptnum>`` behind a ``#!`` line built from
    ``script.interp``.  Lines starting with ``%pre``, ``%post`` or ``%end``
    are dropped.  The file is made executable for the owner only and then
    executed in a child process, with stdout and stderr captured in
    ``<file>.out`` and ``<file>.err``.

    For the ``post`` section with ``script.inChroot`` set, the script is
    first copied under ``/run/mnt/sysimage`` and the child chroots there
    before running it (so the output files land inside the chroot).

    The parent blocks until that child has finished.  The child's exit
    status is not reported to the caller.
    """
    import traceback

    fname = '/run/ks-script-%s-%d' % (section, scriptnum)
    section_markers = ('%pre', '%post', '%end')

    # "with" guarantees the script file is flushed and closed before it is
    # chmod-ed and executed, even if a write fails.
    with open(fname, 'w') as f:
        f.write('#!%s\n\n' % script.interp)
        for line in ('%s' % script).split('\n'):
            if line.startswith(section_markers):
                # Section delimiters are not part of the script itself.
                continue
            f.write('%s\n' % line)

    # 0o700 is the octal spelling accepted by both Python 2.6+ and 3.
    os.chmod(fname, 0o700)

    pid = os.fork()

    if pid == 0:
        # Child.  Whatever happens it must terminate here via os._exit():
        # raising (including SystemExit from sys.exit) would unwind into the
        # caller's code and leave two processes running the parent's logic.
        status = 1
        try:
            if section == 'post' and script.inChroot:
                shutil.copy(fname, '/run/mnt/sysimage%s' % fname)
                os.chroot('/run/mnt/sysimage')

            #
            # set stdout and stderr to files on the disk so we can examine
            # the output and errors
            #
            with open('%s.out' % fname, 'w') as fout:
                with open('%s.err' % fname, 'w') as ferr:
                    subprocess.call([fname], stdout=fout, stderr=ferr)
            status = 0
        except Exception:
            traceback.print_exc()
            sys.stderr.flush()
        finally:
            os._exit(status)
    else:
        # Reap exactly the child forked above, not an unrelated one.
        os.waitpid(pid, 0)
Unix.py 文件源码 项目:lib9 作者: Jumpscale 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def chroot(self, path):
        '''Switch the root directory of the process to the given path

        @param path: Path to chroot() to
        @type path: string
        @raise ValueError: when path is empty or fails the unixdirpath check
        '''
        # The type check only runs for a non-empty path.
        acceptable = bool(path) and j.data.types.unixdirpath.check(path)
        if not acceptable:
            raise ValueError('Path %s is invalid' % path)

        message = 'Change root to %s' % path
        j.logger.logging.info(message)
        os.chroot(path)
warden_filer.py 文件源码 项目:STaaS 作者: CESNET 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def daemonize(
        work_dir = None, chroot_dir = None,
        umask = None, uid = None, gid = None,
        pidfile = None, files_preserve = [], signals = {}):
    """Turn the current process into a daemon.

    Steps, in order: optional chroot, umask, working directory, group and
    user switch; double fork with a new session in between; installation of
    signal handlers; closing of every descriptor above stderr that is not
    preserved; redirection of stdin/stdout/stderr to the null device; and
    optional creation of a PID file that is removed again at interpreter
    exit.

    Only the final (grandchild) process returns from this function; the
    original process and the intermediate child leave through os._exit(0).

    work_dir       -- directory to chdir into (after the chroot), or None
    chroot_dir     -- directory to chroot into, or None
    umask          -- file mode creation mask to set, or None
    uid, gid       -- user / group id to switch to, or None
    pidfile        -- path of the PID file to create exclusively, or None
    files_preserve -- iterable of objects with fileno() whose descriptors
                      must stay open (only read, never modified)
    signals        -- mapping of signal number to handler (only read)

    Raises OSError if any of the underlying system calls fails, e.g. when
    the PID file already exists.
    """
    # Dirs, limits, users
    if chroot_dir is not None:
        os.chdir(chroot_dir)
        os.chroot(chroot_dir)
    if umask is not None:
        os.umask(umask)
    if work_dir is not None:
        os.chdir(work_dir)
    # Group before user: after giving up the uid the gid could no longer
    # be changed.
    if gid is not None:
        os.setgid(gid)
    if uid is not None:
        os.setuid(uid)
    # Doublefork, split session
    if os.fork()>0:
        os._exit(0)
    os.setsid()
    if os.fork()>0:
        os._exit(0)
    # Setup signal handlers
    for (signum, handler) in signals.items():
        signal.signal(signum, handler)
    # Close descriptors
    descr_preserve = set(f.fileno() for f in files_preserve)
    maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if maxfd==resource.RLIM_INFINITY:
        maxfd = 65535
    # The stop value of a descending range is exclusive, so 2 is needed to
    # include descriptor 3 while still omitting stdin, stdout and stderr.
    for fd in range(maxfd, 2, -1):
        if fd in descr_preserve:
            continue
        try:
            os.close(fd)
        except OSError:
            # Descriptor was not open; nothing to close.
            pass
    # Redirect stdin, stdout, stderr to /dev/null
    devnull = os.open(os.devnull, os.O_RDWR)
    for fd in range(3):
        os.dup2(devnull, fd)
    # The extra descriptor is no longer needed once 0-2 are duplicates of
    # it (it can only be <= 2 if a standard stream was closed on entry).
    if devnull > 2:
        os.close(devnull)
    # PID file
    if pidfile is not None:
        pidd = os.open(pidfile, os.O_RDWR|os.O_CREAT|os.O_EXCL|os.O_TRUNC)
        try:
            # os.write needs bytes on Python 3; encoding an ASCII str is a
            # no-op on Python 2.
            os.write(pidd, (str(os.getpid())+"\n").encode("ascii"))
        finally:
            os.close(pidd)
        # Define and setup atexit closure
        @atexit.register
        def unlink_pid():
            # Best effort: the file may already have been removed.
            try:
                os.unlink(pidfile)
            except OSError:
                pass
rpm_patcher.py 文件源码 项目:combirepo 作者: Samsung 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def prepare_minimal_packages_list(graphs):
    """
    Prepares the minimal list of package names that are needed to be installed
    in the chroot so that rpmrebuild can be used inside it.

    For every required symbol the provider names reported by all graphs are
    merged.  When a symbol has several providers, those whose name contains a
    deprecated substring are ignored; if that would leave nothing, the
    unfiltered providers are used instead (a single provider is never
    filtered either).  Among the remaining names the lexicographically
    smallest one is taken, so the result does not depend on set ordering.

    The process is terminated through sys.exit when a symbol has no provider
    in any graph.

    @param graphs           The list of dependency graphs of repositories.
                            Each graph must offer get_provider_names(symbol)
                            (returning names or None) and a symbol_providers
                            mapping.
    @return                 The list of packages, one per required symbol, in
                            the order of the symbols.
    """
    symbols = ["useradd", "mkdir", "awk", "cpio", "make", "rpmbuild", "sed"]
    deprecated_substrings = ["mic-bootstrap", "x86", "x64"]

    # Collect providers per symbol.  The entry is created up front so that it
    # exists even when there are no graphs at all.
    providers = {}
    for symbol in symbols:
        found = set()
        for graph in graphs:
            names = graph.get_provider_names(symbol)
            if names is not None:
                found |= set(names)
                logging.debug("Got providers {0} for symbol "
                              "{1}".format(names, symbol))
                logging.debug("{0}".format(found))
        providers[symbol] = found

    packages = []
    for symbol in symbols:
        candidates = providers[symbol]
        if not candidates:
            # Dump what the graphs do know to help diagnosing the failure.
            last_graph = None
            for graph in graphs:
                for key in graph.symbol_providers.keys():
                    logging.debug("{0} : {1}".format(
                        key, graph.symbol_providers[key]))
                last_graph = graph
            logging.error("Failed to find symbol {0}".format(symbol))
            if last_graph is not None:
                logging.error("size: {0}".format(
                    len(last_graph.symbol_providers)))
            sys.exit("Error.")

        if len(candidates) > 1:
            logging.debug("Analyzing symbol {0}:".format(symbol))
            remaining = set(candidates)
            for provider in sorted(candidates):
                logging.debug(" Provided by {0}".format(provider))
                for substring in deprecated_substrings:
                    if substring in provider:
                        logging.debug("   ^--- will be ignored, contains "
                                      "{0}".format(substring))
                        remaining.discard(provider)
                        logging.debug("      {0}".format(remaining))
            if remaining:
                candidates = remaining
            else:
                # Every provider looked deprecated; using one of them is
                # better than failing on an empty set.
                logging.warning("All providers of symbol \"{0}\" contain "
                                "deprecated substrings, none is "
                                "ignored.".format(symbol))
            if len(candidates) > 1:
                logging.warning("Multiple provider names for symbol "
                                "\"{0}\":".format(symbol))
                for provider in sorted(candidates):
                    logging.warning(" * {0}".format(provider))

        # Deterministic choice among whatever is left.
        packages.append(min(candidates))

    logging.debug("Minimal packages list:")
    for package in packages:
        logging.debug(" * {0}".format(package))

    return packages
rpm_patcher.py 文件源码 项目:combirepo 作者: Samsung 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __prepare(self):
        """
        Prepares the patching root ready for RPM patching.

        Does nothing when the module-level flag developer_disable_patching
        is set.  Otherwise it prepares the image, stores the result of
        temporaries.mount_firmware as self.patching_root, deploys the qemu
        package when the target architecture is not among the synonyms of
        the host architecture, replaces any existing rpmrebuild files in the
        patching root with the bundled archive, and finally runs
        __install_rpmrebuild in a child process.

        Terminates the program through sys.exit when the child put nothing
        into the result queue, and raises Exception when the reported result
        is false.
        """
        global developer_disable_patching
        if developer_disable_patching:
            logging.debug("RPM patcher will not be prepared.")
            return
        graphs = self._graphs
        self.__prepare_image(graphs)
        self.patching_root = temporaries.mount_firmware(self.images_directory)
        # NOTE(review): qemu is presumably needed to run binaries of a
        # foreign architecture inside the patching root -- confirm.
        host_arch = platform.machine()
        host_arches = self.__produce_architecture_synonyms_list(host_arch)
        if self.architecture not in host_arches:
            self.__deploy_qemu_package()

        # The rpmrebuild archive is looked up next to this source file.
        combirepo_dir = os.path.abspath(os.path.dirname(__file__))
        rpmrebuild_file = os.path.join(combirepo_dir, 'data/rpmrebuild.tar')
        # Remove whatever matches "rpmrebuild.*" under the patching root
        # before the archive is extracted there.
        already_present_rpmrebuilds = files.find_fast(self.patching_root,
                                                      "rpmrebuild.*")
        for already_present_rpmrebuild in already_present_rpmrebuilds:
            if os.path.isdir(already_present_rpmrebuild):
                shutil.rmtree(already_present_rpmrebuild)
            elif os.path.isfile(already_present_rpmrebuild):
                os.remove(already_present_rpmrebuild)
        hidden_subprocess.call("Extracting the rpmrebuild ",
                               ["tar", "xf", rpmrebuild_file, "-C",
                                self.patching_root])

        # The installation runs in a separate process and reports back
        # through the queue; the parent waits for it to finish.
        # NOTE(review): presumably a separate process is used because the
        # installer chroots into the patching root -- confirm against
        # __install_rpmrebuild.
        queue = multiprocessing.Queue()
        child = multiprocessing.Process(target=self.__install_rpmrebuild,
                                        args=(queue,))
        child.start()
        child.join()
        # An empty queue means the child ended without reporting a result.
        if queue.empty():
            logging.error("Failed to install rpmrebuild into chroot.")
            sys.exit("Error.")
        else:
            result = queue.get()
            if result:
                logging.debug("Installation of rpmrebuild successfully "
                              "completed.")
            else:
                raise Exception("Impossible happened.")


问题


面经


文章

微信
公众号

扫码关注公众号