def getattr(self, path, fh=None):
(typ,kw) = path_type(path)
now = time.time()
if typ=='root' or typ=='histories':
# Simple directory
st = dict(st_mode=(S_IFDIR | 0555), st_nlink=2)
st['st_ctime'] = st['st_mtime'] = st['st_atime'] = now
elif typ=='datasets':
# Simple directory
st = dict(st_mode=(S_IFDIR | 0555), st_nlink=2)
st['st_ctime'] = st['st_mtime'] = st['st_atime'] = now
elif typ=='historydataorcoll':
# Dataset or collection
d = self._dataset(kw)
if d['history_content_type'] == 'dataset_collection':
# A collection, will be a simple directory.
st = dict(st_mode=(S_IFDIR | 0555), st_nlink=2)
st['st_ctime'] = st['st_mtime'] = st['st_atime'] = now
else:
# A file, will be a symlink to a galaxy dataset.
t = time.mktime(time.strptime(d['update_time'],'%Y-%m-%dT%H:%M:%S.%f'))
fname = esc_filename(d.get('file_path', d['file_name']))
st = dict(st_mode=(S_IFLNK | 0444), st_nlink=1,
st_size=len(fname), st_ctime=t, st_mtime=t,
st_atime=t)
elif typ=='collectiondataset':
# A file within a collection, will be a symlink to a galaxy dataset.
d = self._dataset(kw, display=False)
t = time.mktime(time.strptime(d['update_time'],'%Y-%m-%dT%H:%M:%S.%f'))
fname = esc_filename(d.get('file_path', d['file_name']))
st = dict(st_mode=(S_IFLNK | 0444), st_nlink=1,
st_size=len(fname), st_ctime=t, st_mtime=t,
st_atime=t)
else:
raise FuseOSError(ENOENT)
return st
# Return a symlink for the given dataset
python类S_IFDIR的实例源码
def is_dir(self, ino):
return And(self.is_valid(ino),
self._attrs.mode(ino) & stat.S_IFDIR != 0)
def is_regular(self, ino):
return And(self.is_valid(ino),
self._attrs.mode(ino) & stat.S_IFDIR == 0)
##
def is_dir(self, ino):
attr = self._inode.get_iattr(ino)
return And(self.is_valid(ino),
attr.mode & S_IFDIR != 0)
def is_regular(self, ino):
attr = self._inode.get_iattr(ino)
return And(self.is_valid(ino),
attr.mode & S_IFDIR == 0)
###
def forget(self, ino):
if Or(self.get_iattr(ino).mode & S_IFDIR != 0, self.get_iattr(ino).nlink != 1):
return
assertion(self.is_regular(ino), "forget: ino is not regular")
self._inode.begin_tx()
attr = self._inode.get_iattr(ino)
attr.nlink = 0
self._inode.set_iattr(ino, attr)
self._inode.commit_tx()
def getattr(self, submission=None, path=None):
if self.stat is None:
super(Directory, self).getattr(submission, path)
mode = 0o770 if self.writable else 0o550
self.stat['st_mode'] = S_IFDIR | mode
self.stat['st_nlink'] = 2
self.stat['st_atime'] = time()
return self.stat
def __init__(self, *args, **kwargs):
super(TempDirectory, self).__init__(*args, **kwargs)
self.stat = {
'st_size': 0,
'st_atime': time(),
'st_ctime': time(),
'st_size': 0,
'st_mtime': time(),
'st_mode': S_IFDIR | 0o770,
'st_nlink': 2,
}
def _to_mode(attr):
m = 0
if (attr & FILE_ATTRIBUTE_DIRECTORY):
m |= stdstat.S_IFDIR | 0o111
else:
m |= stdstat.S_IFREG
if (attr & FILE_ATTRIBUTE_READONLY):
m |= 0o444
else:
m |= 0o666
return m
def __init__(self):
self.st_mode = stat.S_IFDIR | 0755
self.st_ino = 0 # handled by FUSE
self.st_dev = 0 # handled by FUSE
self.st_nlink = 2 # a directory has two links: itself and ".."
self.st_uid = 0
self.st_gid = 0
self.st_size = 4096
self.st_atime = 0
self.st_mtime = 0
self.st_ctime = 0
# Producer thread that reads data from a stream, writes it to a file (if given),
# and notifies other threads via a condition variable.
def getattr(self, path):
st = MyStat()
if path == '/':
st.st_mode = stat.S_IFDIR | 0o755
st.st_nlink = 2
return st
if path == "/.command":
st.st_mode = stat.S_IFDIR | 0o755
return st
if path.startswith("/.command/"):
name = path.split("/", 3)[2]
if name in ("writeInto",):
st.st_mode = stat.S_IFREG | 0o444
return st
return -errno.ENOENT
# Get field
field = self.getField(path)
if not field:
return -errno.ENOENT
# Set size and mode
if field.is_field_set:
st.st_mode = stat.S_IFDIR | 0o755
else:
st.st_mode = stat.S_IFREG | 0o444
st.st_nlink = 1
if field.hasValue():
st.st_size = len(self.fieldValue(field))
else:
st.st_size = 0
return st
def readdir(self, path, offset):
log.info("readdir(%s)" % path)
yield fuse.Direntry('.')
yield fuse.Direntry('..')
if path == "/.command":
for entry in self.readCommandDir():
yield entry
return
# Get field
fieldset = self.getField(path)
# if not fieldset:
# return -errno.ENOENT
if path == "/":
entry = fuse.Direntry(".command")
entry.type = stat.S_IFREG
yield entry
# Format file name
count = len(fieldset)
if count % 10:
count += 10 - (count % 10)
format = "%%0%ud-%%s" % (count // 10)
# Create entries
for index, field in enumerate(fieldset):
name = format % (1 + index, field.name)
entry = fuse.Direntry(name)
if field.is_field_set:
entry.type = stat.S_IFDIR
else:
entry.type = stat.S_IFREG
yield entry
log.info("readdir(%s) done" % path)
def test_creates_GADirectory_from_dict(self):
orig = obnamlib.GADirectory()
orig.add_file('.')
orig.set_file_key(
'.', obnamlib.REPO_FILE_MODE, stat.S_IFDIR | 0755)
orig.add_file('README')
orig.set_file_key(
'README', obnamlib.REPO_FILE_MODE, stat.S_IFREG | 0644)
orig.add_subdir('.git', 'git-dir-id')
new = obnamlib.create_gadirectory_from_dict(orig.as_dict())
self.assertEqual(new.as_dict(), orig.as_dict())
def test_isdir_returns_true_for_directory(self):
metadata = obnamlib.Metadata(st_mode=stat.S_IFDIR)
self.assert_(metadata.isdir())
def test_gets_file_child(self):
gen_id = self.create_generation()
self.repo.add_file(gen_id, '/foo')
self.repo.set_file_key(
gen_id, '/foo', obnamlib.REPO_FILE_MODE, stat.S_IFDIR | 0700)
self.repo.add_file(gen_id, '/foo/bar')
self.repo.set_file_key(
gen_id, '/foo/bar', obnamlib.REPO_FILE_MODE, stat.S_IFREG | 0700)
self.assertEqual(
self.repo.get_file_children(gen_id, '/foo'),
['/foo/bar'])
def verify(self, img, **args):
"""Returns a tuple of lists of the form (errors, warnings,
info). The error list will be empty if the action has been
correctly installed in the given image."""
lstat, errors, warnings, info, abort = \
self.verify_fsobj_common(img, stat.S_IFDIR)
return errors, warnings, info
def check(self):
"""Check this object for internal consistency.
:raise ObjectFormatException: if the object is malformed in some way
"""
super(Tree, self).check()
last = None
allowed_modes = (stat.S_IFREG | 0o755, stat.S_IFREG | 0o644,
stat.S_IFLNK, stat.S_IFDIR, S_IFGITLINK,
# TODO: optionally exclude as in git fsck --strict
stat.S_IFREG | 0o664)
for name, mode, sha in parse_tree(b''.join(self._chunked_text),
True):
check_hexsha(sha, 'invalid sha %s' % sha)
if b'/' in name or name in (b'', b'.', b'..'):
raise ObjectFormatException('invalid name %s' % name)
if mode not in allowed_modes:
raise ObjectFormatException('invalid mode %06o' % mode)
entry = (name, (mode, sha))
if last:
if key_entry(last) > key_entry(entry):
raise ObjectFormatException('entries not sorted')
if name == last[0]:
raise ObjectFormatException('duplicate entry %s' % name)
last = entry
def as_pretty_string(self):
text = []
for name, mode, hexsha in self.iteritems():
if mode & stat.S_IFDIR:
kind = "tree"
else:
kind = "blob"
text.append("%04o %s %s\t%s\n" % (mode, kind, hexsha, name))
return "".join(text)
def commit_tree(object_store, blobs):
"""Commit a new tree.
:param object_store: Object store to add trees to
:param blobs: Iterable over blob path, sha, mode entries
:return: SHA1 of the created tree.
"""
trees = {b'': {}}
def add_tree(path):
if path in trees:
return trees[path]
dirname, basename = pathsplit(path)
t = add_tree(dirname)
assert isinstance(basename, bytes)
newtree = {}
t[basename] = newtree
trees[path] = newtree
return newtree
for path, sha, mode in blobs:
tree_path, basename = pathsplit(path)
tree = add_tree(tree_path)
tree[basename] = (mode, sha)
def build_tree(path):
tree = Tree()
for basename, entry in trees[path].items():
if isinstance(entry, dict):
mode = stat.S_IFDIR
sha = build_tree(pathjoin(path, basename))
else:
(mode, sha) = entry
tree.add(basename, mode, sha)
object_store.add_object(tree)
return tree.id
return build_tree(b'')
def walk_trees(store, tree1_id, tree2_id, prune_identical=False):
"""Recursively walk all the entries of two trees.
Iteration is depth-first pre-order, as in e.g. os.walk.
:param store: An ObjectStore for looking up objects.
:param tree1_id: The SHA of the first Tree object to iterate, or None.
:param tree2_id: The SHA of the second Tree object to iterate, or None.
:param prune_identical: If True, identical subtrees will not be walked.
:return: Iterator over Pairs of TreeEntry objects for each pair of entries
in the trees and their subtrees recursively. If an entry exists in one
tree but not the other, the other entry will have all attributes set
to None. If neither entry's path is None, they are guaranteed to
match.
"""
# This could be fairly easily generalized to >2 trees if we find a use
# case.
mode1 = tree1_id and stat.S_IFDIR or None
mode2 = tree2_id and stat.S_IFDIR or None
todo = [(TreeEntry(b'', mode1, tree1_id), TreeEntry(b'', mode2, tree2_id))]
while todo:
entry1, entry2 = todo.pop()
is_tree1 = _is_tree(entry1)
is_tree2 = _is_tree(entry2)
if prune_identical and is_tree1 and is_tree2 and entry1 == entry2:
continue
tree1 = is_tree1 and store[entry1.sha] or None
tree2 = is_tree2 and store[entry2.sha] or None
path = entry1.path or entry2.path
todo.extend(reversed(_merge_entries(path, tree1, tree2)))
yield entry1, entry2