def rmdir(self, parent_inode, name, ctx):
    """Reject directory removal.

    Unconditionally raises ``llfuse.FUSEError`` carrying ``errno.EROFS``;
    none of the arguments are inspected.
    """
    read_only_error = llfuse.FUSEError(errno.EROFS)
    raise read_only_error
python类EROFS的实例源码
def setattr(self, inode, attr, fields, fh, ctx):
    """Reject attribute changes.

    Unconditionally raises ``llfuse.FUSEError`` carrying ``errno.EROFS``;
    none of the arguments are inspected.
    """
    read_only_error = llfuse.FUSEError(errno.EROFS)
    raise read_only_error
def symlink(self, parent_inode, name, target, ctx):
    """Reject symlink creation.

    Unconditionally raises ``llfuse.FUSEError`` carrying ``errno.EROFS``;
    none of the arguments are inspected.
    """
    read_only_error = llfuse.FUSEError(errno.EROFS)
    raise read_only_error
def unlink(self, parent_inode, name, ctx):
    """Reject file removal.

    Unconditionally raises ``llfuse.FUSEError`` carrying ``errno.EROFS``;
    none of the arguments are inspected.
    """
    read_only_error = llfuse.FUSEError(errno.EROFS)
    raise read_only_error
def write(self, fh, off, buf):
    """Reject writes.

    Unconditionally raises ``llfuse.FUSEError`` carrying ``errno.EROFS``;
    none of the arguments are inspected.
    """
    read_only_error = llfuse.FUSEError(errno.EROFS)
    raise read_only_error
def _lock_file(f, dotlock=True):
    """Take an exclusive lock on file *f* with lockf and, optionally, a dot lock.

    Steps, in order:

    1. If the module-level ``fcntl`` name is truthy, acquire a non-blocking
       exclusive ``lockf`` lock on *f*.
    2. If *dotlock* is true, create a temporary file and link (or rename) it
       to ``f.name + '.lock'``.  When the temporary file cannot be created
       because of ``EACCES``/``EROFS``, dot locking is silently skipped.

    Raises:
        ExternalClashError: if ``lockf`` fails with ``EAGAIN``, ``EACCES`` or
            ``EROFS``, or if the ``.lock`` file already exists.

    Any exception escaping the function first releases the ``lockf`` lock
    (when ``fcntl`` is truthy) and removes the dot lock if it had been
    created, then propagates unchanged.
    """
    dotlock_done = False
    try:
        if fcntl:
            try:
                fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except OSError as lockf_error:
                unavailable = (errno.EAGAIN, errno.EACCES, errno.EROFS)
                if lockf_error.errno not in unavailable:
                    raise
                raise ExternalClashError('lockf: lock unavailable: %s' %
                                         f.name)
        if dotlock:
            lock_path = f.name + '.lock'
            try:
                pre_lock = _create_temporary(lock_path)
                pre_lock.close()
            except OSError as create_error:
                if create_error.errno in (errno.EACCES, errno.EROFS):
                    # Without write access, just skip dotlocking.
                    return
                raise
            try:
                if hasattr(os, 'link'):
                    # Link first, then drop the temporary name; the lock
                    # counts as taken as soon as the link exists.
                    os.link(pre_lock.name, lock_path)
                    dotlock_done = True
                    os.unlink(pre_lock.name)
                else:
                    os.rename(pre_lock.name, lock_path)
                    dotlock_done = True
            except FileExistsError:
                os.remove(pre_lock.name)
                raise ExternalClashError('dot lock unavailable: %s' %
                                         f.name)
    except:
        # Deliberately bare: undo whatever was acquired for *any* exception
        # and re-raise it untouched.
        if fcntl:
            fcntl.lockf(f, fcntl.LOCK_UN)
        if dotlock_done:
            os.remove(lock_path)
        raise
def create(self, path, mode, **kwargs):
    """Create (or truncate) *path* for writing and return its descriptor.

    Args:
        path: Path handed straight to ``os.open``.
        mode: Permission bits handed straight to ``os.open``.
        **kwargs: Accepted but not used by this method.

    Returns:
        The file descriptor returned by ``os.open`` with
        ``O_WRONLY | O_CREAT | O_TRUNC``.

    Raises:
        FuseOSError: with ``errno.EROFS`` when ``self.readonly`` is true.
    """
    if not self.readonly:
        open_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        return os.open(path, open_flags, mode)
    raise FuseOSError(errno.EROFS)
def mknod(self, path, *args, **kwargs):
    """Create a filesystem node at *path* via ``os.mknod``.

    Extra positional and keyword arguments are forwarded to ``os.mknod``
    unchanged.  When ``common.windows`` is true the call is skipped and
    nothing is created.  Always returns ``None``.

    Raises:
        FuseOSError: with ``errno.EROFS`` when ``self.readonly`` is true.
    """
    if self.readonly:
        raise FuseOSError(errno.EROFS)
    if common.windows:
        return
    os.mknod(path, *args, **kwargs)
# open = os.open
def rename(self, old, new):
    """Refuse every rename request.

    Raises:
        FuseOSError: with ``errno.EROFS`` when ``self.readonly`` is true,
            otherwise with ``errno.EPERM``.
    """
    # TODO: proper rename support - this may not happen because there's not
    #   much reason to rename files here. copying might work since either
    #   way, the file[s] would have to be re-encrypted.
    # Sketch of the eventual implementation (currently disabled):
    #   if self.readonly:
    #       raise FuseOSError(errno.EROFS)
    #   return os.rename(old, self.root + new)
    if self.readonly:
        error_code = errno.EROFS
    else:
        error_code = errno.EPERM
    raise FuseOSError(error_code)
def write(self, path, data, offset, fh):
    """Write *data* at *offset*, encrypting it unless the file is a dot-file.

    Files whose basename starts with ``'.'`` are written verbatim.  All other
    data is passed through ``self.crypto.aes_ctr`` before being written.

    Args:
        path: Path of the backing file.  Its basename selects the raw or the
            encrypted path, it is given to ``self.path_to_iv``, and on
            Windows it is the file that gets reopened for the write.
        data: Bytes to write.
        offset: Byte offset in the file at which *data* starts.
        fh: OS-level file descriptor, used when ``common.windows`` is false.

    Returns:
        The byte count reported by the underlying write call.

    Raises:
        FuseOSError: with ``errno.EROFS`` when ``self.readonly`` is true.
    """
    if self.readonly:
        raise FuseOSError(errno.EROFS)

    # special check for special files
    if os.path.basename(path).startswith('.'):
        out_data = data
    else:
        # The counter advances once per 16 bytes (offset >> 4), so a write
        # that starts mid-block is padded with `before` zero bytes to line
        # the input up with the counter; the padding is sliced off again, so
        # out_data has the same length as data and belongs at `offset`.
        # NOTE(review): 0x34 is presumably a key slot id - confirm against
        # the crypto helper.
        before = offset % 16
        iv = self.path_to_iv(path) + (offset >> 4)
        out_data = self.crypto.aes_ctr(0x34, iv, (b'\0' * before) + data)[before:]

    if common.windows:
        # The path is reopened for every write here; the context manager
        # guarantees the handle is closed again on both the raw and the
        # encrypted path.
        with open(path, 'r+b', buffering=0) as f:
            # Seek to `offset`, not `offset - before`: the padding bytes are
            # no longer part of out_data.
            f.seek(offset)
            return f.write(out_data)

    with self.rwlock:
        # Seek and write under the lock so they stay paired on the shared
        # descriptor.
        os.lseek(fh, offset, 0)
        return os.write(fh, out_data)
def fake_open(filename, flags, mode=0o777, _os_open=os.open):
    """Fake version of os.open that refuses any write access.

    Args:
        filename: Path of the file to open.
        flags: ``os.O_*`` flag bitmask, as for ``os.open``.
        mode: Permission bits forwarded to the real ``os.open``.
        _os_open: The real ``os.open``.  A copy is captured as a default
            argument so it can still be used after ``os.open`` is replaced
            with this stub.

    Returns:
        The file descriptor returned by the real ``os.open``.

    Raises:
        OSError: with ``errno.EROFS`` if *flags* contains ``O_RDWR``,
            ``O_CREAT`` or ``O_WRONLY``; with ``errno.ENOENT`` if
            ``FakeFile.is_file_accessible(filename)`` is false.
    """
    # `0o777` replaces the Python-2-only literal `0777`: same value, but
    # valid syntax on both Python 2.6+ and Python 3.
    write_flags = os.O_RDWR | os.O_CREAT | os.O_WRONLY
    if flags & write_flags:
        raise OSError(errno.EROFS, 'Read-only file system', filename)
    elif not FakeFile.is_file_accessible(filename):
        raise OSError(errno.ENOENT, 'No such file or directory', filename)
    return _os_open(filename, flags, mode)
def __init__(self, filename, mode='r', bufsize=-1, **kwargs):
    """Initializer. See file built-in documentation.

    The mode and the file's accessibility are checked, in that order, before
    the arguments are handed to the parent initializer.

    Raises:
        IOError: with ``errno.EROFS`` if *mode* is not one of
            ``FakeFile.ALLOWED_MODES``; with ``errno.EACCES`` if
            ``FakeFile.is_file_accessible(filename)`` is false.
    """
    mode_allowed = mode in FakeFile.ALLOWED_MODES
    if not mode_allowed:
        raise IOError(errno.EROFS, 'Read-only file system', filename)

    accessible = FakeFile.is_file_accessible(filename)
    if not accessible:
        raise IOError(errno.EACCES, 'file not accessible', filename)

    super(FakeFile, self).__init__(filename, mode, bufsize, **kwargs)
def test_fake_open_write(self):
    """stubs.fake_open with O_RDWR must raise OSError(EROFS) naming the file."""
    self.mox.ReplayAll()
    with self.assertRaises(OSError) as raised:
        stubs.fake_open(__file__, os.O_RDWR)
    self.mox.VerifyAll()

    error = raised.exception
    self.assertEqual(errno.EROFS, error.errno)
    self.assertEqual('Read-only file system', error.strerror)
    self.assertEqual(__file__, error.filename)
    # The original verifies the mocks a second time after the assertions;
    # kept so the call sequence is unchanged.
    self.mox.VerifyAll()
def _lock_file(f, dotlock=True):
    """Take an exclusive lock on file *f* with lockf and, optionally, a dot lock.

    Steps, in order:

    1. If the module-level ``fcntl`` name is truthy, acquire a non-blocking
       exclusive ``lockf`` lock on *f*.
    2. If *dotlock* is true, create a temporary file and link (or rename) it
       to ``f.name + '.lock'``.  When the temporary file cannot be created
       because of ``EACCES``/``EROFS``, dot locking is silently skipped.

    Raises:
        ExternalClashError: if ``lockf`` fails with ``EAGAIN``, ``EACCES`` or
            ``EROFS``, or if the ``.lock`` file already exists.

    Any exception escaping the function first releases the ``lockf`` lock
    (when ``fcntl`` is truthy) and removes the dot lock if it had been
    created, then propagates unchanged.
    """
    dotlock_done = False
    try:
        if fcntl:
            try:
                fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except OSError as lockf_error:
                unavailable = (errno.EAGAIN, errno.EACCES, errno.EROFS)
                if lockf_error.errno not in unavailable:
                    raise
                raise ExternalClashError('lockf: lock unavailable: %s' %
                                         f.name)
        if dotlock:
            lock_path = f.name + '.lock'
            try:
                pre_lock = _create_temporary(lock_path)
                pre_lock.close()
            except OSError as create_error:
                if create_error.errno in (errno.EACCES, errno.EROFS):
                    # Without write access, just skip dotlocking.
                    return
                raise
            try:
                if hasattr(os, 'link'):
                    # Link first, then drop the temporary name; the lock
                    # counts as taken as soon as the link exists.
                    os.link(pre_lock.name, lock_path)
                    dotlock_done = True
                    os.unlink(pre_lock.name)
                else:
                    os.rename(pre_lock.name, lock_path)
                    dotlock_done = True
            except FileExistsError:
                os.remove(pre_lock.name)
                raise ExternalClashError('dot lock unavailable: %s' %
                                         f.name)
    except:
        # Deliberately bare: undo whatever was acquired for *any* exception
        # and re-raise it untouched.
        if fcntl:
            fcntl.lockf(f, fcntl.LOCK_UN)
        if dotlock_done:
            os.remove(lock_path)
        raise
def readdir(self, path, fh):
    """Return the entry names of the directory at *path*.

    The node is looked up with ``self.tree.find_path(path)``; the result is
    ``'.'`` and ``'..'`` followed by whatever iterating ``node.children``
    yields.  *fh* is not used.

    Raises:
        FuseOSError: with ``EROFS`` when no node exists for *path*.
    """
    node = self.tree.find_path(path)
    if node is None:
        # NOTE(review): EROFS for a missing path looks unusual (ENOENT may
        # have been intended) - kept as-is; confirm with the callers.
        raise FuseOSError(EROFS)
    entries = ['.', '..']
    entries.extend(node.children)
    return entries
def chmod(self, path, mode):
    """Refuse permission changes.

    Unconditionally raises ``IOError`` with ``errno.EROFS``; the arguments
    are not inspected.
    """
    read_only_error = IOError(errno.EROFS, 'Read only filesystem')
    raise read_only_error
def chown(self, path, uid, gid):
    """Refuse ownership changes.

    Unconditionally raises ``IOError`` with ``errno.EROFS``; the arguments
    are not inspected.
    """
    read_only_error = IOError(errno.EROFS, 'Read only filesystem')
    raise read_only_error
def link(self, targetPath, linkPath):
    """Refuse hard-link creation.

    Unconditionally raises ``IOError`` with ``errno.EROFS``; the arguments
    are not inspected.
    """
    read_only_error = IOError(errno.EROFS, 'Read only filesystem')
    raise read_only_error
def mkdir(self, path, mode):
    """Refuse directory creation.

    Unconditionally raises ``IOError`` with ``errno.EROFS``; the arguments
    are not inspected.
    """
    read_only_error = IOError(errno.EROFS, 'Read only filesystem')
    raise read_only_error
def rename(self, oldPath, newPath):
    """Refuse renames.

    Unconditionally raises ``IOError`` with ``errno.EROFS``; the arguments
    are not inspected.
    """
    read_only_error = IOError(errno.EROFS, 'Read only filesystem')
    raise read_only_error