def get_random_image(img_dir):
    """Pick a random image file from a directory.

    Args:
        img_dir (str): Directory to search for image files.

    Returns:
        str: Full path to a randomly chosen image that is not the
        currently set wallpaper.

    Exits with status 1 when no candidate image is found.
    """
    current_wall = os.path.basename(wallpaper.get())
    # Match extensions case-insensitively; the original tuple listed only
    # all-lower and all-upper variants and missed mixed case like ".Jpg".
    file_types = (".png", ".jpg", ".jpeg", ".jpe", ".gif")
    images = [img for img in os.scandir(img_dir)
              if img.name.lower().endswith(file_types) and img.name != current_wall]
    if not images:
        print("image: No new images found (nothing to do), exiting...")
        sys.exit(1)
    return os.path.join(img_dir, random.choice(images).name)
# Python scandir() usage examples
def get_list(path):
    """Yield one Unix "ls -l"-style line per entry in *path*.

    Each line contains: permissions, link count, owner name, group name,
    size, modification time (UTC), and entry name.

    Args:
        path (str): Directory to list.

    Yields:
        str: A formatted listing line for each directory entry.
    """
    for file in os.scandir(path):
        stat = file.stat()
        perms = filemode(stat.st_mode)
        # Some filesystems report 0 links; present at least 1.
        nlinks = stat.st_nlink or 1
        size = stat.st_size
        try:
            uname = pwd.getpwuid(stat.st_uid).pw_name
        except KeyError:
            # UID has no passwd entry; fall back to a placeholder.
            # (Narrowed from a bare except, which also hid real bugs.)
            uname = 'owner'
        try:
            gname = grp.getgrgid(stat.st_gid).gr_name
        except KeyError:
            # GID has no group entry; fall back to a placeholder.
            gname = 'group'
        mtime = time.strftime("%b %d %H:%M", time.gmtime(stat.st_mtime))
        yield "{} {} {} {} {} {} {}".format(
            perms, nlinks, uname, gname, size, mtime, file.name)
def test_get_mlsx():
    """Check get_mlsx() facts (modify/size/type) against os.scandir()."""
    path = os.path.join(os.getcwd(), "tests")
    result = {}
    temp = get_mlsx(path)
    for f in temp:
        f = f.split(";")
        filename = f[3].strip()
        result[filename] = {}
        for i in range(0, 3):
            name = f[i].split('=')[0]
            result[filename][name] = f[i].split('=')[1]
    files = os.scandir(path)
    for file in files:
        assert file.name in result.keys()
        result_file = result[file.name]
        stat = file.stat()
        assert result_file['modify'] == str(time.strftime("%Y%m%d%H%M%S", time.gmtime(stat[8])))
        assert result_file['size'] == str(stat[6])
        # BUG FIX: the original `assert x == "dir" if file.is_dir() else "file"`
        # parsed as `(x == "dir") if file.is_dir() else "file"`, so for
        # non-directories the assert checked the truthy constant "file"
        # and could never fail. Parenthesize the conditional expression.
        assert result_file['type'] == ("dir" if file.is_dir() else "file")
def __init__(self, dirName):
    """
    Args:
        dirName (string): directory where to load the corpus
    """
    self.MAX_NUMBER_SUBDIR = 200
    self.conversations = []
    dialogs_dir = os.path.join(dirName, "dialogs")
    extracted = 0
    # total= lets tqdm show progress even though scandir has no length.
    progress = tqdm(os.scandir(dialogs_dir),
                    desc="Ubuntu dialogs subfolders",
                    total=len(os.listdir(dialogs_dir)))
    for sub in progress:
        # Stop once the configured number of subfolders has been loaded.
        if extracted == self.MAX_NUMBER_SUBDIR:
            print("WARNING: Early stoping, only extracting {} directories".format(self.MAX_NUMBER_SUBDIR))
            return
        if not sub.is_dir():
            continue
        extracted += 1
        for entry in os.scandir(sub.path):
            if entry.name.endswith(".tsv"):
                self.conversations.append({"lines": self.loadLines(entry.path)})
def _osu_files(path, recurse):
"""An iterator of ``.osu`` filepaths in a directory.
Parameters
----------
path : path-like
The directory to search in.
recurse : bool
Recursively search ``path``?
Yields
------
path : str
The path to a ``.osu`` file.
"""
if recurse:
for directory, _, filenames in os.walk(path):
for filename in filenames:
if filename.endswith('.osu'):
yield pathlib.Path(os.path.join(directory, filename))
else:
for entry in os.scandir(directory):
path = entry.path
if path.endswith('.osu'):
yield pathlib.Path(path)
def _iterdir(dirname, dironly):
if not dirname:
if isinstance(dirname, bytes):
dirname = bytes(os.curdir, 'ASCII')
else:
dirname = os.curdir
try:
it = scandir(dirname)
for entry in it:
try:
if not dironly or entry.is_dir():
yield entry.name
except OSError:
pass
except OSError:
return
# Recursively yields relative pathnames inside a literal directory.
def _discover_sprites(self):
    """Load every ``*.png`` in this plugin's sprite directory.

    Returns a dict mapping sprite name -> Sprite; files that share a
    name (differing only in their trailing "_<index>" suffix) have
    their image data appended to the same Sprite.
    """
    plugin_path = offshoot.config["file_paths"]["plugins"]
    sprite_path = f"{plugin_path}/{self.__class__.__name__}Plugin/files/data/sprites"
    sprites = dict()
    if not os.path.isdir(sprite_path):
        return sprites
    for entry in os.scandir(sprite_path):
        if not entry.name.endswith(".png"):
            continue
        # Sprite name: file name minus the trailing "_<index>" part, upper-cased.
        sprite_name = "_".join(entry.name.split("/")[-1].split("_")[:-1]).replace(".png", "").upper()
        image_data = skimage.io.imread(f"{sprite_path}/{entry.name}")
        # Drop the alpha channel and add a trailing axis for stacking.
        image_data = image_data[:, :, :3, np.newaxis]
        if sprite_name in sprites:
            sprites[sprite_name].append_image_data(image_data)
        else:
            sprites[sprite_name] = Sprite(sprite_name, image_data=image_data)
    return sprites
def reader(path, shuffle=True):
    """Collect the paths of all ``*.jpg`` files (any case) in *path*.

    Args:
        path (str): Directory to scan.
        shuffle (bool): When True, return the paths in random order so
            images are uncorrelated with any label ordering on disk.

    Returns:
        list[str]: Image file paths.
    """
    files = [entry.path for entry in os.scandir(path)
             if entry.name.lower().endswith('.jpg') and entry.is_file()]
    if shuffle:
        # The original built `shuffled_index = list(range(len(files)))`
        # (an unshuffled identity list) and re-indexed the already-shuffled
        # list with it -- a no-op that contradicted its own comment about
        # repeatability. A single in-place shuffle is equivalent.
        random.shuffle(files)
    return files
def set_random_clitoris_texture():
    """Picks a random texture from the TEXTURE_DIRECTORY_PATH and assigns it to
    the clitoris material.

    Errors are reported on stderr: a missing texture directory raises
    FileNotFoundError from os.scandir, and an empty directory makes
    random.choice raise IndexError.
    """
    try:
        texture_names = list(entry.name for entry in os.scandir(TEXTURE_DIRECTORY_PATH)
                             if not entry.name.startswith('.') and entry.is_file())
        random_texture_name = choice(texture_names)
        texture_image = bpy.data.images.load(os.path.join(TEXTURE_DIRECTORY_PATH,
                                                          random_texture_name))
        CLIT_MATERIAL.node_tree.nodes[TEXTURE_NODE_NAME].image = texture_image
        # Record the pick for debugging/inspection.
        DEBUG_INFO["random_texture_name"] = random_texture_name
    except FileNotFoundError:
        # BUG FIX: user-facing message typo "Cloud not find" -> "Could not find".
        print("Could not find the texture directory", file=sys.stderr)
    except IndexError:
        print("Texture directory is empty", file=sys.stderr)
def test_remote_directories_moved_to_trash(command):
    """Remote directories are moved to the trash.

    Remote directories are moved to the trash if the local directory
    contained only symlinks.
    """
    # Seed the remote with two multi-block files.
    with open("remote/letters/upper/A.txt", "w") as file:
        file.write("A" * BLOCK_SIZE * 5)
    with open("remote/letters/upper/B.txt", "w") as file:
        file.write("B" * BLOCK_SIZE * 5)
    command.main()
    # Remove the local copy and sync again.
    shutil.rmtree("local/letters/upper")
    command.main()
    trash_names = {entry.name for entry in os.scandir(command.remote_dir.trash_dir)}
    assert "upper" in trash_names
def check_excluded(
        self, paths: Iterable[str], start_path: str) -> Set[str]:
    """Get the paths that have been excluded by each client.

    Args:
        paths: The paths to check.
        start_path: The path of the directory to match globbing patterns
            against.

    Returns:
        The subset of input paths that have been excluded by each client.
    """
    pattern_files = [ProfileExcludeFile(entry.path)
                     for entry in os.scandir(self._exclude_dir)]
    # A path is reported only when *every* client's pattern file matches
    # it; all() short-circuits exactly like the original for/else break.
    return {
        path for path in paths
        if all(path in pattern_file.all_matches(start_path)
               for pattern_file in pattern_files)}
def relativeWalk(path, startPath = None):
    """Recursively yield (relative_path, is_directory) pairs under *path*.

    Entries at each level are sorted locale-aware via strxfrm
    (https://docs.python.org/3/howto/sorting.html#odd-and-ends); paths
    are made relative to *startPath*, defaulting to *path* itself.
    """
    if startPath is None:
        startPath = path
    ordered = sorted(os.scandir(path), key=lambda entry: locale.strxfrm(entry.name))
    for entry in ordered:
        try:
            if entry.is_file():
                yield os.path.relpath(entry.path, startPath), False
            elif entry.is_dir():
                yield os.path.relpath(entry.path, startPath), True
                # Descend, keeping paths relative to the original root.
                yield from relativeWalk(entry.path, startPath)
            else:
                logging.error("Encountered an object which is neither directory nor file: " + entry.path)
        except OSError as e:
            logging.error(e)
# Possible actions:
# copy (always from source to target),
# delete (always in target)
# hardlink (always from compare directory to target directory)
# rename (always in target) (2-variate) (only needed for move detection)
# hardlink2 (always from compare directory to target directory) (2-variate) (only needed for move detection)
def gather_candidates(self, context):
    """Collect non-hidden entries from each accessible folder in the
    context, sorted newest-first by modification time."""
    candidates = []
    for directory in context['__folders']:
        # Skip folders we cannot enter.
        if not os.access(directory, os.X_OK):
            continue
        base = os.path.basename(directory)
        for entry in os.scandir(directory):
            # Hidden entries (dotfiles) are excluded.
            if entry.name[0] == '.':
                continue
            candidates.append({
                'word': entry.name,
                'abbr': '%-14s %-20s' % (base, entry.name),
                'source__root': entry.path,
                'source__mtime': entry.stat().st_mtime
            })
    return sorted(candidates, key=itemgetter('source__mtime'), reverse=True)
def gather_candidates(self, context):
    """List the files directly inside the root folder, newest first,
    labelling each with a human-readable age via ago()."""
    root = context['__root']
    now = time.time()
    candidates = []
    for entry in os.scandir(root):
        if not entry.is_file():
            continue
        mtime = entry.stat().st_mtime
        # NOTE(review): splitext()[0] is the stem (name without its
        # extension), despite the original's "extname" label.
        stem = os.path.splitext(entry.name)[0]
        candidates.append({
            'word': '%s (%s)' % (stem, ago(now, mtime)),
            'action__path': entry.path,
            'source_mtime': mtime
        })
    return sorted(candidates, key=lambda candidate: candidate['source_mtime'],
                  reverse=True)
def main():
    """Import JSON files into an EData SQLite database.

    Uses files given on the command line; otherwise every ``*.json``
    file in the current directory. Exits with status 2 when no input
    files are found.
    """
    results = arg_parser.parse_args()
    # Raw strings for the regexes: "\." inside a plain string literal is
    # an invalid escape sequence (DeprecationWarning since Python 3.6,
    # slated to become a SyntaxError).
    if re.match(r'^.+\.sqlite$', results.database):
        # Strip a trailing ".sqlite" so the extension is not duplicated later.
        results.database = re.sub(r'^(.+)\.sqlite$', r'\1', results.database)
    try:
        json_filenames = results.file if results.file else \
            [f.path for f in scandir() if f.is_file() and
             os.path.splitext(f.path)[1].lower() == '.json']
        if not json_filenames:
            raise NoFilesProvidedError
    except NoFilesProvidedError:
        sys.exit(2)
    else:
        edb = EDataSQLDatabase(database=results.database,
                               verbose=results.verbose)
        # Only import files that pass validation.
        for f in [f for f in json_filenames if check_file(f)]:
            edb.import_file(f)
async def _scan(self, root, media_dirs=None):
    """Recursively scan *root* for video files, cooperatively yielding
    to the event loop every SPEED_LIMIT entries.

    BUG FIX: the body uses ``await``, so the function must be declared
    ``async def`` -- a plain ``def`` containing ``await`` is a
    SyntaxError. The recursive calls already await it.

    Args:
        root: Directory to scan.
        media_dirs: When given, only entries whose name is in this
            collection are descended into (top-level filter); deeper
            levels scan everything.
    """
    i = 0
    with os.scandir(root) as it:
        for entry in it:
            i += 1
            if i > SPEED_LIMIT:
                # this allows the event loop to update
                await asyncio.sleep(SPEED_WAIT_SEC)
                i = 0
            # Bail out promptly when the scan has been cancelled.
            if self.stopped or self.interrupted:
                return
            if media_dirs:
                if entry.name in media_dirs:
                    await self._scan(entry.path, media_dirs=None)
            elif not entry.name.startswith('.'):
                if entry.is_dir(follow_symlinks=False):
                    await self._scan(entry.path, media_dirs=None)
                elif entry.name.rpartition('.')[2] in VIDEO_FILES_EXT:
                    await self._refresh_video(entry.path)
def load_from_dir(self, dirname, recurse=False, **check_args):
    """Collect checks from every non-hidden ``*.py`` file in *dirname*,
    descending into subdirectories when *recurse* is set."""
    checks = []
    for entry in os.scandir(dirname):
        if recurse and entry.is_dir():
            checks.extend(
                self.load_from_dir(entry.path, recurse, **check_args))
        # Only plain, non-hidden Python files are loadable check modules.
        loadable = (entry.is_file()
                    and entry.name.endswith('.py')
                    and not entry.name.startswith('.'))
        if loadable:
            checks.extend(self.load_from_file(entry.path, **check_args))
    return checks
def create_arff(self):
    """
    Create Arff file with a timestamp
    :return: string name of file
    """
    timestamp = datetime.now().strftime('%d-%m-%Y_%H_%M_%S')
    arff_path = os.path.join("./arff_file/", timestamp + ".arff")
    # Create the output folder on first use.
    os.makedirs(os.path.dirname(arff_path), exist_ok=True)
    # The "folders" attribute enumerates the subdirectories of folderpath.
    folder_names = ', '.join('{}'.format(f.name)
                             for f in os.scandir(self.folderpath) if f.is_dir())
    with open(arff_path, mode='w', encoding='utf-8') as output:
        output.write("@relation " + self.relation + "\n")
        output.write("\n")
        for attribute in self.attribute_list:
            output.write("@attribute " + attribute + " numeric\n")
        output.write("@attribute folders {%s}\n" % folder_names)
        output.write("\n")
        output.write("@data\n")
        # One data row per counted document: word counts, then the name
        # of the folder the document came from (its grandparent's basename).
        for key, counter_value in sorted(self.counterList.items()):
            folder_label = os.path.split(os.path.split(key)[0])[1]
            counts = ', '.join('{}'.format(counter_value[w]) for w in self.search_list)
            output.write(counts + ', ' + folder_label + "\n")
    return arff_path
def dir_is_empty(path):
    """
    Check if the given directory is empty.

    May raise a FileNotFoundError or a NotADirectoryError exception.
    """
    # Pull a single entry from the scandir iterator; no entry means empty.
    return next(iter(os.scandir(path)), None) is None
def file_set_readonly(path, enable, follow_symlinks=True, recursive=False):
    """Apply or remove the read-only property of a given file or directory."""
    st_mode = os.stat(path, follow_symlinks=follow_symlinks).st_mode
    if enable:
        # Read-only: ensure the read bit, drop the write bit.
        new_attr = (st_mode | stat.S_IREAD) & ~stat.S_IWRITE
    else:
        # Writable: ensure the write bit, drop the read bit.
        new_attr = (st_mode | stat.S_IWRITE) & ~stat.S_IREAD
    # Avoid a redundant chmod when nothing would change.
    if new_attr != st_mode:
        os.chmod(path, new_attr)
    if recursive and stat.S_ISDIR(st_mode):
        for entry in os.scandir(path):
            file_set_readonly(entry.path, enable, follow_symlinks, recursive)