def getattr(self, path, fh=None):
    """Return stat-style attributes for the object found at ``path``.

    A node whose type is exactly ``File`` is reported as a read-only regular
    file carrying the node's size and time. Any other truthy node is reported
    as a read-only, traversable directory with zero size and timestamps.

    :param path: Path resolved through ``self.__path_to_object``.
    :param fh: Not used by this implementation.
    :return: Dict with ``st_mode``, ``st_uid``, ``st_gid``, ``st_size``,
        ``st_ctime`` and ``st_mtime``.
    :raises FuseOSError: With ``ENOENT`` when the lookup yields a falsy value.
    """
    try:
        node = self.__path_to_object(path)
        if not node:
            raise FuseOSError(ENOENT)

        is_file = (type(node) == File)
        if is_file:
            mode = stat.S_IFREG | 0o444
        else:
            mode = stat.S_IFDIR | 0o555

        attributes = {'st_mode': mode}
        attributes['st_uid'] = self.uid
        attributes['st_gid'] = self.gid
        # Only files carry a size and a time; directories report zeros.
        attributes['st_size'] = node.size if is_file else 0
        attributes['st_ctime'] = node.time if is_file else 0
        attributes['st_mtime'] = node.time if is_file else 0
        return attributes
    except IliasFSError as e:
        # Backend errors are handed to the project's FUSE error translator.
        raise_ilias_error_to_fuse(e)
# Example source code using Python's stat.S_IFDIR
def get_file_type(path):
    """Retrieve the file type of the path.

    The path is examined with ``os.stat``, so symbolic links are followed.

    :param path: The path to get the file type for
    :return: One of ``'socket'``, ``'regular'``, ``'block'``,
        ``'directory'``, ``'character_device'`` or ``'fifo'``; None when the
        path is empty, cannot be stat'ed, or has some other type
    """
    f_types = {
        'socket': stat.S_IFSOCK,
        'regular': stat.S_IFREG,
        'block': stat.S_IFBLK,
        'directory': stat.S_IFDIR,
        'character_device': stat.S_IFCHR,
        'fifo': stat.S_IFIFO,
    }
    if not path:
        return None
    try:
        # Stat directly instead of checking existence first: the file may
        # vanish between the two calls.
        mode = os.stat(path).st_mode
    except (OSError, ValueError):
        return None
    # The S_IF* values share bits (S_IFBLK contains S_IFDIR's bit, for
    # example), so compare the whole type field instead of masking per type.
    file_format = stat.S_IFMT(mode)
    for key, val in f_types.items():
        if file_format == val:
            return key
    return None
def __init__(self, filename=None, **kwargs):
    """Describe a registry path, reading it as a key or else as a value.

    :param filename: Registry path; wrapped in a ``common.FileSpec`` using
        the "Reg" filesystem and a backslash separator.
    :param kwargs: Forwarded unchanged to the parent initializer.
    :raises IOError: When the first path component is not an attribute of
        ``_winreg``.
    """
    filename = common.FileSpec(filename, filesystem="Reg", path_sep="\\")
    super(RegistryKeyInformation, self).__init__(
        filename=filename, **kwargs)

    # Defaults describe an empty location with directory type.
    self.hive = self.key_name = self.value_name = self.value = ""
    self.value_type = "REG_NONE"
    self.st_mode = stat.S_IFDIR

    path_components = self.filename.components()
    if not path_components:
        return

    # The first component MUST be a hive
    self.hive = path_components[0]
    # Validate the raw lookup: testing the KeyHandle wrapper for None after
    # construction presumably never fires, which left this error unreachable.
    hive_key = getattr(_winreg, self.hive, None)
    if hive_key is None:
        raise IOError("Unknown hive name %s" % self.hive)
    self._hive_handle = KeyHandle(hive_key)

    # Maybe its a key.
    try:
        self._read_key(path_components)
    except exceptions.WindowsError:
        # Nop - maybe its a value then.
        self._read_value(path_components)
def getattr(self, path):
    """Return stat information for the root directory or a task entry.

    :param path: Path whose last component may name an entry of
        ``self.tasks``.
    :return: A ``make_stat`` result marked as a regular file (sized
        ``self.address_space_size``) for a task entry, one marked as a
        directory for any other path directly under ``/``, or None for
        anything else.
    """
    # The path represents a pid.
    # os.path.split() always returns a (head, tail) pair, so nesting has to
    # be judged from the head, not from the length of the result.
    parent, name = os.path.split(path)

    if parent in ("/", "") and name in self.tasks:
        s = make_stat(name)
        s.st_mode = stat.S_IFREG
        s.st_size = self.address_space_size
        return s

    elif parent == "/":
        s = make_stat(2)
        s.st_mode = stat.S_IFDIR
        return s
def backup_parents(self, root):
    '''Back up parents of root, non-recursively.

    Starting with the absolute form of ``root`` itself, each directory up to
    the point where ``os.path.dirname`` stops changing the path is added to
    the generation, unless ``self.pretend`` is set. When reading metadata
    fails with ``OSError``, a warning is logged and placeholder directory
    metadata is used instead.

    '''
    root = self.fs.abspath(root)
    tracing.trace('backing up parents of %s', root)

    # ``0o777`` and ``except ... as`` are valid on Python 2.6+ and Python 3;
    # the older ``0777`` and ``except X, e`` forms are Python 2 only.
    dummy_metadata = obnamlib.Metadata(st_mode=0o777 | stat.S_IFDIR)

    while True:
        parent = os.path.dirname(root)
        try:
            metadata = obnamlib.read_metadata(self.fs, root)
        except OSError as e:
            logging.warning(
                'Failed to get metadata for %s: %s: %s',
                root, e.errno or 0, e.strerror)
            logging.warning('Using fake metadata instead for %s', root)
            metadata = dummy_metadata
        if not self.pretend:
            self.add_file_to_generation(root, metadata)

        # dirname() of the topmost directory is that directory itself.
        if root == parent:
            break
        root = parent
def setUp(self):
    # Build a ClientMetadataTree in a fresh temporary directory, start a
    # generation, and prepare encoded file/directory metadata for the tests.
    self.now = None
    self.tempdir = tempfile.mkdtemp()
    fs = obnamlib.LocalFS(self.tempdir)
    self.hooks = obnamlib.HookManager()
    self.hooks.new('repository-toplevel-init')
    self.client = obnamlib.ClientMetadataTree(
        fs, 'clientid', obnamlib.DEFAULT_NODE_SIZE,
        obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE, obnamlib.DEFAULT_LRU_SIZE,
        self)
    # Force use of filename hash collisions.
    self.client.default_file_id = self.client._bad_default_file_id
    self.client.start_generation()
    self.clientid = self.client.get_generation_id(self.client.tree)

    # ``0o`` octal literals are valid on Python 2.6+ and Python 3; the bare
    # ``0666`` form is a syntax error on Python 3.
    self.file_metadata = obnamlib.Metadata(st_mode=stat.S_IFREG | 0o666)
    self.file_encoded = obnamlib.fmt_6.metadata_codec.encode_metadata(
        self.file_metadata)
    self.dir_metadata = obnamlib.Metadata(st_mode=stat.S_IFDIR | 0o777)
    self.dir_encoded = obnamlib.fmt_6.metadata_codec.encode_metadata(
        self.dir_metadata)
def make_file_struct(self, size, isfile=True, ctime=None, mtime=None, atime=None, read_only=False):
    """Build a stat-style dict for a file or directory entry.

    :param size: Value stored as ``st_size``.
    :param isfile: True for a regular file (mode 0444), False for a
        directory (mode 0555).
    :param ctime: Change time; defaults to the time of the call.
    :param mtime: Modification time; defaults to the time of the call.
    :param atime: Access time; defaults to the time of the call.
    :param read_only: Accepted but not consulted; owner write permission is
        decided by ``self.pakfile.read_only``.
    :return: Dict with ``st_mode``, ``st_uid``, ``st_gid``, ``st_nlink``,
        ``st_ctime``, ``st_mtime``, ``st_atime`` and ``st_size``.
    """
    # Resolve timestamps per call: ``time()`` used as a default value is
    # evaluated once at definition and would freeze every later timestamp.
    now = time()
    if ctime is None:
        ctime = now
    if mtime is None:
        mtime = now
    if atime is None:
        atime = now

    stats = dict()
    if isfile:
        stats['st_mode'] = S_IFREG | 0o0444
    else:
        stats['st_mode'] = S_IFDIR | 0o0555
    # Writable archives additionally grant the owner write permission.
    if not self.pakfile.read_only:
        stats['st_mode'] |= 0o0200
    stats['st_uid'] = os.getuid()
    stats['st_gid'] = os.getgid()
    stats['st_nlink'] = 1
    stats['st_ctime'] = ctime
    stats['st_mtime'] = mtime
    stats['st_atime'] = atime
    stats['st_size'] = size
    return stats
def cleanup_mode(mode):
    """Normalise a mode to a value that can be stored in a tree object.

    :param mode: Mode to clean up.
    :return: ``stat.S_IFLNK`` for symlinks, ``stat.S_IFDIR`` for
        directories, ``S_IFGITLINK`` for gitlinks; otherwise a regular-file
        mode of 0644 plus whichever execute bits ``mode`` had set.
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    if stat.S_ISDIR(mode):
        return stat.S_IFDIR
    if S_ISGITLINK(mode):
        return S_IFGITLINK
    # Everything else is a regular file; only the execute bits survive.
    executable_bits = mode & 0o111
    return stat.S_IFREG | 0o644 | executable_bits
def getattr(self, path, fh=None):
    """Return a stat dict for a directory or file known to ``GDBFS``.

    :param path: Path to look up.
    :param fh: Not used by this implementation.
    :return: Dict with ``st_mode``, timestamps set to the current time,
        ``st_uid``/``st_gid`` from the FUSE context, plus ``st_nlink`` for
        directories or ``st_size`` for files.
    :raises FuseOSError: With ``ENOENT`` when the path is neither a
        directory nor a node with a non-empty label.
    """
    # ``0o`` literals and the single-argument print call below are valid on
    # both Python 2.6+ and Python 3.
    if path == '/' or GDBFS.is_dir(path):
        st = dict(st_mode=(S_IFDIR | 0o755), st_nlink=2)
    elif len(GDBFS.Ndb.get_node_label(GDBFS.get_node_name(path))) != 0:
        try:
            filesize = GDBFS.get_file_length(path)
        except Exception:
            # Best effort: a file whose length cannot be read is reported
            # as empty. Unlike a bare ``except:`` this lets
            # KeyboardInterrupt and SystemExit propagate.
            filesize = 0
        st = dict(st_mode=(S_IFREG | 0o644), st_size=filesize)
    else:
        raise FuseOSError(ENOENT)
    st['st_ctime'] = st['st_mtime'] = st['st_atime'] = time()
    st['st_uid'], st['st_gid'], pid = fuse_get_context()
    # Debug trace; same text the Python 2 print statement produced.
    print("\n\nin getattr, path = %s \nst = %s" % (path, st))
    return st
def _initialize_aci(self, mode, fileType):
    """Create and save a new ACI record owned by the current user.

    :param mode: Permission bits as an octal string (parsed with base 8).
    :param fileType: One of the ``stat.S_IF*`` file-type constants.
    :return: The ``key`` of the saved ACI object.
    :raises RuntimeError: When ``fileType`` is not a recognised type.
    """
    valid_types = (
        stat.S_IFREG,
        stat.S_IFDIR,
        stat.S_IFCHR,
        stat.S_IFBLK,
        stat.S_IFIFO,
        stat.S_IFLNK,
        stat.S_IFSOCK,
    )
    if fileType not in valid_types:
        raise RuntimeError("Invalid file type.")

    aci = self.aciCollection.new()
    record = aci.dbobj
    uid = os.getuid()
    record.id = uid
    record.uname = pwd.getpwuid(uid).pw_name
    record.gname = grp.getgrgid(os.getgid()).gr_name
    # Parsed permission bits are added to the file-type constant.
    record.mode = int(mode, 8) + fileType
    aci.save()
    return aci.key
def getattr(self, path, fh=None):
    '''
    Return a dictionary whose keys mirror the fields of the stat C
    structure described in stat(2).

    st_atime, st_mtime and st_ctime should be floats.

    NOTE: Linux and Mac OS X are incompatible in how they count st_nlink
          for directories: Mac OS X counts every file inside the
          directory, while Linux counts only the subdirectories.

    Only the root path is known here; any other path raises
    FuseOSError(ENOENT).
    '''
    if path == '/':
        return dict(st_mode=(S_IFDIR | 0o755), st_nlink=2)
    raise FuseOSError(ENOENT)
def _list_directory(self, st):
    """Return True when the stat result ``st`` describes a directory.

    ``stat.S_ISDIR`` compares the whole file-type field. Testing the
    ``S_IFDIR`` bit alone is also true for block devices and sockets, whose
    type values contain that bit.
    """
    return stat.S_ISDIR(st.st_mode)
def test_ensure_tree(self):
    # ensure_tree() must create the nested directory, and its stat mode must
    # equal the requested permissions combined with the directory type.
    scratch = tempfile.mkdtemp()
    try:
        target = '%s/foo/bar/baz' % (scratch,)
        fileutils.ensure_tree(target, TEST_PERMISSIONS)
        self.assertTrue(os.path.isdir(target))
        actual_mode = os.stat(target).st_mode
        self.assertEqual(actual_mode, TEST_PERMISSIONS | stat.S_IFDIR)
    finally:
        # Remove the temporary tree whether or not the assertions passed.
        if os.path.exists(scratch):
            shutil.rmtree(scratch)
def find_data_to_stat(data):
    """Build a Win32StatResult from a Win32 FIND_DATA struct."""
    attributes = data.dwFileAttributes

    # Step 1: derive st_mode from dwFileAttributes.
    if attributes & FILE_ATTRIBUTE_DIRECTORY:
        st_mode = S_IFDIR | 0o111
    else:
        st_mode = S_IFREG
    st_mode |= 0o444 if attributes & FILE_ATTRIBUTE_READONLY else 0o666

    is_symlink = (attributes & FILE_ATTRIBUTE_REPARSE_POINT and
                  data.dwReserved0 == IO_REPARSE_TAG_SYMLINK)
    if is_symlink:
        # Swap the file-type field for S_IFLNK, keeping permission bits.
        st_mode = (st_mode & ~0o170000) | S_IFLNK

    # Step 2: size is split across two fields; times are FILETIME values.
    st_size = (data.nFileSizeHigh << 32) | data.nFileSizeLow
    st_atime, st_mtime, st_ctime = [
        filetime_to_time(stamp)
        for stamp in (data.ftLastAccessTime,
                      data.ftLastWriteTime,
                      data.ftCreationTime)
    ]
    nanoseconds = [int(seconds * 1000000000)
                   for seconds in (st_atime, st_mtime, st_ctime)]

    # Some fields set to zero per CPython's posixmodule.c: st_ino, st_dev,
    # st_nlink, st_uid, st_gid
    return Win32StatResult(st_mode, 0, 0, 0, 0, 0, st_size,
                           st_atime, st_mtime, st_ctime,
                           nanoseconds[0], nanoseconds[1], nanoseconds[2],
                           attributes)
def is_dir(self, follow_symlinks=True):
    """Return True when this entry is a directory.

    For a symlink the answer comes from ``self.stat()`` when
    ``follow_symlinks`` is true and is False otherwise. A stat failing with
    ``ENOENT`` also counts as False. Non-symlinks are answered from the
    cached find data.
    """
    if not self.is_symlink():
        return (self._find_data.dwFileAttributes &
                FILE_ATTRIBUTE_DIRECTORY != 0)
    if not follow_symlinks:
        return False
    try:
        target_mode = self.stat().st_mode
    except OSError as e:
        if e.errno != ENOENT:
            raise
        return False
    # 0o170000 masks the file-type field of st_mode.
    return target_mode & 0o170000 == S_IFDIR
def is_dir(self, follow_symlinks=True):
    """Return True when this entry is a directory.

    The cached ``_d_type`` answers directly unless it is ``DT_UNKNOWN`` or
    the entry is a symlink that should be followed; in those cases
    ``self.stat`` is consulted. A stat failing with ``ENOENT`` counts as
    False.
    """
    needs_stat = (self._d_type == DT_UNKNOWN or
                  (follow_symlinks and self.is_symlink()))
    if not needs_stat:
        return self._d_type == DT_DIR

    try:
        st = self.stat(follow_symlinks=follow_symlinks)
    except OSError as e:
        if e.errno != ENOENT:
            raise
        return False
    # 0o170000 masks the file-type field of st_mode.
    return st.st_mode & 0o170000 == S_IFDIR
def stat(self) -> Stat:
    """Return directory attributes stamped with the current time.

    The mode is a directory with read and execute (traverse) permission for
    everybody and write permission for the owner (0o755). All three
    timestamps are set to the time of the call.
    """
    now = time.time()
    return dict(st_mode=(S_IFDIR | 0o755),
                st_nlink=2,
                st_ctime=now,
                st_mtime=now,
                # Spelled ``st_attime`` before, which left the access time
                # unset for anything reading the standard stat field name.
                st_atime=now)
def lstat(self, path):
    """Get attributes of a file, directory, or symlink

       The attributes are read from ``self._vfs.stat(path)``. The entry is
       reported as a directory when its type is ``Stat.DIRECTORY`` and as a
       regular file otherwise, with read, write and execute permission for
       user, group and others. Owner and group are those of the current
       process.

       :param bytes path:
           The path of the file, directory, or link to get attributes for

       :returns: An :class:`SFTPAttrs` containing the file attributes

       :raises: :exc:`SFTPError` with ``FX_NO_SUCH_FILE`` when the VFS
                reports that the path does not exist

    """
    try:
        info = self._vfs.stat(path).stat
    except VFSFileNotFoundError:
        raise SFTPError(FX_NO_SUCH_FILE, 'No such file')

    if info.type == Stat.DIRECTORY:
        file_type = S_IFDIR
    else:
        file_type = S_IFREG

    return SFTPAttrs(
        size=info.size,
        uid=os.getuid(),
        gid=os.getgid(),
        permissions=file_type | S_IRWXU | S_IRWXG | S_IRWXO,
        atime=info.atime,
        mtime=info.mtime,
    )
def _list_directory(self, st):
    """Tell whether the stat result ``st`` belongs to a directory.

    The full file-type field is compared via ``stat.S_ISDIR``; a plain
    ``& S_IFDIR`` test would also accept sockets and block devices because
    their type values include the same bit.
    """
    return stat.S_ISDIR(st.st_mode)
def getattr(self, path, fh=None):
    '''
    Produce a dictionary keyed like the stat C structure of stat(2).

    st_atime, st_mtime and st_ctime should be floats.

    NOTE: st_nlink of directories is counted differently on Linux and
          Mac OS X. Mac OS X counts all files inside the directory, while
          Linux counts only the subdirectories.

    The root is the only path this implementation knows; every other path
    raises FuseOSError(ENOENT).
    '''
    is_root = (path == '/')
    if not is_root:
        raise FuseOSError(ENOENT)

    root_mode = S_IFDIR | 0o755
    return {'st_mode': root_mode, 'st_nlink': 2}
def _to_mode(attr):
    """Translate a Windows file-attribute bitmask into a stat-style mode.

    Directories get the directory type plus execute bits; everything else
    is a regular file. Read-only entries get 0o444, others 0o666.
    """
    if attr & FILE_ATTRIBUTE_DIRECTORY:
        file_type = stdstat.S_IFDIR | 0o111
    else:
        file_type = stdstat.S_IFREG

    if attr & FILE_ATTRIBUTE_READONLY:
        permissions = 0o444
    else:
        permissions = 0o666

    return file_type | permissions
def dbpcs(mode):
    """Return the one-character file-type code for a stat mode.

    The codes follow the first column of ``ls -l``: ``'l'`` symlink,
    ``'s'`` socket, ``'-'`` regular file, ``'b'`` block device, ``'d'``
    directory, ``'p'`` FIFO and ``'c'`` character device.

    :param mode: An ``st_mode`` value; permission bits are ignored.
    :return: The type character, or ``'?'`` for an unrecognised type.
    """
    type_codes = {
        # Symlinks used to be reported as 's', the socket code.
        stat.S_IFLNK: 'l',
        stat.S_IFSOCK: 's',
        stat.S_IFREG: '-',
        stat.S_IFBLK: 'b',
        stat.S_IFDIR: 'd',
        stat.S_IFIFO: 'p',
        stat.S_IFCHR: 'c',
    }
    # The type constants share bits, so match the whole type field exactly
    # instead of relying on the order of per-constant mask tests.
    return type_codes.get(stat.S_IFMT(mode), '?')
def getattr(self, inode, ctx=None):
    """Build the llfuse entry attributes for ``inode``.

    The file type comes from the stored attributes: ``type`` of ``tree`` or
    ``blob`` means directory or regular file, and ``filetype`` of ``link``
    or ``fifo`` means symlink or FIFO. Blobs carrying a ``ref`` get their
    size from ``self.storage``; everything else has size zero. Timestamps
    are a fixed constant and owner/group are both 0.

    :param inode: Key into ``self.inodes``.
    :param ctx: Not used by this implementation.
    :raises llfuse.FUSEError: With ``ENOENT`` when the inode is unknown or
        its type is not recognised.
    """
    attrs = self.inodes.get(inode)
    if attrs is None:
        raise llfuse.FUSEError(errno.ENOENT)  # FIXME

    node_type = attrs.get('type')
    special_type = attrs.get('filetype')
    if node_type == 'tree':
        mode_filetype = stat.S_IFDIR
    elif node_type == 'blob':
        mode_filetype = stat.S_IFREG
    elif special_type == 'link':
        mode_filetype = stat.S_IFLNK
    elif special_type == 'fifo':
        mode_filetype = stat.S_IFIFO
    else:
        raise llfuse.FUSEError(errno.ENOENT)  # FIXME

    entry = llfuse.EntryAttributes()
    entry.st_mode = mode_filetype | attrs.get('mode', MartyFSHandler.DEFAULT_MODE)

    has_content = attrs.get('type') == 'blob' and 'ref' in attrs
    entry.st_size = self.storage.size(attrs['ref']) if has_content else 0

    # Every timestamp uses the same hard-coded instant, in nanoseconds.
    stamp = int(1438467123.985654 * 1e9)
    entry.st_atime_ns = entry.st_ctime_ns = entry.st_mtime_ns = stamp

    entry.st_gid = 0
    entry.st_uid = 0
    entry.st_ino = inode
    return entry
def __init__(self, is_dir=False, is_file=False, size=0, ctime=None, mtime=None, atime=None):
    """Set up the stat attributes of a directory or file entry.

    :param is_dir: Mark the entry as a directory (mode 0o755, two links).
    :param is_file: Mark the entry as a regular file. When both flags are
        set, the file attributes win.
    :param size: File size stored as ``st_size`` (files only).
    :param ctime: Change time (files only); defaults to the time of the call.
    :param mtime: Modification time (files only); defaults to the time of
        the call.
    :param atime: Access time (files only); defaults to the time of the call.
    """
    # Resolve timestamps per call: ``time()`` used as a default value is
    # evaluated once at definition and would freeze every later timestamp.
    now = time()
    if ctime is None:
        ctime = now
    if mtime is None:
        mtime = now
    if atime is None:
        atime = now

    self.is_dir = is_dir
    self.is_file = is_file
    # Start from an empty mapping so an entry that is neither a directory
    # nor a file no longer fails with AttributeError below.
    self.attr = {}
    if self.is_dir:
        self.attr = dict(st_mode=(S_IFDIR | 0o755), st_nlink=2)
    if self.is_file:
        self.attr = dict(st_mode=(S_IFREG | 0o755), st_nlink=1, st_size=size,
                         st_ctime=ctime, st_mtime=mtime, st_atime=atime)
    self.attr["attrs"] = {}
def create_dir(self, dir_inode, name, *, mode=0):
    """Create an entry called ``name`` under ``dir_inode``.

    When ``mode`` carries no file-type bits, the directory type is added.
    The parent's ``change_times()`` is called before delegating to
    ``self._create``.

    :return: Whatever ``self._create`` returns.
    """
    has_file_type = bool(stat.S_IFMT(mode))
    if not has_file_type:
        mode = mode | stat.S_IFDIR
    dir_inode.change_times()
    _debug('create_dir %s 0x%x', name, mode)
    return self._create(mode, dir_inode, name)
def getattr(self, path, fh=None):
    """Return stat attributes for ``path``, delegating into sub-mounts.

    A path whose first component is a key of ``self.dirs`` is forwarded to
    that entry's ``getattr`` with the first component removed. Otherwise
    the root and keys of ``self.dirs`` are read-only directories, and
    case-insensitive matches in ``self.files`` are read-only files.

    :raises FuseOSError: With ``ENOENT`` for any other path.
    """
    top_level = common.get_first_dir(path)
    if top_level in self.dirs:
        inner_path = common.remove_first_dir(path)
        return self.dirs[top_level].getattr(inner_path, fh)

    uid, gid, pid = fuse_get_context()
    if path == '/' or path in self.dirs:
        st = {'st_mode': (stat.S_IFDIR | 0o555), 'st_nlink': 2}
    else:
        key = path.lower()
        if key not in self.files:
            raise FuseOSError(errno.ENOENT)
        st = {'st_mode': (stat.S_IFREG | 0o444), 'st_size': self.files[key]['size'], 'st_nlink': 1}

    # Entries of self.g_stat override the computed ones; the caller's
    # uid/gid are applied last.
    merged = dict(st)
    merged.update(self.g_stat)
    merged['st_uid'] = uid
    merged['st_gid'] = gid
    return merged
def getattr(self, path, fh=None):
    """Return stat attributes for ``path``, delegating into sub-mounts.

    A path whose first component is a key of ``self.dirs`` is forwarded to
    that entry's ``getattr`` with the first component removed. Otherwise
    only the root is a directory, and case-insensitive matches in
    ``self.files`` are read-only files.

    :raises FuseOSError: With ``ENOENT`` for any other path.
    """
    head = common.get_first_dir(path)
    if head in self.dirs:
        return self.dirs[head].getattr(common.remove_first_dir(path), fh)

    uid, gid, pid = fuse_get_context()
    lowered = path.lower()
    if path == '/':
        st = {'st_mode': (stat.S_IFDIR | 0o555), 'st_nlink': 2}
    elif lowered in self.files:
        entry = self.files[lowered]
        st = {'st_mode': (stat.S_IFREG | 0o444), 'st_size': entry['size'], 'st_nlink': 1}
    else:
        raise FuseOSError(errno.ENOENT)

    # self.g_stat overrides computed entries; uid/gid are applied last.
    owner = {'st_uid': uid, 'st_gid': gid}
    return {**st, **self.g_stat, **owner}
def getattr(self, path, fh=None):
    """Return stat attributes for the root or a known file.

    Permissions depend on ``self.readonly``: the root is 0o555 or 0o777 and
    files are 0o444 or 0o666. ``/_nandinfo.txt`` is always 0o444. File
    lookups in ``self.files`` use the lower-cased path.

    :raises FuseOSError: With ``ENOENT`` for any other path.
    """
    uid, gid, pid = fuse_get_context()

    if path == '/':
        dir_perms = 0o555 if self.readonly else 0o777
        st = {'st_mode': (stat.S_IFDIR | dir_perms), 'st_nlink': 2}
    else:
        key = path.lower()
        if key not in self.files:
            raise FuseOSError(errno.ENOENT)
        write_protected = self.readonly or key == '/_nandinfo.txt'
        file_perms = 0o444 if write_protected else 0o666
        st = {'st_mode': (stat.S_IFREG | file_perms),
              'st_size': self.files[key]['size'], 'st_nlink': 1}

    # self.g_stat overrides computed entries; uid/gid are applied last.
    return {**st, **self.g_stat, 'st_uid': uid, 'st_gid': gid}
def getattr(self, path, fh=None):
    """Return stat attributes for ``path``, delegating into sub-mounts.

    A path whose first component is a key of ``self.dirs`` is forwarded to
    that entry's ``getattr`` with the first component removed. Otherwise
    the root and keys of ``self.dirs`` are read-only directories, and
    case-insensitive matches in ``self.files`` are read-only files.

    :raises FuseOSError: With ``ENOENT`` for any other path.
    """
    mount_name = common.get_first_dir(path)
    if mount_name in self.dirs:
        sub_mount = self.dirs[mount_name]
        return sub_mount.getattr(common.remove_first_dir(path), fh)

    uid, gid, pid = fuse_get_context()
    is_directory = path == '/' or path in self.dirs
    if is_directory:
        st = {'st_mode': (stat.S_IFDIR | 0o555), 'st_nlink': 2}
    elif path.lower() in self.files:
        file_size = self.files[path.lower()]['size']
        st = {'st_mode': (stat.S_IFREG | 0o444), 'st_size': file_size, 'st_nlink': 1}
    else:
        raise FuseOSError(errno.ENOENT)

    # self.g_stat overrides computed entries; uid/gid are applied last.
    return {**st, **self.g_stat, 'st_uid': uid, 'st_gid': gid}
def getattr(self, path, fh=None):
    """Return stat attributes for a RomFS entry.

    The entry is looked up through ``self.romfs_reader``. Directories are
    reported as 0o555 and files as 0o444 with the entry's size.

    :raises FuseOSError: With ``ENOENT`` when the reader cannot find the
        path or the entry has an unexpected type.
    """
    uid, gid, pid = fuse_get_context()
    try:
        item = self.romfs_reader.get_info_from_path(path)
    except romfs.RomFSFileNotFoundException:
        raise FuseOSError(errno.ENOENT)

    kind = item.type
    if kind == 'dir':
        st = {'st_mode': (stat.S_IFDIR | 0o555), 'st_nlink': 2}
    elif kind == 'file':
        st = {'st_mode': (stat.S_IFREG | 0o444), 'st_size': item.size, 'st_nlink': 1}
    else:
        # Not expected for entries produced by the reader; treated as a
        # missing path.
        raise FuseOSError(errno.ENOENT)

    # self.g_stat overrides computed entries; uid/gid are applied last.
    return {**st, **self.g_stat, 'st_uid': uid, 'st_gid': gid}