def init_root(self):
# we need the list of all real (non-checkpoint) generations
client_name = self.obnam.app.settings['client-name']
generations = [
gen
for gen in self.obnam.repo.get_client_generation_ids(client_name)
if not self.obnam.repo.get_generation_key(
gen, obnamlib.REPO_GENERATION_IS_CHECKPOINT)]
# self.rootlist holds the stat information for each entry at
# the root of the FUSE filesystem: /.pid, /latest, and one for
# each generation.
self.rootlist = {}
used_generations = []
for gen in generations:
genspec = self.obnam.repo.make_generation_spec(gen)
path = '/' + genspec
try:
genstat = self.get_stat_in_generation(path)
end = self.obnam.repo.get_generation_key(
gen, obnamlib.REPO_GENERATION_ENDED)
genstat.st_ctime = genstat.st_mtime = end
self.rootlist[path] = genstat
used_generations.append(gen)
except obnamlib.ObnamError as e:
logging.warning('Ignoring error %s', str(e))
assert used_generations
# self.rootstat is the stat information for the root of the
# FUSE filesystem. We set it to the same as that of the latest
# generation.
latest_gen_id = used_generations[-1]
latest_gen_spec = self.obnam.repo.make_generation_spec(latest_gen_id)
latest_gen_root_stat = self.rootlist['/' + latest_gen_spec]
self.rootstat = fuse.Stat(**latest_gen_root_stat.__dict__)
# Add an entry for /latest to self.rootlist.
symlink_stat = fuse.Stat(
target=latest_gen_spec,
**latest_gen_root_stat.__dict__)
symlink_stat.st_mode &= ~(stat.S_IFDIR | stat.S_IFREG)
symlink_stat.st_mode |= stat.S_IFLNK
self.rootlist['/latest'] = symlink_stat
# Add an entry for /.pid to self.rootlist.
pidstat = fuse.Stat(**self.rootstat.__dict__)
pidstat.st_mode = (
stat.S_IFREG | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
self.rootlist['/.pid'] = pidstat
评论列表
文章目录