def __realpath(file, root, loop_cnt, assume_dir):
    """Follow 'file' for as long as it is a symbolic link.

    Every link target is normalized before use.  A relative target is
    resolved starting from the directory that contains the link, an
    absolute target starting from 'root'; the actual resolution is
    delegated to __realpath_rel().  Links are only followed while 'file'
    is at least as long as 'root'.

    'loop_cnt' is the number of links that may still be followed; when it
    reaches zero an OSError with errno ELOOP is raised, carrying the
    current 'file' as its message.  'assume_dir' is handed through
    unchanged to __realpath_rel().

    Returns a tuple (path, is_dir) where 'is_dir' tells whether the
    resulting path is an existing directory.
    """
    while os.path.islink(file) and len(file) >= len(root):
        if loop_cnt == 0:
            raise OSError(errno.ELOOP, file)
        loop_cnt -= 1
        target = os.path.normpath(os.readlink(file))
        if not os.path.isabs(target):
            # relative link: resolve it from the link's own directory
            tdir = os.path.dirname(file)
            assert(__is_path_below(tdir, root))
        else:
            # absolute link: resolve it from the toplevel root
            tdir = root
        file = __realpath_rel(tdir, target, root, loop_cnt, assume_dir)

    try:
        is_dir = os.path.isdir(file)
    except (OSError, ValueError):
        # Best effort: a path that cannot be examined is reported as
        # "not a directory" instead of aborting the resolution.
        is_dir = False

    return (file, is_dir)
# Example source code using Python's errno.ELOOP (python类ELOOP的实例源码)
def open_image_file(root, path, flag, mode=None):
    """Safely open a file, ensuring that the path we are accessing resides
    within a specified image root.

    'root' is a directory that the path must reside in.

    'path' is the file to open.

    'flag' is the os.open() flag set; O_NOFOLLOW is added for the first
    attempt so that a symbolic link at 'path' is not followed.

    'mode' is the optional permission mode for os.open().  When it is
    None the os.open() default is used.

    Returns a file object wrapping the opened descriptor.  Errors other
    than ELOOP are converted with api_errors._convert_error() and raised.
    """
    try:
        # os.open() requires an integer mode, so only pass it when the
        # caller actually supplied one.
        if mode is None:
            fd = os.open(path, flag | os.O_NOFOLLOW)
        else:
            fd = os.open(path, flag | os.O_NOFOLLOW, mode)
        return os.fdopen(fd)
    except EnvironmentError as e:
        if e.errno != errno.ELOOP:
            # Access to protected member; pylint: disable=W0212
            raise api_errors._convert_error(e)

    # If it is a symbolic link, fall back to ar_open. ar_open interprets
    # 'path' as relative to 'root', that is, 'root' will be prepended to
    # 'path', so we need to call os.path.relpath here.
    from pkg.altroot import ar_open
    return os.fdopen(ar_open(root, os.path.relpath(path, root), flag, mode))
def realpath(file, root, use_physdir = True, loop_cnt = 100, assume_dir = False):
    """Return the canonical form of 'file' relative to a toplevel 'root'.

    Both paths are normalized first and 'file' must lie below 'root',
    otherwise an OSError with errno EINVAL is raised.

    With 'use_physdir' set, every leading path component of 'file' is
    resolved as well; leave it set unless the path is known to contain no
    symlinks.  'loop_cnt' bounds the number of links that are followed.
    Unless 'assume_dir' is set, missing path components raise an ENOENT
    error.
    """
    norm_root = os.path.normpath(root)
    norm_file = os.path.normpath(file)

    # a root that ends with '/' makes the prefix handling below simpler
    if not norm_root.endswith(os.path.sep):
        norm_root += os.path.sep

    if not __is_path_below(norm_file, norm_root):
        raise OSError(errno.EINVAL, "file '%s' is not below root" % norm_file)

    try:
        if not use_physdir:
            return __realpath(norm_file, norm_root, loop_cnt, assume_dir)[0]
        # keep the separator that terminates the root as the start of the
        # path handed to the resolver
        tail = norm_file[(len(norm_root) - 1):]
        return __realpath_rel(norm_root, tail, norm_root, loop_cnt, assume_dir)
    except OSError as e:
        if e.errno != errno.ELOOP:
            raise
        # Re-raise ELOOP with a readable message; otherwise the backtrace
        # would consist of 100s of nested OSError exceptions.
        raise OSError(errno.ELOOP,
                      "too much recursions while resolving '%s'; loop in '%s'" %
                      (norm_file, e.strerror))
def read_from_rp(self, rp):
    """Set the extended attributes from an rpath

    The attribute names are listed through rp.conn.xattr and every
    value is stored in self.attr_dict.  'system.*' attributes are never
    kept, and 'com.apple.ResourceFork' is only kept for directories.

    Returns without reading anything when listing is not supported or
    not permitted, or (after logging a warning) when the path cannot be
    accessed.  Any other IOError is re-raised.
    """
    try:
        attr_list = rp.conn.xattr.listxattr(encode(rp.path),
                                            rp.issym())
    except IOError as exc:
        if exc.errno in (errno.EOPNOTSUPP, errno.EPERM, errno.ETXTBSY):
            return # if not supported, consider empty
        if exc.errno in (errno.EACCES, errno.ENOENT, errno.ELOOP):
            log.Log("Warning: listattr(%s): %s" % (repr(rp.path), exc), 4)
            return
        raise

    for attr in attr_list:
        if attr.startswith('system.'):
            # Do not preserve system extended attributes
            continue
        if not rp.isdir() and attr == 'com.apple.ResourceFork':
            # Resource Fork handled elsewhere, except for directories
            continue
        try:
            self.attr_dict[attr] = \
                rp.conn.xattr.getxattr(encode(rp.path),
                                       attr, rp.issym())
        except IOError as exc:
            # File probably modified while reading, just continue
            if exc.errno == errno.ENODATA:
                continue
            elif exc.errno == errno.ENOENT:
                # the file itself is gone; no point in reading further
                break
            # Handle bug in pyxattr < 0.2.2
            elif exc.errno == errno.ERANGE:
                continue
            else:
                raise
def filter_visited(curr_dir, subdirs, already_visited, follow_dirlinks, on_error):
    """Drop the subdirectories that were already seen during a walk.

    Helper for the os.walk() based search in index_files_by_size, used to
    keep that search from running in circles.

    curr_dir -- path of the directory currently being walked, as yielded
        by os.walk().
    subdirs -- names of the subdirectories of curr_dir, as yielded by
        os.walk().
    already_visited -- set of (st_dev, st_ino) tuples identifying the
        directories visited so far.  It is copied, never modified.
    follow_dirlinks -- when true, directories are examined with os.stat();
        otherwise os.lstat() is used and symlinked entries are dropped.
    on_error -- callable f(OSError) -> None, invoked for every error,
        including a detected directory loop.

    Returns a tuple: the list of subdirectories that remain to be walked,
    and a new set of visited directories that also contains them.
    """
    stat_fn = os.stat if follow_dirlinks else os.lstat
    seen = already_visited.copy()
    kept = []
    fresh = set()

    try:
        # Register the current directory right away so that a symlink
        # pointing back at it is caught now rather than one level deeper.
        here = stat_fn(curr_dir)
        seen.add((here.st_dev, here.st_ino))
    except OSError as err:
        on_error(err)

    for name in subdirs:
        candidate = os.path.join(curr_dir, name)
        try:
            info = stat_fn(candidate)
        except OSError as err:
            on_error(err)
            continue

        if not follow_dirlinks and stat.S_ISLNK(info.st_mode):
            # links to directories are not followed in this mode
            continue

        key = (info.st_dev, info.st_ino)
        if key in seen:
            on_error(OSError(errno.ELOOP, "directory loop detected", candidate))
            continue

        kept.append(name)
        fresh.add(key)

    return kept, seen.union(fresh)