def _write_symlink(self, zf, link_target, link_path):
    """Package a symlink with appropriate zipfile metadata.

    Args:
        zf: Writable ``zipfile.ZipFile`` that receives the entry.
        link_target: Path the link points at; stored as the entry's data.
        link_path: Name of the link entry inside the archive.
    """
    import stat  # local import: only needed to build the attribute bits

    info = zipfile.ZipInfo()
    info.filename = link_path
    # 3 is the zip "host system" code for Unix; readers then take the
    # upper 16 bits of external_attr as the entry's st_mode.
    info.create_system = 3
    # Symlink file type with rwxr-xr-x permissions, i.e.
    # (stat.S_IFLNK | 0o755) << 16 == 2716663808.
    info.external_attr = (stat.S_IFLNK | 0o755) << 16
    zf.writestr(info, link_target)
# Python S_IFLNK example source code
def symlink(self, parent_inode, name, target, ctx):
    """Create a symbolic link *name* under *parent_inode* holding *target*.

    The link is created through ``self.create`` with mode
    ``S_IFLNK | 0o777`` and *target* is written as its contents at
    offset 0. Returns the second value produced by ``self.create``.
    """
    assert self._initialized

    may_write_parent = self.access(parent_inode, WX_OK, ctx)
    assert_or_errno(may_write_parent, EPERM, 'no wx perm to parent inode')

    existing = self._lookup(parent_inode, name, ctx, noref=True)
    assert_or_errno(not existing, EEXIST, 'link target exists')

    try:
        self.unlink(parent_inode, name, ctx)
    except llfuse.FUSEError as err:
        # A missing entry is tolerated; any other failure propagates.
        if err.errno != ENOENT:
            raise

    link_mode = stat.S_IFLNK | 0o777
    fd, attrs = self.create(parent_inode, name, link_mode, os.O_WRONLY, ctx)
    try:
        self.write(fd, 0, target)
    finally:
        # Always give the handle back, even when the write failed.
        self.release(fd)
    return attrs
def verify(self, img, **args):
    """Check that the installed link points at the expected target.

    Returns a tuple of lists of the form (errors, warnings, info).
    The error list will be empty if the action has been correctly
    installed in the given image.
    """
    expected = self.attrs["target"]
    installed_path = self.get_installed_path(img.get_root())

    lstat, errors, warnings, info, abort = self.verify_fsobj_common(
        img, stat.S_IFLNK)
    if abort:
        # The common check asked to stop; it must have said why.
        assert errors
        return errors, warnings, info

    found = os.readlink(installed_path)
    if expected != found:
        message = _("Target: '{found}' should be "
                    "'{expected}'").format(found=found, expected=expected)
        errors.append(message)
    return errors, warnings, info
def cleanup_mode(mode):
    """Normalize a mode value so it can be stored in a tree object.

    Symlinks, directories and gitlinks collapse to their bare type
    constant. Anything else becomes a regular file with 0o644
    permissions plus whichever execute bits *mode* carried.

    :param mode: Mode to clean up.
    :return: The normalized mode.
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    if stat.S_ISDIR(mode):
        return stat.S_IFDIR
    if S_ISGITLINK(mode):
        return S_IFGITLINK
    execute_bits = mode & 0o111
    return stat.S_IFREG | 0o644 | execute_bits
def _initialize_aci(self, mode, fileType):
    """Create and save an ACI record for the current process user.

    :param mode: permission bits as an octal string (parsed with base 8)
    :param fileType: one of the ``stat.S_IF*`` file-type constants
    :return: key of the saved ACI object
    :raises RuntimeError: if *fileType* is not a known file-type constant
    """
    known_types = (
        stat.S_IFREG,
        stat.S_IFDIR,
        stat.S_IFCHR,
        stat.S_IFBLK,
        stat.S_IFIFO,
        stat.S_IFLNK,
        stat.S_IFSOCK,
    )
    if fileType not in known_types:
        raise RuntimeError("Invalid file type.")

    aci = self.aciCollection.new()
    current_uid = os.getuid()
    aci.dbobj.id = current_uid
    # Owner and group are stored by name, resolved from the process ids.
    aci.dbobj.uname = pwd.getpwuid(current_uid).pw_name
    aci.dbobj.gname = grp.getgrgid(os.getgid()).gr_name
    aci.dbobj.mode = int(mode, 8) + fileType
    aci.save()
    return aci.key
def AddToZipHermetic(zip_file, zip_path, src_path=None, data=None,
                     compress=None):
    """Adds a file to the given ZipFile with a hard-coded modified time.

    Symlinks are stored as symlink entries whose data is the link target.
    Regular files are read as bytes so their content is archived verbatim.

    Args:
      zip_file: ZipFile instance to add the file to.
      zip_path: Destination path within the zip file.
      src_path: Path of the source file. Mutually exclusive with |data|.
      data: File data as a string.
      compress: Whether to enable compression. Default is taken from ZipFile
          constructor.
    """
    assert (src_path is None) != (data is None), (
        '|src_path| and |data| are mutually exclusive.')
    CheckZipPath(zip_path)
    zipinfo = zipfile.ZipInfo(filename=zip_path, date_time=HERMETIC_TIMESTAMP)
    zipinfo.external_attr = _HERMETIC_FILE_ATTR

    if src_path and os.path.islink(src_path):
        zipinfo.filename = zip_path
        # The file type lives in the upper 16 bits of external_attr.
        zipinfo.external_attr |= stat.S_IFLNK << 16  # mark as a symlink
        zip_file.writestr(zipinfo, os.readlink(src_path))
        return

    if src_path:
        # Binary mode: no newline translation or text decoding.
        with open(src_path, 'rb') as f:
            data = f.read()

    # zipfile will deflate even when it makes the file bigger. To avoid
    # growing files, disable compression at an arbitrary cut off point.
    if len(data) < 16:
        compress = False

    # None converts to ZIP_STORED, when passed explicitly rather than the
    # default passed to the ZipFile constructor.
    compress_type = zip_file.compression
    if compress is not None:
        compress_type = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
    zip_file.writestr(zipinfo, data, compress_type)
def AddToZipHermetic(zip_file, zip_path, src_path=None, data=None,
                     compress=None):
    """Adds a file to the given ZipFile with a hard-coded modified time.

    Symlinks are stored as symlink entries whose data is the link target.
    Regular files are read as bytes so their content is archived verbatim.

    Args:
      zip_file: ZipFile instance to add the file to.
      zip_path: Destination path within the zip file.
      src_path: Path of the source file. Mutually exclusive with |data|.
      data: File data as a string.
      compress: Whether to enable compression. Default is taken from ZipFile
          constructor.
    """
    assert (src_path is None) != (data is None), (
        '|src_path| and |data| are mutually exclusive.')
    CheckZipPath(zip_path)
    zipinfo = zipfile.ZipInfo(filename=zip_path, date_time=HERMETIC_TIMESTAMP)
    zipinfo.external_attr = _HERMETIC_FILE_ATTR

    if src_path and os.path.islink(src_path):
        zipinfo.filename = zip_path
        # The file type lives in the upper 16 bits of external_attr.
        zipinfo.external_attr |= stat.S_IFLNK << 16  # mark as a symlink
        zip_file.writestr(zipinfo, os.readlink(src_path))
        return

    if src_path:
        # Binary mode: no newline translation or text decoding.
        with open(src_path, 'rb') as f:
            data = f.read()

    # zipfile will deflate even when it makes the file bigger. To avoid
    # growing files, disable compression at an arbitrary cut off point.
    if len(data) < 16:
        compress = False

    # None converts to ZIP_STORED, when passed explicitly rather than the
    # default passed to the ZipFile constructor.
    compress_type = zip_file.compression
    if compress is not None:
        compress_type = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
    zip_file.writestr(zipinfo, data, compress_type)
def AddToZipHermetic(zip_file, zip_path, src_path=None, data=None,
                     compress=None):
    """Adds a file to the given ZipFile with a hard-coded modified time.

    Symlinks are stored as symlink entries whose data is the link target.
    Regular files are read as bytes so their content is archived verbatim.

    Args:
      zip_file: ZipFile instance to add the file to.
      zip_path: Destination path within the zip file.
      src_path: Path of the source file. Mutually exclusive with |data|.
      data: File data as a string.
      compress: Whether to enable compression. Default is taken from ZipFile
          constructor.
    """
    assert (src_path is None) != (data is None), (
        '|src_path| and |data| are mutually exclusive.')
    CheckZipPath(zip_path)
    zipinfo = zipfile.ZipInfo(filename=zip_path, date_time=HERMETIC_TIMESTAMP)
    zipinfo.external_attr = _HERMETIC_FILE_ATTR

    if src_path and os.path.islink(src_path):
        zipinfo.filename = zip_path
        # The file type lives in the upper 16 bits of external_attr.
        zipinfo.external_attr |= stat.S_IFLNK << 16  # mark as a symlink
        zip_file.writestr(zipinfo, os.readlink(src_path))
        return

    if src_path:
        # Binary mode: no newline translation or text decoding.
        with open(src_path, 'rb') as f:
            data = f.read()

    # zipfile will deflate even when it makes the file bigger. To avoid
    # growing files, disable compression at an arbitrary cut off point.
    if len(data) < 16:
        compress = False

    # None converts to ZIP_STORED, when passed explicitly rather than the
    # default passed to the ZipFile constructor.
    compress_type = zip_file.compression
    if compress is not None:
        compress_type = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
    zip_file.writestr(zipinfo, data, compress_type)
def dbpcs(mode):
    """Return the ``ls -l`` style file-type character for *mode*.

    The file-type field is isolated with ``stat.S_IFMT`` and compared
    for equality, because the type constants share bits (for example
    S_IFLNK contains every bit of S_IFREG and S_IFCHR), which makes
    per-constant mask tests depend on their order.

    :param mode: an ``st_mode`` value
    :return: 'l' symlink, 's' socket, '-' regular file, 'b' block
        device, 'd' directory, 'p' FIFO, 'c' character device, or '?'
        when the type is not recognized
    """
    kind = stat.S_IFMT(mode)
    if kind == stat.S_IFLNK:
        # Symlinks get their own letter rather than sharing the socket's 's'.
        return 'l'
    if kind == stat.S_IFSOCK:
        return 's'
    if kind == stat.S_IFREG:
        return '-'
    if kind == stat.S_IFBLK:
        return 'b'
    if kind == stat.S_IFDIR:
        return 'd'
    if kind == stat.S_IFIFO:
        return 'p'
    if kind == stat.S_IFCHR:
        return 'c'
    return '?'
def getattr(self, inode, ctx=None):
    """Build the ``llfuse.EntryAttributes`` for *inode*.

    Raises ``llfuse.FUSEError(ENOENT)`` when the inode is unknown or its
    stored attributes describe none of tree, blob, link or fifo.
    """
    attrs = self.inodes.get(inode)
    if attrs is None:
        raise llfuse.FUSEError(errno.ENOENT)  # FIXME

    object_type = attrs.get('type')
    is_blob = object_type == 'blob'
    if object_type == 'tree':
        mode_filetype = stat.S_IFDIR
    elif is_blob:
        mode_filetype = stat.S_IFREG
    elif attrs.get('filetype') == 'link':
        mode_filetype = stat.S_IFLNK
    elif attrs.get('filetype') == 'fifo':
        mode_filetype = stat.S_IFIFO
    else:
        raise llfuse.FUSEError(errno.ENOENT)  # FIXME

    entry = llfuse.EntryAttributes()
    entry.st_ino = inode
    entry.st_mode = mode_filetype | attrs.get(
        'mode', MartyFSHandler.DEFAULT_MODE)

    # Only blobs that reference stored content have a size.
    if is_blob and 'ref' in attrs:
        entry.st_size = self.storage.size(attrs['ref'])
    else:
        entry.st_size = 0

    # Every entry reports the same fixed timestamp, in nanoseconds.
    stamp = int(1438467123.985654 * 1e9)
    entry.st_atime_ns = entry.st_ctime_ns = entry.st_mtime_ns = stamp

    entry.st_uid = 0
    entry.st_gid = 0
    return entry
def getattr(self, path, fh=None):
    """Return the stat dictionary for *path*.

    Root, histories, datasets and dataset-collection paths are reported
    as read-only directories stamped with the current time. Datasets
    are reported as read-only symlinks whose size is the length of the
    escaped target file name and whose timestamps come from the
    dataset's ``update_time``.

    Raises ``FuseOSError(ENOENT)`` for any other kind of path.
    """
    (typ, kw) = path_type(path)
    now = time.time()

    def directory_stat():
        # Simple directory
        return dict(st_mode=(S_IFDIR | 0o555), st_nlink=2,
                    st_ctime=now, st_mtime=now, st_atime=now)

    def symlink_stat(d):
        # A file, will be a symlink to a galaxy dataset.
        t = time.mktime(
            time.strptime(d['update_time'], '%Y-%m-%dT%H:%M:%S.%f'))
        fname = esc_filename(d.get('file_path', d['file_name']))
        return dict(st_mode=(S_IFLNK | 0o444), st_nlink=1,
                    st_size=len(fname), st_ctime=t, st_mtime=t,
                    st_atime=t)

    if typ == 'root' or typ == 'histories' or typ == 'datasets':
        st = directory_stat()
    elif typ == 'historydataorcoll':
        # Dataset or collection
        d = self._dataset(kw)
        if d['history_content_type'] == 'dataset_collection':
            # A collection, will be a simple directory.
            st = directory_stat()
        else:
            st = symlink_stat(d)
    elif typ == 'collectiondataset':
        # A file within a collection.
        st = symlink_stat(self._dataset(kw, display=False))
    else:
        raise FuseOSError(ENOENT)
    return st
# Return a symlink for the given dataset
def test_islink_returns_true_for_symlink(self):
    """Metadata built with a symlink st_mode reports islink() as true."""
    metadata = obnamlib.Metadata(st_mode=stat.S_IFLNK)
    # assertTrue replaces the deprecated assert_ alias.
    self.assertTrue(metadata.islink())
def test_returns_symlink_fields_correctly(self):
    """read_metadata copies st_mode and target from a symlink entry."""
    self.fakefs.st_mode |= stat.S_IFLNK
    metadata = obnamlib.read_metadata(
        self.fakefs,
        'foo',
        getpwuid=self.fakefs.getpwuid,
        getgrgid=self.fakefs.getgrgid)
    for field in ('st_mode', 'target'):
        actual = getattr(metadata, field)
        expected = getattr(self.fakefs, field)
        # The field name doubles as the failure message.
        self.assertEqual(actual, expected, field)
def test_sets_symlink_target(self):
    """set_metadata recreates a symlink that points at metadata.target."""
    self.fs.remove(self.filename)
    # 0o777 is the octal spelling accepted by both Python 2.6+ and 3.
    self.metadata.st_mode = 0o777 | stat.S_IFLNK
    self.metadata.target = 'target'
    obnamlib.set_metadata(self.fs, self.filename, self.metadata)
    self.assertEqual(self.fs.readlink(self.filename), 'target')
def test_sets_symlink_mtime_perms(self):
    """set_metadata applies the mode and mtime to the symlink itself."""
    self.fs.remove(self.filename)
    # 0o777 is the octal spelling accepted by both Python 2.6+ and 3.
    self.metadata.st_mode = 0o777 | stat.S_IFLNK
    self.metadata.target = 'target'
    obnamlib.set_metadata(self.fs, self.filename, self.metadata)
    # lstat inspects the link, not the file it points at.
    st = os.lstat(self.filename)
    self.assertEqual(st.st_mode, self.metadata.st_mode)
    self.assertEqual(st.st_mtime, self.metadata.st_mtime_sec)
def check(self):
    """Check this object for internal consistency.

    Every entry must have a valid hex sha, a name that is non-empty,
    is not ``.`` or ``..`` and contains no ``/``, and an allowed mode.
    Entries must be sorted and must not repeat a name.

    :raise ObjectFormatException: if the object is malformed in some way
    """
    super(Tree, self).check()
    allowed_modes = (
        stat.S_IFREG | 0o755,
        stat.S_IFREG | 0o644,
        stat.S_IFLNK,
        stat.S_IFDIR,
        S_IFGITLINK,
        # TODO: optionally exclude as in git fsck --strict
        stat.S_IFREG | 0o664,
    )
    forbidden_names = (b'', b'.', b'..')
    tree_bytes = b''.join(self._chunked_text)

    previous = None
    for name, mode, sha in parse_tree(tree_bytes, True):
        check_hexsha(sha, 'invalid sha %s' % sha)
        if b'/' in name or name in forbidden_names:
            raise ObjectFormatException('invalid name %s' % name)
        if mode not in allowed_modes:
            raise ObjectFormatException('invalid mode %06o' % mode)

        current = (name, (mode, sha))
        if previous is not None:
            if key_entry(previous) > key_entry(current):
                raise ObjectFormatException('entries not sorted')
            if name == previous[0]:
                raise ObjectFormatException('duplicate entry %s' % name)
        previous = current
def symlink(self, target, source):
    """Record *target* as a symbolic link whose contents are *source*."""
    link_attrs = {
        'st_mode': S_IFLNK | 0o777,
        'st_nlink': 1,
        'st_size': len(source),
    }
    self.files[target] = link_attrs
    self.data[target] = source
def AddToZipHermetic(zip_file, zip_path, src_path=None, data=None,
                     compress=None):
    """Adds a file to the given ZipFile with a hard-coded modified time.

    Symlinks are stored as symlink entries whose data is the link target.
    Regular files are read as bytes so their content is archived verbatim.

    Args:
      zip_file: ZipFile instance to add the file to.
      zip_path: Destination path within the zip file.
      src_path: Path of the source file. Mutually exclusive with |data|.
      data: File data as a string.
      compress: Whether to enable compression. Default is taken from ZipFile
          constructor.
    """
    assert (src_path is None) != (data is None), (
        '|src_path| and |data| are mutually exclusive.')
    CheckZipPath(zip_path)
    zipinfo = zipfile.ZipInfo(filename=zip_path, date_time=HERMETIC_TIMESTAMP)
    zipinfo.external_attr = _HERMETIC_FILE_ATTR

    if src_path and os.path.islink(src_path):
        zipinfo.filename = zip_path
        # The file type lives in the upper 16 bits of external_attr.
        zipinfo.external_attr |= stat.S_IFLNK << 16  # mark as a symlink
        zip_file.writestr(zipinfo, os.readlink(src_path))
        return

    if src_path:
        # Binary mode: no newline translation or text decoding.
        with open(src_path, 'rb') as f:
            data = f.read()

    # zipfile will deflate even when it makes the file bigger. To avoid
    # growing files, disable compression at an arbitrary cut off point.
    if len(data) < 16:
        compress = False

    # None converts to ZIP_STORED, when passed explicitly rather than the
    # default passed to the ZipFile constructor.
    compress_type = zip_file.compression
    if compress is not None:
        compress_type = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
    zip_file.writestr(zipinfo, data, compress_type)
def __str__(self):
    """Create a unix-style long description of the file (like ls -l).

    The line holds the type/permission string, a fixed link count of 1,
    uid, gid, size, a short modification date and the file name. A
    missing uid, gid or size is shown as 0, a missing mode as
    ``?---------`` and a missing file name as ``?``.
    """
    if self.st_mode is not None:
        kind = stat.S_IFMT(self.st_mode)
        if kind == stat.S_IFIFO:
            ks = 'p'
        elif kind == stat.S_IFCHR:
            ks = 'c'
        elif kind == stat.S_IFDIR:
            ks = 'd'
        elif kind == stat.S_IFBLK:
            ks = 'b'
        elif kind == stat.S_IFREG:
            ks = '-'
        elif kind == stat.S_IFLNK:
            ks = 'l'
        elif kind == stat.S_IFSOCK:
            ks = 's'
        else:
            ks = '?'
        # One permission triplet each for user, group and other.
        ks += self._rwx((self.st_mode & 0o700) >> 6,
                        self.st_mode & stat.S_ISUID)
        ks += self._rwx((self.st_mode & 0o70) >> 3,
                        self.st_mode & stat.S_ISGID)
        ks += self._rwx(self.st_mode & 7, self.st_mode & stat.S_ISVTX, True)
    else:
        ks = '?---------'
    # compute display date
    if (self.st_mtime is None) or (self.st_mtime == 0xffffffff):
        # shouldn't really happen
        datestr = '(unknown date)'
    else:
        if abs(time.time() - self.st_mtime) > 15552000:
            # (15552000 = 6 months)
            datestr = time.strftime('%d %b %Y', time.localtime(self.st_mtime))
        else:
            datestr = time.strftime('%d %b %H:%M', time.localtime(self.st_mtime))
    filename = getattr(self, 'filename', '?')
    # not all servers support uid/gid
    uid = self.st_uid
    gid = self.st_gid
    size = self.st_size
    if uid is None:
        uid = 0
    if gid is None:
        gid = 0
    if size is None:
        # %d cannot format None
        size = 0
    return '%s 1 %-8d %-8d %8d %-12s %s' % (ks, uid, gid, size, datestr, filename)
def __str__(self):
    """Create a unix-style long description of the file (like ls -l).

    The line holds the type/permission string, a fixed link count of 1,
    uid, gid, size, a short modification date and the file name. A
    missing uid, gid or size is shown as 0, a missing mode as
    ``?---------`` and a missing file name as ``?``.
    """
    if self.st_mode is not None:
        kind = stat.S_IFMT(self.st_mode)
        if kind == stat.S_IFIFO:
            ks = 'p'
        elif kind == stat.S_IFCHR:
            ks = 'c'
        elif kind == stat.S_IFDIR:
            ks = 'd'
        elif kind == stat.S_IFBLK:
            ks = 'b'
        elif kind == stat.S_IFREG:
            ks = '-'
        elif kind == stat.S_IFLNK:
            ks = 'l'
        elif kind == stat.S_IFSOCK:
            ks = 's'
        else:
            ks = '?'
        # One permission triplet each for user, group and other.
        ks += self._rwx((self.st_mode & o700) >> 6, self.st_mode & stat.S_ISUID)
        ks += self._rwx((self.st_mode & o70) >> 3, self.st_mode & stat.S_ISGID)
        ks += self._rwx(self.st_mode & 7, self.st_mode & stat.S_ISVTX, True)
    else:
        ks = '?---------'
    # compute display date
    if (self.st_mtime is None) or (self.st_mtime == xffffffff):
        # shouldn't really happen
        datestr = '(unknown date)'
    else:
        if abs(time.time() - self.st_mtime) > 15552000:
            # (15552000 = 6 months)
            datestr = time.strftime('%d %b %Y', time.localtime(self.st_mtime))
        else:
            datestr = time.strftime('%d %b %H:%M', time.localtime(self.st_mtime))
    filename = getattr(self, 'filename', '?')
    # not all servers support uid/gid
    uid = self.st_uid
    gid = self.st_gid
    size = self.st_size
    if uid is None:
        uid = 0
    if gid is None:
        gid = 0
    if size is None:
        # %d cannot format None
        size = 0
    return '%s 1 %-8d %-8d %8d %-12s %s' % (ks, uid, gid, size, datestr, filename)
def __str__(self):
    """Render this entry as a single ``ls -l``-like line.

    The line holds the type/permission string, a fixed link count of 1,
    uid, gid, size, a short modification date and the file name. A
    missing uid, gid or size is shown as 0, a missing mode as
    ``?---------`` and a missing file name as ``?``.
    """
    mode = self.st_mode
    if mode is None:
        ks = '?---------'
    else:
        kind = stat.S_IFMT(mode)
        # The first matching file type supplies the leading letter.
        for type_bits, letter in (
                (stat.S_IFIFO, 'p'),
                (stat.S_IFCHR, 'c'),
                (stat.S_IFDIR, 'd'),
                (stat.S_IFBLK, 'b'),
                (stat.S_IFREG, '-'),
                (stat.S_IFLNK, 'l'),
                (stat.S_IFSOCK, 's')):
            if kind == type_bits:
                ks = letter
                break
        else:
            ks = '?'
        # One permission triplet each for user, group and other.
        ks += self._rwx((mode & o700) >> 6, mode & stat.S_ISUID)
        ks += self._rwx((mode & o70) >> 3, mode & stat.S_ISGID)
        ks += self._rwx(mode & 7, mode & stat.S_ISVTX, True)

    # compute display date
    mtime = self.st_mtime
    if mtime is None or mtime == xffffffff:
        # shouldn't really happen
        datestr = '(unknown date)'
    else:
        # More than 15552000 seconds (6 months) from now: show the
        # year; otherwise show the time of day.
        if abs(time.time() - mtime) > 15552000:
            date_format = '%d %b %Y'
        else:
            date_format = '%d %b %H:%M'
        datestr = time.strftime(date_format, time.localtime(mtime))

    filename = getattr(self, 'filename', '?')
    # not all servers support uid/gid
    uid = 0 if self.st_uid is None else self.st_uid
    gid = 0 if self.st_gid is None else self.st_gid
    size = 0 if self.st_size is None else self.st_size
    fields = (ks, uid, gid, size, datestr, filename)
    return '%s 1 %-8d %-8d %8d %-12s %s' % fields
def __str__(self):
    """Render this entry as a single ``ls -l``-like line.

    The line holds the type/permission string, a fixed link count of 1,
    uid, gid, size, a short modification date and the file name. A
    missing uid, gid or size is shown as 0, a missing mode as
    ``?---------`` and a missing file name as ``?``.
    """
    mode = self.st_mode
    if mode is None:
        ks = '?---------'
    else:
        kind = stat.S_IFMT(mode)
        # The first matching file type supplies the leading letter.
        for type_bits, letter in (
                (stat.S_IFIFO, 'p'),
                (stat.S_IFCHR, 'c'),
                (stat.S_IFDIR, 'd'),
                (stat.S_IFBLK, 'b'),
                (stat.S_IFREG, '-'),
                (stat.S_IFLNK, 'l'),
                (stat.S_IFSOCK, 's')):
            if kind == type_bits:
                ks = letter
                break
        else:
            ks = '?'
        # One permission triplet each for user, group and other.
        ks += self._rwx((mode & o700) >> 6, mode & stat.S_ISUID)
        ks += self._rwx((mode & o70) >> 3, mode & stat.S_ISGID)
        ks += self._rwx(mode & 7, mode & stat.S_ISVTX, True)

    # compute display date
    mtime = self.st_mtime
    if mtime is None or mtime == xffffffff:
        # shouldn't really happen
        datestr = '(unknown date)'
    else:
        # More than 15552000 seconds (6 months) from now: show the
        # year; otherwise show the time of day.
        if abs(time.time() - mtime) > 15552000:
            date_format = '%d %b %Y'
        else:
            date_format = '%d %b %H:%M'
        datestr = time.strftime(date_format, time.localtime(mtime))

    filename = getattr(self, 'filename', '?')
    # not all servers support uid/gid
    uid = 0 if self.st_uid is None else self.st_uid
    gid = 0 if self.st_gid is None else self.st_gid
    size = 0 if self.st_size is None else self.st_size
    fields = (ks, uid, gid, size, datestr, filename)
    return '%s 1 %-8d %-8d %8d %-12s %s' % fields
def __str__(self):
    """Render this entry as a single ``ls -l``-like line.

    The line holds the type/permission string, a fixed link count of 1,
    uid, gid, size, a short modification date and the file name. A
    missing uid, gid or size is shown as 0, a missing mode as
    ``?---------`` and a missing file name as ``?``.
    """
    mode = self.st_mode
    if mode is None:
        ks = '?---------'
    else:
        kind = stat.S_IFMT(mode)
        # The first matching file type supplies the leading letter.
        for type_bits, letter in (
                (stat.S_IFIFO, 'p'),
                (stat.S_IFCHR, 'c'),
                (stat.S_IFDIR, 'd'),
                (stat.S_IFBLK, 'b'),
                (stat.S_IFREG, '-'),
                (stat.S_IFLNK, 'l'),
                (stat.S_IFSOCK, 's')):
            if kind == type_bits:
                ks = letter
                break
        else:
            ks = '?'
        # One permission triplet each for user, group and other.
        ks += self._rwx((mode & o700) >> 6, mode & stat.S_ISUID)
        ks += self._rwx((mode & o70) >> 3, mode & stat.S_ISGID)
        ks += self._rwx(mode & 7, mode & stat.S_ISVTX, True)

    # compute display date
    mtime = self.st_mtime
    if mtime is None or mtime == xffffffff:
        # shouldn't really happen
        datestr = '(unknown date)'
    else:
        # More than 15552000 seconds (6 months) from now: show the
        # year; otherwise show the time of day.
        if abs(time.time() - mtime) > 15552000:
            date_format = '%d %b %Y'
        else:
            date_format = '%d %b %H:%M'
        datestr = time.strftime(date_format, time.localtime(mtime))

    filename = getattr(self, 'filename', '?')
    # not all servers support uid/gid
    uid = 0 if self.st_uid is None else self.st_uid
    gid = 0 if self.st_gid is None else self.st_gid
    size = 0 if self.st_size is None else self.st_size
    fields = (ks, uid, gid, size, datestr, filename)
    return '%s 1 %-8d %-8d %8d %-12s %s' % fields
def __str__(self):
    """Render this entry as a single ``ls -l``-like line.

    The line holds the type/permission string, a fixed link count of 1,
    uid, gid, size, a short modification date and the file name. A
    missing uid, gid or size is shown as 0, a missing mode as
    ``?---------`` and a missing file name as ``?``.
    """
    mode = self.st_mode
    if mode is None:
        ks = '?---------'
    else:
        kind = stat.S_IFMT(mode)
        # The first matching file type supplies the leading letter.
        for type_bits, letter in (
                (stat.S_IFIFO, 'p'),
                (stat.S_IFCHR, 'c'),
                (stat.S_IFDIR, 'd'),
                (stat.S_IFBLK, 'b'),
                (stat.S_IFREG, '-'),
                (stat.S_IFLNK, 'l'),
                (stat.S_IFSOCK, 's')):
            if kind == type_bits:
                ks = letter
                break
        else:
            ks = '?'
        # One permission triplet each for user, group and other.
        ks += self._rwx((mode & o700) >> 6, mode & stat.S_ISUID)
        ks += self._rwx((mode & o70) >> 3, mode & stat.S_ISGID)
        ks += self._rwx(mode & 7, mode & stat.S_ISVTX, True)

    # compute display date
    mtime = self.st_mtime
    if mtime is None or mtime == xffffffff:
        # shouldn't really happen
        datestr = '(unknown date)'
    else:
        # More than 15552000 seconds (6 months) from now: show the
        # year; otherwise show the time of day.
        if abs(time.time() - mtime) > 15552000:
            date_format = '%d %b %Y'
        else:
            date_format = '%d %b %H:%M'
        datestr = time.strftime(date_format, time.localtime(mtime))

    filename = getattr(self, 'filename', '?')
    # not all servers support uid/gid
    uid = 0 if self.st_uid is None else self.st_uid
    gid = 0 if self.st_gid is None else self.st_gid
    size = 0 if self.st_size is None else self.st_size
    fields = (ks, uid, gid, size, datestr, filename)
    return '%s 1 %-8d %-8d %8d %-12s %s' % fields
def init_root(self):
    """Populate ``self.rootlist`` and ``self.rootstat`` for the FUSE root.

    ``self.rootlist`` gets one stat entry per usable non-checkpoint
    generation, plus ``/latest`` (a symlink entry targeting the last
    usable generation) and ``/.pid`` (a read-only regular file).
    ``self.rootstat`` is a copy of the last usable generation's stat.
    """
    client_name = self.obnam.app.settings['client-name']
    repo = self.obnam.repo

    # we need the list of all real (non-checkpoint) generations
    generations = []
    for gen in repo.get_client_generation_ids(client_name):
        is_checkpoint = repo.get_generation_key(
            gen, obnamlib.REPO_GENERATION_IS_CHECKPOINT)
        if not is_checkpoint:
            generations.append(gen)

    # self.rootlist holds the stat information for each entry at
    # the root of the FUSE filesystem: /.pid, /latest, and one for
    # each generation.
    self.rootlist = {}
    used_generations = []
    for gen in generations:
        path = '/' + repo.make_generation_spec(gen)
        try:
            genstat = self.get_stat_in_generation(path)
            ended = repo.get_generation_key(
                gen, obnamlib.REPO_GENERATION_ENDED)
            genstat.st_ctime = genstat.st_mtime = ended
            self.rootlist[path] = genstat
            used_generations.append(gen)
        except obnamlib.ObnamError as e:
            # A generation that cannot be stat'ed is left out.
            logging.warning('Ignoring error %s', str(e))
    assert used_generations

    # self.rootstat is the stat information for the root of the
    # FUSE filesystem. We set it to the same as that of the latest
    # generation.
    latest_gen_spec = repo.make_generation_spec(used_generations[-1])
    latest_gen_root_stat = self.rootlist['/' + latest_gen_spec]
    self.rootstat = fuse.Stat(**latest_gen_root_stat.__dict__)

    # Add an entry for /latest to self.rootlist.
    symlink_stat = fuse.Stat(
        target=latest_gen_spec, **latest_gen_root_stat.__dict__)
    # Drop the directory / regular-file type bits, then mark as symlink.
    symlink_stat.st_mode &= ~(stat.S_IFDIR | stat.S_IFREG)
    symlink_stat.st_mode |= stat.S_IFLNK
    self.rootlist['/latest'] = symlink_stat

    # Add an entry for /.pid to self.rootlist.
    pidstat = fuse.Stat(**self.rootstat.__dict__)
    read_only_bits = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    pidstat.st_mode = stat.S_IFREG | read_only_bits
    self.rootlist['/.pid'] = pidstat
def __str__(self):
    """Render this entry as a single ``ls -l``-like line.

    The line holds the type/permission string, a fixed link count of 1,
    uid, gid, size, a short modification date and the file name. A
    missing uid, gid or size is shown as 0, a missing mode as
    ``?---------`` and a missing file name as ``?``.
    """
    mode = self.st_mode
    if mode is None:
        ks = '?---------'
    else:
        kind = stat.S_IFMT(mode)
        # The first matching file type supplies the leading letter.
        for type_bits, letter in (
                (stat.S_IFIFO, 'p'),
                (stat.S_IFCHR, 'c'),
                (stat.S_IFDIR, 'd'),
                (stat.S_IFBLK, 'b'),
                (stat.S_IFREG, '-'),
                (stat.S_IFLNK, 'l'),
                (stat.S_IFSOCK, 's')):
            if kind == type_bits:
                ks = letter
                break
        else:
            ks = '?'
        # One permission triplet each for user, group and other.
        ks += self._rwx((mode & o700) >> 6, mode & stat.S_ISUID)
        ks += self._rwx((mode & o70) >> 3, mode & stat.S_ISGID)
        ks += self._rwx(mode & 7, mode & stat.S_ISVTX, True)

    # compute display date
    mtime = self.st_mtime
    if mtime is None or mtime == xffffffff:
        # shouldn't really happen
        datestr = '(unknown date)'
    else:
        # More than 15552000 seconds (6 months) from now: show the
        # year; otherwise show the time of day.
        if abs(time.time() - mtime) > 15552000:
            date_format = '%d %b %Y'
        else:
            date_format = '%d %b %H:%M'
        datestr = time.strftime(date_format, time.localtime(mtime))

    filename = getattr(self, 'filename', '?')
    # not all servers support uid/gid
    uid = 0 if self.st_uid is None else self.st_uid
    gid = 0 if self.st_gid is None else self.st_gid
    size = 0 if self.st_size is None else self.st_size
    fields = (ks, uid, gid, size, datestr, filename)
    return '%s 1 %-8d %-8d %8d %-12s %s' % fields
def __str__(self):
    """Render this entry as a single ``ls -l``-like line.

    The line holds the type/permission string, a fixed link count of 1,
    uid, gid, size, a short modification date and the file name. A
    missing uid, gid or size is shown as 0, a missing mode as
    ``?---------`` and a missing file name as ``?``.
    """
    mode = self.st_mode
    if mode is None:
        ks = '?---------'
    else:
        kind = stat.S_IFMT(mode)
        # The first matching file type supplies the leading letter.
        for type_bits, letter in (
                (stat.S_IFIFO, 'p'),
                (stat.S_IFCHR, 'c'),
                (stat.S_IFDIR, 'd'),
                (stat.S_IFBLK, 'b'),
                (stat.S_IFREG, '-'),
                (stat.S_IFLNK, 'l'),
                (stat.S_IFSOCK, 's')):
            if kind == type_bits:
                ks = letter
                break
        else:
            ks = '?'
        # One permission triplet each for user, group and other.
        ks += self._rwx((mode & o700) >> 6, mode & stat.S_ISUID)
        ks += self._rwx((mode & o70) >> 3, mode & stat.S_ISGID)
        ks += self._rwx(mode & 7, mode & stat.S_ISVTX, True)

    # compute display date
    mtime = self.st_mtime
    if mtime is None or mtime == xffffffff:
        # shouldn't really happen
        datestr = '(unknown date)'
    else:
        # More than 15552000 seconds (6 months) from now: show the
        # year; otherwise show the time of day.
        if abs(time.time() - mtime) > 15552000:
            date_format = '%d %b %Y'
        else:
            date_format = '%d %b %H:%M'
        datestr = time.strftime(date_format, time.localtime(mtime))

    filename = getattr(self, 'filename', '?')
    # not all servers support uid/gid
    uid = 0 if self.st_uid is None else self.st_uid
    gid = 0 if self.st_gid is None else self.st_gid
    size = 0 if self.st_size is None else self.st_size
    fields = (ks, uid, gid, size, datestr, filename)
    return '%s 1 %-8d %-8d %8d %-12s %s' % fields
def chmod(self, ppath, mode):
    """
    Change mode for files or directories

    The new permission bits are added to the file-type constant that
    matches the entry and stored on the entry's ACL object.

    :param ppath: path of file/dir
    :param mode: string of the mode (octal digits)
    :raises RuntimeError: if the entry found for ppath has a non-empty state
    :raises ValueError: if mode is not a valid octal string

    Examples:
    flistmeta.chmod("/tmp/dir1", "777")
    """
    fType, dirObj = self._search_db(ppath)
    if dirObj.dbobj.state != "":
        raise RuntimeError("%s: No such file or directory" % ppath)

    try:
        mode = int(mode, 8)
    except ValueError:
        raise ValueError("Invalid mode.")

    if fType == "D":
        _mode = mode + stat.S_IFDIR
        aclObj = self.aciCollection.get(dirObj.dbobj.aclkey)
        aclObj.dbobj.mode = _mode
        aclObj.save()
    elif fType == "F" or fType == "L":
        type_bits = stat.S_IFREG if fType == "F" else stat.S_IFLNK
        _mode = mode + type_bits
        _, propList = self._getPropertyList(dirObj.dbobj, fType)
        for entry in propList:
            if entry.name != j.sal.fs.getBaseName(ppath):
                continue
            aclObj = self.aciCollection.get(entry.aclkey)
            aclObj.dbobj.mode = _mode
            aclObj.save()
    else:
        # Special files: keep whichever type the stored mode has.
        for entry in dirObj.dbobj.links:
            if entry.name != j.sal.fs.getBaseName(ppath):
                continue
            aclObj = self.aciCollection.get(entry.aclkey)
            current = aclObj.dbobj.st_mode
            # NOTE(review): _mode stays unassigned when the stored mode
            # is none of socket/block/char/fifo -- confirm that cannot
            # happen for entries in `links`.
            if stat.S_ISSOCK(current):
                _mode = mode + stat.S_IFSOCK
            elif stat.S_ISBLK(current):
                _mode = mode + stat.S_IFBLK
            elif stat.S_ISCHR(current):
                _mode = mode + stat.S_IFCHR
            elif stat.S_ISFIFO(current):
                _mode = mode + stat.S_IFIFO
            aclObj.dbobj.mode = _mode
            aclObj.save()