def globscan(
        input_paths, recursive=False, allow_wildcards=True,
        include_dirs=True, include_hidden=False,
        raise_not_found=True):
    """
    Eager counterpart of :py:func:`iglobscan`: :py:func:`glob.glob` then scan
    *input_paths* and return the results as a list of :py:class:`FsEntry`
    objects.

    *input_paths* can be a single string or an iterable that contains paths
    (wildcards are accepted unless *allow_wildcards* is false).

    A ``FileNotFoundError`` exception will be raised if one of the specified
    input paths is missing, unless the *raise_not_found* flag is false.

    :py:class:`FsEntry` is compatible with Python's :py:class:`os.DirEntry`
    class.

    **CAUTION:** this function **is not** thread-safe as it installs a trap at
    runtime (i.e. for :py:func:`glob._ishidden`). The ``glob`` module must not
    be used concurrently to this function.
    """
    # All options are forwarded positionally, in declaration order.
    entries = iglobscan(
        input_paths, recursive, allow_wildcards,
        include_dirs, include_hidden, raise_not_found)
    return list(entries)
# Example source code using Python's os.DirEntry class
def __init__(self, direntry, basedir):
    """Wrap *direntry* and record its location relative to *basedir*.

    :param direntry: object whose class is named ``DirEntry`` and which
        exposes a ``path`` attribute
    :param str basedir: directory that ``direntry.path`` is joined onto
    """
    # The concrete os.DirEntry type may be nt.DirEntry or posix.DirEntry
    # (as of 3.5.2), hence the check on the class name rather than a usual
    # isinstance() test.
    assert direntry.__class__.__name__ == "DirEntry"
    assert isinstance(basedir, str)

    self._direntry, self.basedir = direntry, basedir

    # normpath() strips a leading "./" from the entry path
    normalized = os.path.normpath(direntry.path)
    self.relpath = normalized
    self.path = os.path.join(self.basedir, self.relpath)
def _is_hidden(entry, follow_symlinks=True):
    """Tell whether *entry* is a hidden filesystem entry.

    An entry is hidden when its name starts with a dot or, when
    ``_IS_WINDOWS`` is true, when its stat result carries the
    ``FILE_ATTRIBUTE_HIDDEN`` file attribute.

    :param entry: a DirEntry or a FsEntry; must expose ``name`` and
        ``stat(follow_symlinks=...)``
    :param bool follow_symlinks: forwarded to ``entry.stat()``
    :rtype: bool
    """
    # Compare by value: the previous identity test (``is "."``) only worked
    # thanks to CPython's string interning and raised on an empty name.
    if entry.name.startswith("."):
        return True

    if _IS_WINDOWS:
        attr = entry.stat(follow_symlinks=follow_symlinks).st_file_attributes
        return bool(attr & stat.FILE_ATTRIBUTE_HIDDEN)

    return False
def __init__(self, paths):
    """Schedule a watch on every path and report what each one contains.

    Each path has ``~`` expanded, is scheduled on ``Repo.observer`` with
    this object as the handler, and every entry found directly inside it is
    passed to ``on_found``. The values returned by ``schedule()`` are kept
    in ``self.watches``.

    :param paths: iterable of directory paths to watch
    """
    super().__init__()
    self.watches = set()
    for p in paths:
        p = os.path.expanduser(p)
        logger.debug('watch '+p)
        self.watches.add(Repo.observer.schedule(self, p))
        # The context manager closes the directory handle even when
        # on_found() raises part-way through the listing.
        with os.scandir(p) as entries:
            for f in entries:
                # is_dir is a method: call it so that on_found() receives a
                # real boolean instead of an always-truthy bound method.
                # NOTE(review): assumes on_found() expects a bool flag.
                self.on_found(f.is_dir(), f.path)
def _insert_sorted(self, item: os.DirEntry, sort_by: SortBy) -> None:
    """Internal method that summarises one scanned item and inserts the
    summary into the local `_items` list at the position given by
    `_find_index()` for the requested `sort_by` order.

    `_total_size` and `_items_len` are updated along the way.

    :param item: DirEntry object from `_iter_items()` async iteration
                 within the async parallel scanning.
    :type item: posix.DirEntry
    :param sort_by: SortBy enum attribute
    :type sort_by: SortBy
    :rtype: None
    """
    info = self._get_attributes(item)

    # All three timestamps at zero: treated as an empty folder, so the
    # timestamps are taken from the entry's own stat instead.
    if all(info[key] == 0 for key in ('atime', 'mtime', 'ctime')):
        own = item.stat(follow_symlinks=False)
        info.update(atime=int(own.st_atime),
                    mtime=int(own.st_mtime),
                    ctime=int(own.st_ctime))

    # The name is stored relative to the scan root.
    entry = {'name': os.path.relpath(item.path, self._root)}
    for key in ('size', 'depth', 'num_of_files', 'atime', 'mtime', 'ctime'):
        entry[key] = info[key]

    position = self._find_index(entry, sort_by)
    self._total_size += entry['size']
    self._items_len += 1
    self._items.insert(position, entry)
def scantree(path):
    # type: (str) -> os.DirEntry
    """Walk a directory tree depth-first and yield its non-directory entries.

    Directories (including symbolic links that resolve to directories) are
    descended into instead of being yielded.

    :param str path: path to scan
    :rtype: DirEntry
    :return: DirEntry via generator
    """
    for item in scandir(path):
        if not item.is_dir(follow_symlinks=True):
            yield item
            continue
        # python2 compat: no "yield from", so re-yield the nested results
        for nested in scantree(item.path):
            yield nested
def scantree(path):
    """Recursively scan a directory tree.

    Real sub-directories are descended into; every other entry (regular
    files, and symbolic links, which are not followed) is yielded.

    :param str path: path to scan
    :rtype: os.DirEntry
    :return: DirEntry via generator
    """
    # The context manager releases the directory handle as soon as this
    # level is done, including when the generator is closed early or an
    # error interrupts the walk.
    with os.scandir(path) as entries:
        for entry in entries:
            if entry.is_dir(follow_symlinks=False):
                yield from scantree(entry.path)
            else:
                yield entry
def scan_tree(path: str):
    """Recursively scan a directory tree and yield os.DirEntry objects.

    Every entry is yielded, directories included; a directory is yielded
    before its own content. Symbolic links are not followed.

    Args:
        path: The path of the directory to scan.

    Yields:
        An os.DirEntry object for each entry (file or directory) in the tree.
    """
    # The context manager releases the directory handle as soon as this
    # level is done, including when the generator is closed early or an
    # error interrupts the walk.
    with os.scandir(path) as entries:
        for entry in entries:
            yield entry
            if entry.is_dir(follow_symlinks=False):
                yield from scan_tree(entry.path)
def _get_attributes(self, item: os.DirEntry) -> dict:
    """Parses entire item and subdirectories and returns a dictionary with:
    * ``size``: total size in bytes
    * ``depth``: maximum folder depth of item
    * ``num_of_files``: total number of files this item contains
    * ``atime``: access timestamp
    * ``mtime``: modification timestamp
    * ``ctime``: change timestamp

    For a folder the timestamps and depth are the maxima found among its
    content; a folder without any entry yields zero for every key.

    :param item: DirEntry object
    :type item: posix.DirEntry
    :return: Dictionary of {size, depth, num_of_files, atime, mtime, ctime}
    :rtype: dict
    """
    # A file or a symlink: everything comes from the entry's own stat
    if not item.is_dir(follow_symlinks=False):
        info = item.stat(follow_symlinks=False)
        return {'size': info.st_size,
                'depth': self._get_depth(item.path) - self._level,
                'num_of_files': 1,
                'atime': int(info.st_atime),
                'mtime': int(info.st_mtime),
                'ctime': int(info.st_ctime)}

    # A folder: fold the attributes of every child, recursively
    totals = {'size': 0,
              'depth': 0,
              'num_of_files': 0,
              'atime': 0,
              'mtime': 0,
              'ctime': 0}
    with os.scandir(item.path) as children:
        for child in children:
            child_attrs = self._get_attributes(child)
            # size and file count add up...
            totals['size'] += child_attrs['size']
            totals['num_of_files'] += child_attrs['num_of_files']
            # ...while timestamps and depth keep the largest value seen
            for key in ('atime', 'mtime', 'ctime', 'depth'):
                totals[key] = max(totals[key], child_attrs[key])
    return totals