def getattr(self, path, fh=None):
try:
node = self.__path_to_object(path)
if node:
is_file = (type(node) == File)
st_mode_ft_bits = stat.S_IFREG if is_file else stat.S_IFDIR
st_mode_permissions = 0o444 if is_file else 0o555
return {
'st_mode': st_mode_ft_bits | st_mode_permissions,
'st_uid': self.uid,
'st_gid': self.gid,
'st_size': node.size if is_file else 0,
'st_ctime': node.time if is_file else 0,
'st_mtime': node.time if is_file else 0
}
else:
raise FuseOSError(ENOENT)
except IliasFSError as e:
raise_ilias_error_to_fuse(e)
python类S_IFREG的实例源码
def get_file_type(path):
"""Retrieve the file type of the path
:param path: The path to get the file type for
:return: The file type as a string or None on error
"""
f_types = {
'socket': stat.S_IFSOCK,
'regular': stat.S_IFREG,
'block': stat.S_IFBLK,
'directory': stat.S_IFDIR,
'character_device': stat.S_IFCHR,
'fifo': stat.S_IFIFO,
}
if not path or not os.path.exists(path):
return None
obj = os.stat(path).st_mode
for key,val in f_types.items():
if obj & val == val:
return key
def getattr(self, path):
print "getattr", path
st = MyStat()
st.st_atime = int(time.time())
st.st_mtime = st.st_atime
st.st_ctime = st.st_atime
pe = path.split(os.path.sep)[1:]
if path == "/": # root of the FUSE filesystem
pass
elif pe[-1] in ["a", "b", "c"]: # first level is directories
pass
elif len(pe) == 2 and pe[-1] == "fuse_live.mkv":
st.st_mode = stat.S_IFREG | 0666
st.st_nlink = 1
st.st_size = 1024**3 #self.fileSize
else:
return -errno.ENOENT
return st
def getattr(self, path):
# The path represents a pid.
components = os.path.split(path)
if len(components) > 2:
return
if len(components) == 2 and components[1] in self.tasks:
s = make_stat(components[1])
s.st_mode = stat.S_IFREG
s.st_size = self.address_space_size
return s
elif components[0] == "/":
s = make_stat(2)
s.st_mode = stat.S_IFDIR
return s
def test_committing_remembers_file_removal(self):
gen_id = self.create_generation()
self.repo.add_file(gen_id, '/foo/bar')
self.repo.set_file_key(
gen_id, '/foo/bar', obnamlib.REPO_FILE_MODE, stat.S_IFREG)
self.repo.commit_client('fooclient')
self.repo.unlock_client('fooclient')
self.repo.lock_client('fooclient')
gen_id_2 = self.repo.create_generation('fooclient')
self.assertTrue(self.repo.file_exists(gen_id_2, '/foo/bar'))
self.repo.remove_file(gen_id_2, '/foo/bar')
self.repo.commit_client('fooclient')
self.repo.unlock_client('fooclient')
self.assertTrue(self.repo.file_exists(gen_id, '/foo/bar'))
self.assertFalse(self.repo.file_exists(gen_id_2, '/foo/bar'))
def test_unlocking_client_forgets_modified_file_chunk_ids(self):
gen_id = self.create_generation()
self.repo.add_file(gen_id, '/foo/bar')
self.repo.set_file_key(
gen_id, '/foo/bar', obnamlib.REPO_FILE_MODE, stat.S_IFREG)
self.repo.append_file_chunk_id(gen_id, '/foo/bar', 1)
self.repo.commit_client('fooclient')
self.repo.unlock_client('fooclient')
self.repo.lock_client('fooclient')
gen_id_2 = self.repo.create_generation('fooclient')
self.repo.append_file_chunk_id(gen_id_2, '/foo/bar', 2)
self.assertEqual(
self.repo.get_file_chunk_ids(gen_id_2, '/foo/bar'),
[1, 2])
self.repo.unlock_client('fooclient')
self.assertEqual(
self.repo.get_file_chunk_ids(gen_id, '/foo/bar'),
[1])
def test_committing_child_remembers_modified_file_chunk_ids(self):
gen_id = self.create_generation()
self.repo.add_file(gen_id, '/foo/bar')
self.repo.set_file_key(
gen_id, '/foo/bar', obnamlib.REPO_FILE_MODE, stat.S_IFREG)
self.repo.append_file_chunk_id(gen_id, '/foo/bar', 1)
self.repo.commit_client('fooclient')
self.repo.unlock_client('fooclient')
self.repo.lock_client('fooclient')
gen_id_2 = self.repo.create_generation('fooclient')
self.repo.append_file_chunk_id(gen_id_2, '/foo/bar', 2)
self.assertEqual(
self.repo.get_file_chunk_ids(gen_id_2, '/foo/bar'),
[1, 2])
self.repo.commit_client('fooclient')
self.repo.unlock_client('fooclient')
self.assertEqual(
self.repo.get_file_chunk_ids(gen_id, '/foo/bar'),
[1])
self.assertEqual(
self.repo.get_file_chunk_ids(gen_id_2, '/foo/bar'),
[1, 2])
def setUp(self):
self.now = None
self.tempdir = tempfile.mkdtemp()
fs = obnamlib.LocalFS(self.tempdir)
self.hooks = obnamlib.HookManager()
self.hooks.new('repository-toplevel-init')
self.client = obnamlib.ClientMetadataTree(
fs, 'clientid', obnamlib.DEFAULT_NODE_SIZE,
obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE, obnamlib.DEFAULT_LRU_SIZE,
self)
# Force use of filename hash collisions.
self.client.default_file_id = self.client._bad_default_file_id
self.client.start_generation()
self.clientid = self.client.get_generation_id(self.client.tree)
self.file_metadata = obnamlib.Metadata(st_mode=stat.S_IFREG | 0666)
self.file_encoded = obnamlib.fmt_6.metadata_codec.encode_metadata(
self.file_metadata)
self.dir_metadata = obnamlib.Metadata(st_mode=stat.S_IFDIR | 0777)
self.dir_encoded = obnamlib.fmt_6.metadata_codec.encode_metadata(
self.dir_metadata)
def make_file_struct(self, size, isfile=True, ctime=time(), mtime=time(), atime=time(), read_only=False):
stats = dict()
# TODO replace uncommented modes with commented when write ability is added
if isfile:
stats['st_mode'] = S_IFREG | 0o0444
else:
stats['st_mode'] = S_IFDIR | 0o0555
if not self.pakfile.read_only:
stats['st_mode'] |= 0o0200
stats['st_uid'] = os.getuid()
stats['st_gid'] = os.getgid()
stats['st_nlink'] = 1
stats['st_ctime'] = ctime
stats['st_mtime'] = mtime
stats['st_atime'] = atime
stats['st_size'] = size
return stats
def test_atomic_write_preserves_ownership_before_moving_into_place(self):
atomic_file = self.make_file('atomic')
self.patch(fs_module, 'isfile').return_value = True
self.patch(fs_module, 'chown')
self.patch(fs_module, 'rename')
self.patch(fs_module, 'stat')
ret_stat = fs_module.stat.return_value
ret_stat.st_uid = sentinel.uid
ret_stat.st_gid = sentinel.gid
ret_stat.st_mode = stat.S_IFREG
atomic_write(factory.make_bytes(), atomic_file)
self.assertThat(fs_module.stat, MockCalledOnceWith(atomic_file))
self.assertThat(fs_module.chown, MockCalledOnceWith(
ANY, sentinel.uid, sentinel.gid))
def cleanup_mode(mode):
"""Cleanup a mode value.
This will return a mode that can be stored in a tree object.
:param mode: Mode to clean up.
"""
if stat.S_ISLNK(mode):
return stat.S_IFLNK
elif stat.S_ISDIR(mode):
return stat.S_IFDIR
elif S_ISGITLINK(mode):
return S_IFGITLINK
ret = stat.S_IFREG | 0o644
ret |= (mode & 0o111)
return ret
def getattr(self, path, fh=None):
if path == '/' or GDBFS.is_dir(path):
st = dict(st_mode=(S_IFDIR | 0755), st_nlink=2)
else:
# file = GDBFS.get_file(path)
if len(GDBFS.Ndb.get_node_label(GDBFS.get_node_name(path)))!=0:
filesize=0
try:
filesize=GDBFS.get_file_length(path)
except:
filesize=0
st = dict(st_mode=(S_IFREG | 0644), st_size=filesize)
else:
raise FuseOSError(ENOENT)
st['st_ctime'] = st['st_mtime'] = st['st_atime'] = time()
st['st_uid'], st['st_gid'], pid = fuse_get_context()
print "\n\nin getattr, path =",path,"\nst =", st
return st
def addFile(self, parent_path, name, size=0, mode="644"):
_, parentObj = self._search_db(parent_path)
if parentObj.dbobj.state != "":
raise RuntimeError("%s: No such file or directory" % parent_path)
aciKey = self._initialize_aci(mode, stat.S_IFREG)
fileDict = {
"aclkey": aciKey,
"blocksize": size,
"creationTime": calendar.timegm(time.gmtime()),
"modificationTime": calendar.timegm(time.gmtime()),
"name": name,
"size": size,
}
self._addObj(parentObj, fileDict, "F")
parentObj.save()
def _initialize_aci(self, mode, fileType):
valid_types = [
stat.S_IFREG,
stat.S_IFDIR,
stat.S_IFCHR,
stat.S_IFBLK,
stat.S_IFIFO,
stat.S_IFLNK,
stat.S_IFSOCK]
if fileType not in valid_types:
raise RuntimeError("Invalid file type.")
aci = self.aciCollection.new()
uid = os.getuid()
aci.dbobj.id = uid
aci.dbobj.uname = pwd.getpwuid(uid).pw_name
aci.dbobj.gname = grp.getgrgid(os.getgid()).gr_name
aci.dbobj.mode = int(mode, 8) + fileType
aci.save()
return aci.key
def create_shelf(self, shelf):
""" Create one new shelf in the database.
Input---
shelf_data - list of shelf data to insert
Output---
shelf_data or error message
"""
shelf.id = None # DB engine will autochoose next id
if not shelf.mode:
shelf.mode = stat.S_IFREG + 0o666
tmp = int(time.time())
shelf.ctime = shelf.mtime = tmp
if shelf.parent_id == 0:
shelf.parent_id = 2 # is now a default for if it given to go in root
shelf.id = self._cur.INSERT('shelves', shelf.tuple())
return shelf
def cmp(f1, f2, shallow=1):
"""Compare two files.
Arguments:
f1 -- First file name
f2 -- Second file name
shallow -- Just check stat signature (do not read the files).
defaults to 1.
Return value:
True if the files are the same, False otherwise.
This function uses a cache for past comparisons and the results,
with a cache invalidation mechanism relying on stale signatures.
"""
s1 = _sig(os.stat(f1))
s2 = _sig(os.stat(f2))
if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
return False
if shallow and s1 == s2:
return True
if s1[1] != s2[1]:
return False
result = _cache.get((f1, f2))
if result and (s1, s2) == result[:2]:
return result[2]
outcome = _do_cmp(f1, f2)
_cache[f1, f2] = s1, s2, outcome
return outcome
def cmp(f1, f2, shallow=1):
"""Compare two files.
Arguments:
f1 -- First file name
f2 -- Second file name
shallow -- Just check stat signature (do not read the files).
defaults to 1.
Return value:
True if the files are the same, False otherwise.
This function uses a cache for past comparisons and the results,
with a cache invalidation mechanism relying on stale signatures.
"""
s1 = _sig(os.stat(f1))
s2 = _sig(os.stat(f2))
if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
return False
if shallow and s1 == s2:
return True
if s1[1] != s2[1]:
return False
outcome = _cache.get((f1, f2, s1, s2))
if outcome is None:
outcome = _do_cmp(f1, f2)
if len(_cache) > 100: # limit the maximum size of the cache
_cache.clear()
_cache[f1, f2, s1, s2] = outcome
return outcome
def _setup_logging_from_conf(product_name):
log_root = getLogger(None).logger
for handler in log_root.handlers:
log_root.removeHandler(handler)
if CONF.use_syslog:
facility = _find_facility_from_conf()
syslog = logging.handlers.SysLogHandler(address='/dev/log',
facility=facility)
log_root.addHandler(syslog)
logpath = _get_log_file_path()
if logpath:
filelog = logging.handlers.WatchedFileHandler(logpath)
log_root.addHandler(filelog)
mode = int('0644', 8)
st = os.stat(logpath)
if st.st_mode != (stat.S_IFREG | mode):
os.chmod(logpath, mode)
if CONF.use_stderr:
streamlog = ColorHandler()
log_root.addHandler(streamlog)
elif not CONF.log_file:
# pass sys.stdout as a positional argument
# python2.6 calls the argument strm, in 2.7 it's stream
streamlog = logging.StreamHandler(sys.stdout)
log_root.addHandler(streamlog)
for handler in log_root.handlers:
datefmt = CONF.log_date_format
if CONF.log_format:
handler.setFormatter(logging.Formatter(fmt=CONF.log_format,
datefmt=datefmt))
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
if CONF.verbose or CONF.debug:
log_root.setLevel(logging.DEBUG)
else:
log_root.setLevel(logging.INFO)
def _setup_logging_from_flags():
ops_root = getLogger().logger
for handler in ops_root.handlers:
ops_root.removeHandler(handler)
logpath = _get_log_file_path()
if logpath:
filelog = logging.handlers.WatchedFileHandler(logpath)
ops_root.addHandler(filelog)
mode = int(options.logfile_mode, 8)
st = os.stat(logpath)
if st.st_mode != (stat.S_IFREG | mode):
os.chmod(logpath, mode)
for handler in ops_root.handlers:
handler.setFormatter(logging.Formatter(fmt=options.log_format,
datefmt=options.log_date_format))
if options.verbose or options.debug:
ops_root.setLevel(logging.DEBUG)
else:
ops_root.setLevel(logging.INFO)
root = logging.getLogger()
for handler in root.handlers:
root.removeHandler(handler)
handler = NullHandler()
handler.setFormatter(logging.Formatter())
root.addHandler(handler)
def stat(self) -> Stat:
# Return a regular file with read permissions to everybody and ZERO
# length!
now = time.time()
return dict(st_mode=(S_IFREG | 0o644),
st_nlink=1,
st_ctime=now,
st_mtime=now,
st_attime=now)
def cmp(f1, f2, shallow=True):
"""Compare two files.
Arguments:
f1 -- First file name
f2 -- Second file name
shallow -- Just check stat signature (do not read the files).
defaults to 1.
Return value:
True if the files are the same, False otherwise.
This function uses a cache for past comparisons and the results,
with a cache invalidation mechanism relying on stale signatures.
"""
s1 = _sig(os.stat(f1))
s2 = _sig(os.stat(f2))
if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
return False
if shallow and s1 == s2:
return True
if s1[1] != s2[1]:
return False
outcome = _cache.get((f1, f2, s1, s2))
if outcome is None:
outcome = _do_cmp(f1, f2)
if len(_cache) > 100: # limit the maximum size of the cache
_cache.clear()
_cache[f1, f2, s1, s2] = outcome
return outcome
def test_mode(self):
with open(TESTFN, 'w'):
pass
if os.name == 'posix':
os.chmod(TESTFN, 0o700)
st_mode = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode),
stat.S_IRWXU)
os.chmod(TESTFN, 0o070)
st_mode = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode),
stat.S_IRWXG)
os.chmod(TESTFN, 0o007)
st_mode = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode),
stat.S_IRWXO)
os.chmod(TESTFN, 0o444)
st_mode = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode), 0o444)
else:
os.chmod(TESTFN, 0o700)
st_mode = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IFMT(st_mode),
stat.S_IFREG)
def cmp(f1, f2, shallow=1):
"""Compare two files.
Arguments:
f1 -- First file name
f2 -- Second file name
shallow -- Just check stat signature (do not read the files).
defaults to 1.
Return value:
True if the files are the same, False otherwise.
This function uses a cache for past comparisons and the results,
with a cache invalidation mechanism relying on stale signatures.
"""
s1 = _sig(os.stat(f1))
s2 = _sig(os.stat(f2))
if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
return False
if shallow and s1 == s2:
return True
if s1[1] != s2[1]:
return False
outcome = _cache.get((f1, f2, s1, s2))
if outcome is None:
outcome = _do_cmp(f1, f2)
if len(_cache) > 100: # limit the maximum size of the cache
_cache.clear()
_cache[f1, f2, s1, s2] = outcome
return outcome
def test_mode(self):
with open(TESTFN, 'w'):
pass
if os.name == 'posix':
os.chmod(TESTFN, 0o700)
st_mode = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode),
stat.S_IRWXU)
os.chmod(TESTFN, 0o070)
st_mode = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode),
stat.S_IRWXG)
os.chmod(TESTFN, 0o007)
st_mode = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode),
stat.S_IRWXO)
os.chmod(TESTFN, 0o444)
st_mode = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode), 0o444)
else:
os.chmod(TESTFN, 0o700)
st_mode = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IFMT(st_mode),
stat.S_IFREG)
def cmp(f1, f2, shallow=1):
"""Compare two files.
Arguments:
f1 -- First file name
f2 -- Second file name
shallow -- Just check stat signature (do not read the files).
defaults to 1.
Return value:
True if the files are the same, False otherwise.
This function uses a cache for past comparisons and the results,
with a cache invalidation mechanism relying on stale signatures.
"""
s1 = _sig(os.stat(f1))
s2 = _sig(os.stat(f2))
if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
return False
if shallow and s1 == s2:
return True
if s1[1] != s2[1]:
return False
outcome = _cache.get((f1, f2, s1, s2))
if outcome is None:
outcome = _do_cmp(f1, f2)
if len(_cache) > 100: # limit the maximum size of the cache
_cache.clear()
_cache[f1, f2, s1, s2] = outcome
return outcome
def cmp(f1, f2, shallow=1):
"""Compare two files.
Arguments:
f1 -- First file name
f2 -- Second file name
shallow -- Just check stat signature (do not read the files).
defaults to 1.
Return value:
True if the files are the same, False otherwise.
This function uses a cache for past comparisons and the results,
with a cache invalidation mechanism relying on stale signatures.
"""
s1 = _sig(os.stat(f1))
s2 = _sig(os.stat(f2))
if s1[0] != stat.S_IFREG or s2[0] != stat.S_IFREG:
return False
if shallow and s1 == s2:
return True
if s1[1] != s2[1]:
return False
outcome = _cache.get((f1, f2, s1, s2))
if outcome is None:
outcome = _do_cmp(f1, f2)
if len(_cache) > 100: # limit the maximum size of the cache
_cache.clear()
_cache[f1, f2, s1, s2] = outcome
return outcome
def test_mode(self):
with open(TESTFN, 'w'):
pass
if os.name == 'posix':
os.chmod(TESTFN, 0o700)
st_mode, modestr = self.get_mode()
self.assertEqual(modestr, '-rwx------')
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode),
stat.S_IRWXU)
os.chmod(TESTFN, 0o070)
st_mode, modestr = self.get_mode()
self.assertEqual(modestr, '----rwx---')
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode),
stat.S_IRWXG)
os.chmod(TESTFN, 0o007)
st_mode, modestr = self.get_mode()
self.assertEqual(modestr, '-------rwx')
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IMODE(st_mode),
stat.S_IRWXO)
os.chmod(TESTFN, 0o444)
st_mode, modestr = self.get_mode()
self.assertS_IS("REG", st_mode)
self.assertEqual(modestr, '-r--r--r--')
self.assertEqual(stat.S_IMODE(st_mode), 0o444)
else:
os.chmod(TESTFN, 0o700)
st_mode, modestr = self.get_mode()
self.assertEqual(modestr[:3], '-rw')
self.assertS_IS("REG", st_mode)
self.assertEqual(stat.S_IFMT(st_mode),
stat.S_IFREG)
def _to_mode(attr):
m = 0
if (attr & FILE_ATTRIBUTE_DIRECTORY):
m |= stdstat.S_IFDIR | 0o111
else:
m |= stdstat.S_IFREG
if (attr & FILE_ATTRIBUTE_READONLY):
m |= 0o444
else:
m |= 0o666
return m
def dbpcs(mode):
if mode & stat.S_IFLNK == stat.S_IFLNK: return 's'
if mode & stat.S_IFSOCK == stat.S_IFSOCK: return 's'
if mode & stat.S_IFREG == stat.S_IFREG: return '-'
if mode & stat.S_IFBLK == stat.S_IFBLK: return 'b'
if mode & stat.S_IFDIR == stat.S_IFDIR: return 'd'
if mode & stat.S_IFIFO == stat.S_IFIFO: return 'p'
if mode & stat.S_IFCHR == stat.S_IFCHR: return 'c'
return '?'
def getattr(self, inode, ctx=None):
attrs = self.inodes.get(inode)
if attrs is None:
raise llfuse.FUSEError(errno.ENOENT) # FIXME
if attrs.get('type') == 'tree':
mode_filetype = stat.S_IFDIR
elif attrs.get('type') == 'blob':
mode_filetype = stat.S_IFREG
elif attrs.get('filetype') == 'link':
mode_filetype = stat.S_IFLNK
elif attrs.get('filetype') == 'fifo':
mode_filetype = stat.S_IFIFO
else:
raise llfuse.FUSEError(errno.ENOENT) # FIXME
entry = llfuse.EntryAttributes()
entry.st_mode = mode_filetype | attrs.get('mode', MartyFSHandler.DEFAULT_MODE)
if attrs.get('type') == 'blob' and 'ref' in attrs:
entry.st_size = self.storage.size(attrs['ref'])
else:
entry.st_size = 0
stamp = int(1438467123.985654 * 1e9)
entry.st_atime_ns = stamp
entry.st_ctime_ns = stamp
entry.st_mtime_ns = stamp
entry.st_gid = 0
entry.st_uid = 0
entry.st_ino = inode
return entry