def test_chroot(self):
    """
    When the C{chroot} argument is not L{None},
    L{UnixApplicationRunner.setupEnvironment} switches the root of the
    filesystem to the given directory.
    """
    newRoot = "/foo/bar"
    self.runner.setupEnvironment(newRoot, ".", True, None, None)
    self.assertEqual(self.root, newRoot)
# Example source code for Python's chroot()
def test_noChroot(self):
    """
    When the C{chroot} argument is L{None},
    L{UnixApplicationRunner.setupEnvironment} leaves the root of the
    filesystem untouched.
    """
    noChroot = None
    self.runner.setupEnvironment(noChroot, ".", True, None, None)
    self.assertIs(self.root, self.unset)
def chroot(self):
    """Make this path the root directory of the current process.

    Thin wrapper that passes ``self`` straight to ``os.chroot``; any error
    raised by that call propagates unchanged.
    """
    new_root = self
    os.chroot(new_root)
def __init__(self, options):
    """
    Initialise the server state from the parsed C{options} object.

    Copies the configuration values onto the instance, loads the C{ssl}
    module when a PEM file is configured, resolves the listen address and
    server name, creates the empty lookup tables and finally makes sure the
    log and state directories exist.
    """
    # (attribute set on self, attribute read from options), in copy order.
    copied_options = (
        ("ports", "ports"),
        ("password", "password"),
        ("ssl_pem_file", "ssl_pem_file"),
        ("motdfile", "motd"),
        ("verbose", "verbose"),
        ("debug", "debug"),
        ("logdir", "logdir"),
        ("chroot", "chroot"),
        ("setuid", "setuid"),
        ("statedir", "statedir"),
    )
    for target, source in copied_options:
        setattr(self, target, getattr(options, source))

    pem_path = self.ssl_pem_file
    if pem_path:
        self.ssl = __import__("ssl")
        # Pin an existing certificate to its absolute path so that it can
        # still be found after daemonization if the path was relative.
        # A missing file might exist in the chroot jail, so just continue.
        if os.path.exists(pem_path):
            self.ssl_pem_file = os.path.abspath(pem_path)

    listen = options.listen
    self.address = socket.gethostbyname(listen) if listen else ""

    server_name_limit = 63  # From the RFC.
    fqdn = socket.getfqdn(self.address)
    self.name = fqdn[:server_name_limit]

    # irc_lower(Channel name) --> Channel instance.
    self.channels = {}
    # Socket --> Client instance.
    self.clients = {}
    # irc_lower(Nickname) --> Client instance.
    self.nicknames = {}

    for directory in (self.logdir, self.statedir):
        if directory:
            create_directory(directory)
def doScript(section, script, scriptnum):
    """
    Write a kickstart script to disk and run it in a forked child process.

    The script text is written to ``/run/ks-script-<section>-<scriptnum>``
    with a ``#!`` line built from ``script.interp``; lines starting with
    ``%pre``, ``%post`` or ``%end`` are dropped. The file is made executable
    for its owner only. The child's stdout and stderr are captured in
    ``<fname>.out`` and ``<fname>.err``.

    For the ``post`` section, when ``script.inChroot`` is true, the script is
    copied under ``/run/mnt/sysimage`` and the child chroots there before
    running it.

    The parent blocks until that specific child has finished. The child never
    returns into the caller's code: it always terminates through
    ``os._exit`` (status 0 on success, 1 if preparing or launching the script
    raised).

    @param section: section name, e.g. 'pre' or 'post'
    @param script: object providing ``interp``, ``inChroot`` and a string form
    @param scriptnum: integer used to make the file name unique
    """
    fname = '/run/ks-script-%s-%d' % (section, scriptnum)
    section_markers = ('%pre', '%post', '%end')

    with open(fname, 'w') as f:
        f.write('#!%s\n\n' % script.interp)
        for line in ('%s' % script).split('\n'):
            if line.startswith(section_markers):
                # section delimiters are not part of the script itself
                continue
            f.write('%s\n' % line)

    # 0o700 is valid octal syntax on both Python 2.6+ and Python 3.
    os.chmod(fname, 0o700)

    pid = os.fork()
    if pid != 0:
        # Wait for this particular child, not for whichever child exits first.
        os.waitpid(pid, 0)
        return

    # --- child process from here on ---
    status = 1
    try:
        if section == 'post' and script.inChroot:
            shutil.copy(fname, '/run/mnt/sysimage%s' % fname)
            os.chroot('/run/mnt/sysimage')

        #
        # set stdout and stderr to files on the disk so we can examine
        # the output and errors
        #
        with open('%s.out' % fname, 'w') as fout:
            with open('%s.err' % fname, 'w') as ferr:
                subprocess.call([fname], stdout=fout, stderr=ferr)
        status = 0
    except Exception:
        import traceback
        traceback.print_exc()
        sys.stderr.flush()
    finally:
        # Leave immediately so the child can never unwind into the parent's
        # call stack (exception handlers, atexit hooks, buffered output).
        os._exit(status)
def chroot(self, path):
    '''Switch the root directory of the process to the given path

    @param path: Directory that becomes the new root
    @type path: string
    @raise ValueError: if path is empty or fails the unixdirpath check
    '''
    # An empty path short-circuits, so the type check only sees real values.
    path_ok = path and j.data.types.unixdirpath.check(path)
    if not path_ok:
        raise ValueError('Path %s is invalid' % path)

    j.logger.logging.info('Change root to %s' % path)
    os.chroot(path)
def daemonize(
        work_dir=None, chroot_dir=None,
        umask=None, uid=None, gid=None,
        pidfile=None, files_preserve=None, signals=None):
    """
    Turn the current process into a daemon.

    Steps, in order: optional chroot, umask, working directory, gid and uid
    changes; a double fork with a new session in between (both intermediate
    parents leave via ``os._exit(0)``); installation of signal handlers;
    closing of every descriptor above stderr that is not preserved;
    redirection of stdin/stdout/stderr to the null device; and, if requested,
    creation of a pid file that is removed again at interpreter exit.

    @param work_dir: directory to chdir into, or None to leave it unchanged
    @param chroot_dir: directory to chroot into, or None
    @param umask: file creation mask to set, or None
    @param uid: user id to switch to, or None
    @param gid: group id to switch to, or None
    @param pidfile: path of the pid file; it must not exist yet (O_EXCL)
    @param files_preserve: iterable of objects with ``fileno()`` whose
        descriptors must stay open (default: none)
    @param signals: mapping of signal number to handler (default: none)
    @raise OSError: if any of the underlying system calls fails
    """
    # None defaults instead of shared mutable default objects.
    if files_preserve is None:
        files_preserve = ()
    if signals is None:
        signals = {}

    # Dirs, limits, users
    if chroot_dir is not None:
        os.chdir(chroot_dir)
        os.chroot(chroot_dir)
    if umask is not None:
        os.umask(umask)
    if work_dir is not None:
        os.chdir(work_dir)
    # The gid is changed before the uid: the order matters once the uid
    # change has dropped the right to change groups.
    # NOTE(review): supplementary groups are not reset here (no
    # os.setgroups call) -- confirm callers do not rely on this function
    # alone for privilege separation.
    if gid is not None:
        os.setgid(gid)
    if uid is not None:
        os.setuid(uid)

    # Doublefork, split session
    if os.fork() > 0:
        os._exit(0)
    os.setsid()
    if os.fork() > 0:
        os._exit(0)

    # Setup signal handlers
    for signum, handler in signals.items():
        signal.signal(signum, handler)

    # Close descriptors: everything above stderr (fd 3 included) except the
    # preserved ones. os.closerange ignores descriptors that are not open.
    descr_preserve = set(f.fileno() for f in files_preserve)
    maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if maxfd == resource.RLIM_INFINITY:
        maxfd = 65535
    low = 3
    for keep in sorted(descr_preserve):
        if keep >= low:
            os.closerange(low, keep)
            low = keep + 1
    os.closerange(low, maxfd + 1)

    # Redirect stdin, stdout, stderr to /dev/null
    devnull = os.open(os.devnull, os.O_RDWR)
    for fd in range(3):
        os.dup2(devnull, fd)
    if devnull > 2:
        # The duplicates keep the device open; the original is not needed.
        os.close(devnull)

    # PID file
    if pidfile is not None:
        pidd = os.open(
            pidfile,
            os.O_RDWR | os.O_CREAT | os.O_EXCL | os.O_TRUNC,
            0o644)
        try:
            # os.write needs bytes on Python 3; encode() is a no-op
            # conversion for ASCII text on Python 2.
            os.write(pidd, (str(os.getpid()) + "\n").encode("ascii"))
        finally:
            os.close(pidd)

        # Define and setup atexit closure
        @atexit.register
        def unlink_pid():
            try:
                os.unlink(pidfile)
            except OSError:
                # Best effort: the file may already be gone.
                pass
def prepare_minimal_packages_list(graphs):
    """
    Prepares the minimal list of package names that are needed to be installed
    in the chroot so that rpmrebuild can be used inside it.

    For every required symbol the provider names reported by all graphs are
    merged. When a symbol has several providers, names containing one of the
    deprecated substrings are dropped, unless that would leave no provider
    at all, in which case the unfiltered set is used and a warning is logged.
    If several candidates remain, the lexicographically smallest name is
    chosen so that the result does not depend on set iteration order.

    The process is terminated with sys.exit("Error.") when no graph provides
    a required symbol (this includes an empty list of graphs).

    @param graphs   The list of dependency graphs of repositories.
    @return         The list of packages, one per required symbol, in the
                    order of the required symbols.
    """
    symbols = ["useradd", "mkdir", "awk", "cpio", "make", "rpmbuild", "sed"]
    deprecated_substrings = ["mic-bootstrap", "x86", "x64"]

    # Every symbol gets an entry, even when there are no graphs at all, so
    # the lookup below can never fail with KeyError.
    providers = {}
    last_graph = None
    for symbol in symbols:
        found = set()
        for graph in graphs:
            last_graph = graph
            names = graph.get_provider_names(symbol)
            if names is not None:
                found |= set(names)
                logging.debug("Got providers {0} for symbol "
                              "{1}".format(names, symbol))
                logging.debug("{0}".format(found))
        providers[symbol] = found

    packages = []
    for symbol in symbols:
        candidates = providers[symbol]

        if not candidates:
            for graph in graphs:
                for key in graph.symbol_providers.keys():
                    logging.debug("{0} : {1}".format(
                        key, graph.symbol_providers[key]))
            logging.error("Failed to find symbol {0}".format(symbol))
            if last_graph is not None:
                logging.error("size: {0}".format(
                    len(last_graph.symbol_providers)))
            sys.exit("Error.")

        if len(candidates) > 1:
            logging.debug("Analyzing symbol {0}:".format(symbol))
            preferred = set()
            for name in sorted(candidates):
                logging.debug(" Provided by {0}".format(name))
                matched = [substring for substring in deprecated_substrings
                           if substring in name]
                for substring in matched:
                    logging.debug(" ^--- will be ignored, contains "
                                  "{0}".format(substring))
                if not matched:
                    preferred.add(name)
            logging.debug(" {0}".format(preferred))

            if preferred:
                candidates = preferred
            else:
                # A lone deprecated provider is accepted as well, so do the
                # same here instead of failing on an empty set.
                logging.warning("All providers of symbol \"{0}\" look "
                                "deprecated, using one of them "
                                "anyway.".format(symbol))

            if len(candidates) > 1:
                logging.warning("Multiple provider names for symbol "
                                "\"{0}\":".format(symbol))
                for name in sorted(candidates):
                    logging.warning(" * {0}".format(name))

        # Deterministic pick among the remaining candidates.
        packages.append(min(candidates))

    logging.debug("Minimal packages list:")
    for package in packages:
        logging.debug(" * {0}".format(package))
    return packages
def __prepare(self):
    """
    Gets the patching root into a state where RPM patching can be done.

    Does nothing (apart from a debug message) when the module-level
    developer_disable_patching flag is set. Otherwise the image is prepared
    and mounted, a qemu package is deployed when the target architecture is
    not one of the host's architecture synonyms, stale rpmrebuild files are
    removed from the patching root, the bundled rpmrebuild archive is
    extracted there, and rpmrebuild is installed from a child process.
    """
    global developer_disable_patching
    if developer_disable_patching:
        logging.debug("RPM patcher will not be prepared.")
        return

    self.__prepare_image(self._graphs)
    self.patching_root = temporaries.mount_firmware(self.images_directory)

    known_host_arches = self.__produce_architecture_synonyms_list(
        platform.machine())
    if self.architecture not in known_host_arches:
        self.__deploy_qemu_package()

    module_dir = os.path.abspath(os.path.dirname(__file__))
    rpmrebuild_file = os.path.join(module_dir, 'data/rpmrebuild.tar')

    # Remove leftovers of earlier rpmrebuild deployments before extracting.
    for leftover in files.find_fast(self.patching_root, "rpmrebuild.*"):
        if os.path.isdir(leftover):
            shutil.rmtree(leftover)
        elif os.path.isfile(leftover):
            os.remove(leftover)

    extract_command = ["tar", "xf", rpmrebuild_file, "-C",
                       self.patching_root]
    hidden_subprocess.call("Extracting the rpmrebuild ", extract_command)

    # The installation runs in a separate process and reports its outcome
    # through the queue.
    queue = multiprocessing.Queue()
    installer = multiprocessing.Process(target=self.__install_rpmrebuild,
                                        args=(queue,))
    installer.start()
    installer.join()

    if queue.empty():
        logging.error("Failed to install rpmrebuild into chroot.")
        sys.exit("Error.")

    if not queue.get():
        raise Exception("Impossible happened.")
    logging.debug("Installation of rpmrebuild successfully completed.")