def filename_match(filename, patterns):
    """
    Check if any pattern in ``patterns`` matches ``filename``.

    Patterns may be fnmatch-style globs (``dir/*``) or bare file/directory
    names (``build``), which are compared against each path component.

    :param filename: path to test (a leading ``/`` is ignored)
    :param patterns: iterable of glob patterns or plain names
    :returns: True if at least one pattern matches, else False
    """
    # Normalize into a fresh list instead of mutating the caller's list
    # (the original appended '*' in place -- a surprising side effect).
    # `dir/*` works with fnmatch but `dir/` does not, so expand it.
    # endswith() is also safe for empty pattern strings, where the
    # original's pattern[-1] raised IndexError.
    normalized = [p + '*' if p.endswith('/') else p for p in patterns]
    # filename has a leading `/` which confuses fnmatch
    filename = filename.lstrip('/')
    # Pattern is a fnmatch compatible glob
    if any(fnmatch.fnmatch(filename, pattern) for pattern in normalized):
        return True
    # Pattern is a simple name of file or directory (not caught by fnmatch)
    parts = filename.split('/')
    return any('/' not in pattern and pattern in parts
               for pattern in normalized)
# Example snippets using Python's fnmatch() (collected source code)
def clearpyc(root, patterns='*', single_level=False, yield_folders=False):
    """
    Walk ``root`` and yield paths whose basename matches any pattern.

    :param root: directory to walk
    :param patterns: semicolon-separated list of fnmatch globs
    :param single_level: if True, only scan ``root`` itself (no recursion)
    :param yield_folders: if True, also match sub-directory names
    """
    patterns = patterns.split(';')
    for path, subdirs, files in os.walk(root):
        if yield_folders:
            files.extend(subdirs)
        files.sort()
        for name in files:
            for pattern in patterns:
                if fnmatch.fnmatch(name, pattern.strip()):
                    yield os.path.join(path, name)
                    # Stop at the first matching pattern so a name that
                    # matches several patterns is yielded only once
                    # (the original yielded duplicates).
                    break
        if single_level:
            break
def load_all_url_files(_dir, file_name_prefix):
    """
    Load URLs from every ``<file_name_prefix>*.txt`` file in ``_dir``.

    Each line is expected to be tab-separated with the URL in the first
    column; blank lines are skipped.

    :param _dir: directory containing the URL text files
    :param file_name_prefix: filename prefix to match
    :returns: list of URL strings gathered from all matching files
    """
    url_list = []
    for file_name in os.listdir(_dir):
        if not fnmatch.fnmatch(file_name, file_name_prefix + '*.txt'):
            continue
        file_path = os.path.join(_dir, file_name)
        print('load URLs from file: ' + file_path)
        count = 0
        # `with` guarantees the handle is closed (the original leaked it
        # until GC and used Python-2-only print statements).
        with open(file_path, 'r') as fp_urls:
            for line in fp_urls:
                line = line.strip()
                if line:
                    url_list.append(line.split('\t')[0].strip())
                    count += 1
        print(str(count) + ' URLs loaded')
    return url_list
########### End of Functions to Load downloaded urls ###########
############## Functions to get date/time strings ############
def get_grace_times(self, names):
    """
    Resolve the grace time(s) that apply to an image.

    Priority order:
      1. the image's own ``com.caduc.image.grace_time`` label, if set;
      2. every ``grace_time`` from config entries whose glob pattern
         matches one of ``names`` (``None``/-1 mean "keep forever" and
         map to ``float('inf')``);
      3. the instance-wide default ``self.grace_time``.

    :param names: iterable of image names to match against the config
    :returns: set of grace-time values
    """
    labels = self.details['Config']['Labels']
    if labels:
        # Look the label up once instead of twice (the original called
        # .get() for the truthiness test and again for the value).
        label_grace = labels.get("com.caduc.image.grace_time")
        if label_grace:
            return set([label_grace])
    grace_config = self.config.get("images")
    grace_times = set()
    if grace_config:
        for name in names:
            # Plain dict.items() replaces six.iteritems(); identical
            # behavior, drops the third-party dependency.
            for pattern, kv in grace_config.items():
                if fnmatch.fnmatch(name, pattern):
                    grace_time = kv['grace_time']
                    if grace_time is None or grace_time == -1:
                        grace_times.add(float('inf'))
                    else:
                        grace_times.add(grace_time)
    if grace_times:
        return grace_times
    return set([self.grace_time])
def get_latest_ckpt(self):
    """Return (ckpt_path, meta_path, step) for the newest checkpoint.

    Scans ``self.folder`` for files matching ``<fname>.ckpt*`` (ignoring
    ``.meta`` files) and picks the one with the highest step suffix.

    Raises:
        Exception: when no checkpoint file is present.
    """
    pattern = os.path.join(self.folder, self.fname + '.ckpt*')
    candidates = []
    for entry in os.listdir(self.folder):
        full = os.path.join(self.folder, entry)
        # Skip graph-definition files; keep only checkpoint data files.
        if fnmatch.fnmatch(full, pattern) and not full.endswith('.meta'):
            candidates.append(full)
    if not candidates:
        raise Exception(
            'No checkpoint file found {}'.format(pattern))
    # The step number is the integer after the final '-'.
    latest_step = max(int(name.split('-')[-1]) for name in candidates)
    prefix = os.path.join(self.folder,
                          self.fname + '.ckpt-{}'.format(latest_step))
    return (prefix, prefix + '.meta', latest_step)
def zone_list(module, base_url, zone=None):
    '''Return the list of existing zones, optionally filtered by a glob.'''
    zones = []
    url = "{0}".format(base_url)
    response, info = fetch_url(module, url, headers=headers)
    if info['status'] != 200:
        module.fail_json(msg="failed to enumerate zones at %s: %s" % (url, info['msg']))
    data = json.loads(response.read())
    for entry in data:
        # Keep every zone when no filter was given, otherwise glob-match.
        if zone is not None and not fnmatch.fnmatch(entry['name'], zone):
            continue
        zones.append({
            'name' : entry['name'],
            'kind' : entry['kind'].lower(),
            'serial' : entry['serial'],
        })
    return zones
def convert_all_files_in_path(self, path):
    """Convert every Qt ``.ui``/``.qrc`` file in ``path`` to Python.

    ``.ui`` files go through pyuic, ``.qrc`` files through pyrcc; a file
    is only regenerated when ``modified()`` reports the source as newer.
    """
    if not os.path.exists(path):
        print("'%s': Path doesn't exists. Skipping" % path)
        return
    converted = 0
    for entry in os.listdir(path):
        source = os.path.join(path, entry)
        stem, _ext = os.path.splitext(source)
        # Pick the converter and output name based on the extension.
        if fnmatch.fnmatch(entry, '*.ui'):
            target, tool = '%s.py' % stem, self.PYUIC
        elif fnmatch.fnmatch(entry, '*.qrc'):
            target, tool = '%s_rc.py' % stem, self.PYRCC
        else:
            target, tool = None, None
        if tool and modified(source, target):
            os.system('%s -o "%s" "%s"' % (tool, target, source))
            converted += 1
    print("'%s': %s converted %s files" % (path, time.ctime(time.time()), converted))
def getScannerThread(i, filenameStr, mp3guessenc_bin, mediainfo_bin, fileinfo_dialog_update=None, cmd_timeout=300, debug_enabled=False, main_q=None, info_q=None):
    """Build the set of scanner threads appropriate for one file.

    mp3 files prefer mp3guessenc when its binary path is configured;
    every other case (mp3 fallback, flac, and all remaining file types)
    uses mediainfo when its binary path is non-empty.
    """
    threads = set()
    if fnmatch.fnmatch(filenameStr, "*.mp3") and mp3guessenc_bin != "":
        threads.add(scanner_Thread(i, filenameStr, mp3guessenc_bin, "mp3guessenc", "-e", debug_enabled, info_q,
                                   main_q, fileinfo_dialog_update, cmd_timeout))
    elif mediainfo_bin != "":
        # mediainfo handles mp3 (when mp3guessenc is absent), flac, and
        # everything else -- the original's three branches were identical.
        threads.add(scanner_Thread(i, filenameStr, mediainfo_bin, "mediainfo", "-", debug_enabled, info_q, main_q,
                                   fileinfo_dialog_update, cmd_timeout))
    return threads
def installed_plugins(self):
    """Return {plugin_name: cleaned docstring} for every installed plugin.

    Scans the ``Plugins`` directory for ``*.py`` modules, skipping the
    framework files (``mount.py``, ``template.py``) and anything whose
    name starts with an underscore.
    """
    from os import listdir
    from fnmatch import fnmatch
    import compiler
    import inspect
    candidates = listdir('Plugins')
    try:
        candidates.remove('mount.py')
        candidates.remove('template.py')
    except ValueError:
        pass
    plugins = {}
    for entry in candidates:
        if not fnmatch(entry, '*.py') or fnmatch(entry, '_*'):
            continue
        # Parse the module without importing it and keep its docstring.
        doc = inspect.cleandoc(compiler.parseFile('Plugins/' + entry).doc)
        plugins[entry[:-3]] = doc  # strip the trailing '.py'
    return plugins
def match(item, patterns):
    """Glob-match ``item`` against a list of include/exclude patterns.

    Patterns prefixed with '-' or '!' exclude matches; an optional '+'
    prefix marks an explicit include.  An empty pattern list matches
    everything; empty pattern strings are ignored.
    """
    matched = not patterns
    for raw in patterns:
        if not raw:
            continue  # skip empty entries, like filter(None, ...)
        pattern = raw
        negate = pattern[0] in '-!'
        if negate:
            pattern = pattern[1:]
        if pattern.startswith('+'):
            pattern = pattern[1:]
        hit = fnmatch.fnmatch(item, pattern)
        # Exclusions veto a prior match; inclusions accumulate with OR.
        matched = (matched and not hit) if negate else (matched or hit)
    return matched
def list(self, pattern):
    """Yield Repo entries for repos matching 'organization[/repo_glob]'.

    Falls back from organization lookup to user lookup, and picks the
    ssh or https clone URL according to the configured protocol.
    """
    parsed = self._EXTRACT_PATTERN.match(pattern)
    if parsed is None:
        raise URIException(f"Unable to match {pattern},"
                           " please use 'organization[/repo_pattern]'")
    org_name = parsed.group("org")
    repo_glob = parsed.group("repo") or "*"
    try:
        repos = self._gh.get_organization(org_name).get_repos()
    except github.GithubException:
        # Not an organization -- retry as a plain user account.
        repos = self._gh.get_user(org_name).get_repos()
    for repo in repos:
        if fnmatch.fnmatch(repo.name, repo_glob):
            if self._clone_protocol == self.CloneProtocol.ssh:
                yield Repo(name=repo.name, url=repo.ssh_url)
            elif self._clone_protocol == self.CloneProtocol.https:
                yield Repo(name=repo.name, url=repo.clone_url)
            else:
                raise RuntimeError(f"Invalid protocol selected: {self._clone_protocol}")
def match(value, dn, recurse):
    """Return sorted relative paths under ``dn`` selected by ``value``.

    ``value`` is a whitespace-separated list of glob tokens evaluated in
    order; a token prefixed with '-' rejects matching files again.  The
    last matching token wins for each file.
    """
    import fnmatch
    tokens = value.split()
    if recurse:
        entries = os.walk(dn)
    else:
        entries = [(dn, [], os.listdir(dn))]
    selected = []
    for base, _dirs, names in entries:
        for name in names:
            rel = os.path.relpath(os.path.join(base, name), dn)
            keep = False
            for token in tokens:
                negate = token.startswith('-')
                pattern = token[1:] if negate else token
                if fnmatch.fnmatch(rel, pattern):
                    # Last matching token determines acceptance.
                    keep = not negate
            if keep:
                selected.append(rel)
    selected.sort()
    return selected
def _build(inputs):
paths = []
for f in inputs:
if os.path.isdir(f):
# Walk through the directory and get all PDF files
# Credit: https://stackoverflow.com/a/2186565/4856091
for root, dirnames, filenames in os.walk(f):
for filename in fnmatch.filter(filenames, "*.pdf"):
paths.append(os.path.join(root, filename))
elif f.endswith(".pdf"):
paths.append(f)
else:
# Get the contents as list of files
_files = [line.strip() for line in open(f).readlines()]
_files = [_f for _f in _files
if (not _f.startswith("#")) and (_f != "")]
paths += _files
return paths
def copy_other(opts, flacdir, outdir):
    """Copy non-flac companion files from ``flacdir`` into ``outdir``.

    Honors the opts flags: ``nolog``/``nocue`` skip log/cue sheets,
    ``nodots`` skips hidden (dot) files, and ``overwrite`` controls
    whether existing destination files are replaced.  ``*.flac`` and
    ``*.m3u`` files are never copied.
    """
    if opts.verbose:
        print('COPYING other files')
    for dirpath, dirs, files in os.walk(flacdir, topdown=False):
        for name in files:
            lower = name.lower()
            if opts.nolog and fnmatch(lower, '*.log'):
                continue
            if opts.nocue and fnmatch(lower, '*.cue'):
                continue
            # BUG FIX: the original tested fnmatch(name, '^.') -- a regex
            # anchor that fnmatch treats literally -- so dotfiles were
            # never skipped.  Use a plain prefix test instead.
            if opts.nodots and lower.startswith('.'):
                continue
            if fnmatch(lower, '*.flac') or fnmatch(lower, '*.m3u'):
                continue
            d = re.sub(re.escape(flacdir), outdir, dirpath)
            if (os.path.exists(os.path.join(d, name))
                    and not opts.overwrite):
                continue
            if not os.path.exists(d):
                os.makedirs(d)
            shutil.copy(os.path.join(dirpath, name), d)
def remove(path, dest_root, dryrun=False, debug=False):
    """
    Remove the specified file/directory using `rm -rf`, to clean
    up the destination backup.
    The specified path must locate under the `dest_root` for safety.
    """
    # Safety guard: refuse anything outside the destination tree.
    if not fnmatch(path, dest_root + "/*"):
        raise ValueError("Not allowed to remove file/directory "
                         "outside destination: %s" % path)
    if not os.path.exists(path):
        return  # already gone; nothing to do
    logger.info("Remove: %s" % path)
    cmd = ["rm", "-r", "-f"]
    if debug:
        cmd.append("-v")
    cmd.append(path)
    if not dryrun:
        subprocess.check_call(cmd)
def fnmatch_lines_random(self, lines2):
    """Check that every given line occurs somewhere in the output.

    The expected lines may appear in any order and may contain glob
    wildcards.  Raises ValueError (carrying the accumulated log text)
    on the first expected line with no match.
    """
    lines2 = self._getlines(lines2)
    for expected in lines2:
        # A line matches by exact equality or as an fnmatch glob.
        found = any(expected == actual or fnmatch(actual, expected)
                    for actual in self.lines)
        if found:
            self._log("matched: ", repr(expected))
        else:
            self._log("line %r not found in output" % expected)
            raise ValueError(self._log_text)
def should_skip(filename, config, path='/'):
    """Returns True if the file should be skipped based on the passed in settings."""
    # 1. Exact-path skips (normalized to posix separators).
    normalized = posixpath.abspath(posixpath.join(path, filename))
    for skip_path in config['skip']:
        if normalized == posixpath.abspath(skip_path.replace('\\', '/')):
            return True
    # 2. Skip when any path component matches a bare skip entry.
    head, tail = os.path.split(filename)
    while tail:
        if tail in config['skip']:
            return True
        head, tail = os.path.split(head)
    # 3. Finally, glob patterns.
    return any(fnmatch.fnmatch(filename, glob_pat)
               for glob_pat in config['skip_glob'])
def get_bucket_info(user):
    """return an object that has 'bucket', 'endpoint_url',
    'region'.
    Only 'bucket' is mandatory in the response object.
    """
    email = user.email.lower()
    exceptions = settings.UPLOAD_URL_EXCEPTIONS
    if email in exceptions:
        # exact match -- easy
        override = exceptions[email]
    else:
        # otherwise compare against every wildcard entry
        override = None  # assume no match
        for email_or_wildcard in settings.UPLOAD_URL_EXCEPTIONS:
            if fnmatch.fnmatch(email, email_or_wildcard.lower()):
                # a match!
                override = settings.UPLOAD_URL_EXCEPTIONS[email_or_wildcard]
                break
    # A falsy override (None/empty) falls back to the default URL,
    # matching the original's truthiness check.
    url = override if override else settings.UPLOAD_DEFAULT_URL
    return S3Bucket(url)
def list(self, labelmatch="*"):
    '''
    run "launchctl list" and return list of jobs that match the shell-
    style wildcard <labelmatch>
    since all jobs have our app-specific prefix, and have their
    run time appended to their names, caller should terminate
    <labelmatch> with ".*" unless they already have an exact name.
    '''
    results = []
    try:
        # NOTE(review): check_output returns bytes on Python 3, so
        # .split("\n") below assumes Python 2 str output -- confirm.
        output = subprocess.check_output(["sudo", "launchctl", "list"])
        for line in output.split("\n"):
            # An empty line marks the end of useful output.
            if len(line) == 0:
                break
            # Columns are PID, status, label; maxsplit=3 keeps any
            # whitespace inside the label intact.
            fields= line.split(None,3)
            if len(fields) < 3:
                self.bomb("unexpected output from 'sudo launchctl list: %s'" % line)
            job= fields[2]
            # Keep only our own jobs, returned with the prefix stripped.
            if fnmatch(job, self.prefix + labelmatch):
                results.append(job[len(self.prefix):])
    except (subprocess.CalledProcessError) as error:
        self.bomb("running 'sudo launchctl list'", error)
    return results
def get_mode(self, paths):
    """Return ``SINGLE_END`` or ``PAIRED_END`` from list of file paths

    Raise InvalidDataException when only R2 files are present or the
    R1/R2 counts disagree.
    """
    counts = {0: 0, 1: 0}  # 0 -> R1 matches, 1 -> R2 matches
    for path in paths:
        basename = os.path.basename(path)
        for idx, pattern_group in enumerate([PATTERNS_R1, PATTERNS_R2]):
            # Count each file at most once per group.
            if any(fnmatch.fnmatch(basename, pat) for pat in pattern_group):
                counts[idx] += 1
    if counts[1] and not counts[0]:
        raise InvalidDataException('Have seen only R2 in {}!'.format(
            ','.join(paths)))
    if counts[1] and counts[0] != counts[1]:
        raise InvalidDataException(
            'Have seen different number of R1 and R2 reads in {}'.format(
                ','.join(paths)))
    return (PAIRED_END if counts[1] else SINGLE_END)
def commands(connection, value=None):
    """List command names (with aliases) available to ``connection``.

    When ``value`` is given it acts as a glob filter on command and
    alias names; commands the connection lacks rights for are omitted.
    """
    names = []
    for command in cmdlist:
        command_func = cmdlist[command]
        # Skip commands restricted to user types this connection lacks.
        if (hasattr(command_func, 'user_types')
                and command not in connection.rights):
            continue
        include = value is None or fnmatch.fnmatch(command, value)
        aliases = []
        for alias in aliaslist:
            if aliaslist[alias] == command:
                # A matching alias also pulls the command into the list.
                if value is None or fnmatch.fnmatch(alias, value):
                    include = True
                    aliases.append(alias)
        display = command if not aliases else (
            '%s (%s)' % (command, ', '.join(aliases)))
        if include:
            names.append(display)
    return 'Commands: %s' % (', '.join(names))
def select_host(self, host, host_names=None):
    """
    checks that host is valid, i.e. in the list of glob host_names
    if the host is missing, then is it selects the first entry from host_names
    read more here: https://github.com/web2py/web2py/issues/1196
    """
    if not host:
        # No Host header: fall back to the first configured name,
        # or localhost when nothing is configured.
        return host_names[0] if host_names else 'localhost'
    if host_names:
        # The supplied host must glob-match at least one allowed name.
        if not any(fnmatch.fnmatch(host, candidate)
                   for candidate in host_names):
            raise HTTP(403, "Invalid Hostname")
    return host
def __getitem__(self, index):
    # Old-style iterator protocol: called with an increasing index until
    # an IndexError escapes.  ``index`` itself is unused -- the walk
    # state lives in self.index / self.files / self.directory / self.stack.
    while 1:
        try:
            # Take the next entry from the current directory listing.
            file = self.files[self.index]
            self.index = self.index + 1
        except IndexError:
            # Current directory exhausted: move to the next queued
            # directory.  When self.stack is empty, pop() raises
            # IndexError, which ends the iteration for the caller.
            self.index = 0
            self.directory = self.stack.pop()
            self.files = os.listdir(self.directory)
        else:
            fullname = os.path.join(self.directory, file)
            # Queue real subdirectories for later (skip symlinks to
            # avoid cycles); note the directory entry itself may also be
            # returned below if it matches the pattern.
            if os.path.isdir(fullname) and not os.path.islink(fullname):
                self.stack.append(fullname)
            # Skip hidden (.), editor-temp (#) and backup (~) files,
            # then return the first entry matching self.pattern.
            if not (file.startswith('.') or file.startswith('#') or file.endswith('~')) \
                    and fnmatch.fnmatch(file, self.pattern):
                return fullname
def find_analysis_barcode_file(parent_dir):
    '''
    Find the barcodes file for the analysis
    # analysis_barcode_file="$(find "$analysis_outdir" -path "*variantCaller_out*" -name "sample_barcode_IDs.tsv" | head -1)"
    '''
    import os
    import fnmatch
    barcode_file = None
    for root, dirs, files in os.walk(parent_dir):
        # Only look inside variantCaller output directories.
        if not fnmatch.fnmatch(root, "*variantCaller_out*"):
            continue
        for entry in files:
            if fnmatch.fnmatch(entry, "sample_barcode_IDs.tsv"):
                barcode_file = os.path.join(root, entry)
                break
    # make sure file exists
    if barcode_file is None:
        print("ERROR: Analysis barcode file not found")
    pl.file_exists(barcode_file, kill = True)
    return barcode_file
def find_analysis_barcode_file(parent_dir):
    '''
    Find the barcodes file for the analysis
    # analysis_barcode_file="$(find "$analysis_outdir" -path "*variantCaller_out*" -name "sample_barcode_IDs.tsv" | head -1)"
    '''
    import os
    import fnmatch
    found = None
    for root, _dirs, files in os.walk(parent_dir):
        # Restrict the search to variantCaller output directories.
        if fnmatch.fnmatch(root, "*variantCaller_out*"):
            matches = fnmatch.filter(files, "sample_barcode_IDs.tsv")
            if matches:
                found = os.path.join(root, matches[0])
    # make sure file exists
    if found is None:
        print("ERROR: Analysis barcode file not found")
    pl.file_exists(found, kill = True)
    return found
def included_in_wildcard(self, names, target_name):
    """Is target_name covered by a wildcard?

    :param names: server aliases
    :type names: `collections.Iterable` of `str`
    :param str target_name: name to compare with wildcards
    :returns: True if target_name is covered by a wildcard,
        otherwise, False
    :rtype: bool
    """
    # Compare case-insensitively because fnmatch can be case sensitive.
    target = target_name.lower()
    for candidate in (n.lower() for n in names):
        # fnmatch gives "[seq]" special meaning; '[' / ']' aren't valid
        # in Apache names but Apache tolerates them, so never glob-match
        # a name containing '['.
        if "[" in candidate:
            continue
        if fnmatch.fnmatch(target, candidate):
            return True
    return False
def read_annotated(dir_path, patterns=("*.jpg", "*.png", "*.jpeg")):
    """
    Read annotated images from a directory. This reader assumes that the images in this directory are separated in
    different directories with the label name as directory name. The method returns a generator of the label (string)
    , the opencv image and the filename.
    :param dir_path: The base directory we are going to read
    :param patterns: Patterns of the images the reader should match
    """
    # NOTE: the default is a tuple now -- a mutable default list is a
    # classic shared-state pitfall.  Callers passing their own list are
    # unaffected.
    for label in os.listdir(dir_path):
        for root, dirs, files in os.walk(os.path.join(dir_path, label)):
            for basename in files:
                for pattern in patterns:
                    if fnmatch.fnmatch(basename, pattern):
                        filename = os.path.join(root, basename)
                        image = cv2.imread(filename)
                        if image is None:
                            # Python-3-compatible print (the original was a
                            # Python-2 print statement).
                            print(">> Ignore empty image {f}".format(f=filename))
                        else:
                            yield label, image, filename
                        # Yield each file at most once, even when it
                        # matches several of the patterns.
                        break
def _load_sensors(config):
    """Loads all the sensors from the specified config file.
    Args:
        config (ConfigParser): instance from which to extract the sensor list.
                               `str` is also allowed, in which case it
                               should be the path to the config file to load.
    """
    # Populates the module-level `_sensors` registry at most once;
    # `_sensors_parsed` guards against re-parsing on repeated calls.
    global _sensors, _sensors_parsed
    if not _sensors_parsed:
        parser = _get_parser(config)
        if parser is not None:
            #Now that we have the thread, we can add configuration for each of the
            #sensors in the config file.
            from fnmatch import fnmatch
            for section in parser.sections():
                # Sections named "sensor.<name>" each define one sensor;
                # the section's key/value pairs become Sensor kwargs.
                if fnmatch(section, "sensor.*"):
                    name = section[len("sensor."):]
                    _sensors[name] = Sensor(None,name,**dict(parser.items(section)))
        # NOTE(review): this marks parsing as done even when the parser
        # was None, so a bad config is never retried -- confirm intent.
        _sensors_parsed = True
def preprocess_data_and_labels_AAP(data_file_path, save_path):
    """Merge per-fold AAP data files into single train/val/test files.

    Every ``*train.txt`` file under ``data_file_path`` contributes to
    ``train.txt``, every ``*validation.txt`` to ``val.txt``, and any
    remaining file to ``test.txt``, all written under ``save_path``.
    Prints the total number of merged lines.
    """
    def merge_folds(data_file_path, save_path):
        # merge all the separated folds into one file
        train = []
        val = []
        test = []
        for entry in os.listdir(data_file_path):
            # Route each fold file into the right bucket by suffix;
            # anything that is neither train nor validation is test
            # (same catch-all behavior as the original).
            if fnmatch.fnmatch(entry, '*train.txt'):
                bucket = train
            elif fnmatch.fnmatch(entry, '*validation.txt'):
                bucket = val
            else:
                bucket = test
            # `with` closes the handles (the original leaked them) and
            # print() replaces the Python-2-only print statement below.
            with open(data_file_path + '/' + entry, 'r') as fh:
                bucket += fh.readlines()
        with open(save_path + '/train.txt', 'w') as fh:
            fh.write(''.join(train))
        with open(save_path + '/val.txt', 'w') as fh:
            fh.write(''.join(val))
        with open(save_path + '/test.txt', 'w') as fh:
            fh.write(''.join(test))
        print(len(train + val + test))
    merge_folds(data_file_path, save_path)
def _match_pattern(self, pattern, file_info):
    """Apply one (type, pattern) filter tuple to ``file_info``.

    Returns (file_info, True) on an include match, (file_info, False)
    on an exclude match, and None when the pattern does not apply.
    """
    file_path = file_info.src
    pattern_type = pattern[0]
    # Normalize separators to the convention of the file's location.
    if file_info.src_type == 'local':
        path_pattern = pattern[1].replace('/', os.sep)
    else:
        path_pattern = pattern[1].replace(os.sep, '/')
    is_match = fnmatch.fnmatch(file_path, path_pattern)
    if is_match and pattern_type == 'include':
        LOG.debug("%s matched include filter: %s",
                  file_path, path_pattern)
        return (file_info, True)
    if is_match and pattern_type == 'exclude':
        LOG.debug("%s matched exclude filter: %s",
                  file_path, path_pattern)
        return (file_info, False)
    LOG.debug("%s did not match %s filter: %s",
              file_path, pattern_type[2:], path_pattern)
    return None