def _fuzzdb_get_strings(max_len=0):
    """Yield every usable fuzz string bundled under BASE_PATH.

    Walks each subdirectory of BASE_PATH (skipping known-bad ones), reads
    every ``*.txt`` file inside it and yields one stripped string per line.

    Args:
        max_len: If non-zero, skip strings longer than this many characters.

    Yields:
        str: One fuzz string at a time.
    """
    ignored = ['integer-overflow']
    for subdir in pkg_resources.resource_listdir('protofuzz', BASE_PATH):
        if subdir in ignored:
            continue
        path = '{}/{}'.format(BASE_PATH, subdir)
        listing = pkg_resources.resource_listdir('protofuzz', path)
        for filename in listing:
            if not filename.endswith('.txt'):
                continue
            path = '{}/{}/{}'.format(BASE_PATH, subdir, filename)
            source = _open_fuzzdb_file(path)
            for line in source:
                string = line.decode('utf-8').strip()
                # Skip blank lines and comment lines.
                if not string or string.startswith('#'):
                    continue
                # BUG FIX: the length filter previously measured the raw
                # bytes line (including the trailing newline) instead of
                # the decoded, stripped string that is actually yielded.
                if max_len != 0 and len(string) > max_len:
                    continue
                yield string
# Python example code for pkg_resources.resource_listdir()
def load_model(package_name, model_dir, model_name):
    """Assemble a model dict from the YAML resources bundled in a package.

    Every resource under ``model_dir/model_name`` is merged into a single
    dict via append_model(); an optional 'imports' entry then pulls in one
    more YAML file (relative to ``model_dir``) before the base and API
    object extensions are applied.
    """
    model = {}
    model_path = '{}/{}'.format(model_dir, model_name)
    for entry in pkg_resources.resource_listdir(package_name, model_path):
        resource = '{}/{}'.format(model_path, entry)
        with pkg_resources.resource_stream(package_name, resource) as stream:
            append_model(model, yaml.safe_load(stream))
    extra = model.get('imports')
    if extra:
        # NOTE: the 'imports' path is resolved against model_dir, not
        # model_path.
        resource = '{}/{}'.format(model_dir, extra)
        with pkg_resources.resource_stream(package_name, resource) as stream:
            append_model(model, yaml.safe_load(stream))
    extend_base_objects(model)
    extend_api_objects(model)
    return model
# Test-module discovery helper
def getTestList():
    """
    Generate a list containing the names of the test modules, which should
    be executed.

    Returns: List with names of test Python modules (list/string).
    """
    testModList = []
    modulesInNgamsTest = pkg_resources.resource_listdir(__name__, ".")
    fileList = [f for f in modulesInNgamsTest if f.endswith("Test.py")]
    fileList.sort()
    # Modules listed here are temporarily disabled and skipped below.
    supprTests = []
    for fname in fileList:
        testMod = os.path.basename(fname).split(".")[0]
        # BUG FIX: membership used to be tested with list.index() inside a
        # bare try/except, which also silently swallowed unrelated errors;
        # a plain 'in' test is equivalent and safe. Also converted the
        # Python 2 print statement to the print() function.
        if testMod in supprTests:
            print("===> NOTE: Test Suite: %-32s disabled - RE-ENABLE!" %
                  testMod)
            continue
        # The test runner module itself is not a test suite.
        if fname.find("ngamsTest.py") == -1:
            testModList.append(testMod)
    return testModList
def setup_output_path(self):
    """
    Called on task startup to copy all static resources into the output
    path (and to make sure the output path exists as a directory).
    """
    self.logger.info('setting up output path')
    # exist_ok replaces the old try/except FileExistsError dance.
    self.output_path.mkdir(exist_ok=True)
    (self.output_path / 'simple').mkdir(exist_ok=True)
    for filename in resource_listdir(__name__, 'static'):
        # BUG FIX: the resource stream used to be closed manually, so it
        # leaked if the read or write raised; context managers close both
        # handles on any exit path.
        with (self.output_path / filename).open('wb') as f:
            with resource_stream(__name__, 'static/' + filename) as source:
                f.write(source.read())
def get_model_list(package_name="gluon", model_dir="models"):
    """Return the model names found under *model_dir*, excluding 'base'."""
    entries = pkg_resources.resource_listdir(package_name, model_dir)
    return [entry for entry in entries if entry != 'base']
def show_wordlists():
    """Print the names of the builtin word lists on one sorted line."""
    click.echo(_('builtin word lists:'))
    names = pkg_resources.resource_listdir(__name__, 'wordlists')
    # Entries containing a dot are files; only bare names are word lists.
    choices = sorted(name for name in names if '.' not in name)
    click.echo(' '.join(choices))
def find_components(package, base_class):
    """Find components which are subclass of a given base class.

    Imports every non-hidden ``*.py`` module in *package* and yields the
    matches scan_module() finds in modules that declare ``__all__``.
    """
    for filename in resource_listdir(package, ''):
        stem, ext = os.path.splitext(filename)
        if ext != '.py' or stem.startswith('.'):
            continue
        module_name = "{}.{}".format(package, stem)
        __import__(module_name, fromlist='*')
        module = sys.modules[module_name]
        # Only modules with an explicit public API are scanned.
        if hasattr(module, '__all__'):
            yield from scan_module(module, base_class)
def _get_configspec():
"""Found and read all the configuration specifications"""
files = sorted(pkg_resources.resource_listdir(__name__, ""))
specfiles = [fn for fn in files if fn.endswith(".conf.spec")]
if os.environ.get("DEBUG_FG21SIM"):
print("DEBUG: Found config specifications: %s" % ", ".join(specfiles),
file=sys.stderr)
# NOTE:
# `resource_string()` returns the resource in *binary/bytes* string
configspec = "\n".join([
pkg_resources.resource_string(__name__, fn).decode("utf-8")
for fn in specfiles
]).split("\n")
return configspec
def resource_walk(package_or_requirement, resource_name):
    """Generate the file names in a resource tree, os.walk-style.

    Parameters
    ----------
    package_or_requirement : str or Requirement
        Package or requirement that contains the resource.
    resource_name : str
        Name of the resource at the root of the walk.

    Yields
    ------
    tuple
        A 3-tuple ``(dirpath, dirnames, filenames)`` per directory, where
        *dirpath* starts with *resource_name*, *dirnames* lists the
        subdirectories of *dirpath* and *filenames* its plain files.
    """
    pending = [resource_name]
    while pending:
        # LIFO order: subdirectories are visited depth-first-ish, matching
        # the original stack-based traversal.
        dirpath = pending.pop()
        dirnames, filenames = [], []
        for entry in resource_listdir(package_or_requirement, dirpath):
            full = os.path.join(dirpath, entry)
            if resource_isdir(package_or_requirement, full):
                dirnames.append(entry)
                pending.append(full)
            else:
                filenames.append(entry)
        yield dirpath, dirnames, filenames
def send_extra(self):
    """
    Sends any extra JS/CSS files placed in Gate One's 'static/extra'
    directory. Can be useful if you want to use Gate One's file
    synchronization and caching capabilities in your app.

    .. note::

        You may have to create the 'static/extra' directory before putting
        files in there.
    """
    extra_path = os.path.join(getsettings('BASE_DIR'), 'static/extra')
    if not os.path.exists(extra_path):
        return # Nothing to do
    for entry in os.listdir(extra_path):
        filepath = os.path.join(extra_path, entry)
        # Dispatch by extension; anything else in the directory is ignored.
        if filepath.endswith('.js'):
            self.send_js(filepath, force=True)
        elif filepath.endswith('.css'):
            self.send_css(filepath, force=True)
def enumerate_themes(self):
    """
    Returns a JSON-encoded object containing the installed themes and text
    color schemes.
    """
    theme_dir = os.path.join(getsettings('BASE_DIR'), 'templates/themes')
    # Only *.css files are themes (other junk may live in the directory);
    # report them without the extension.
    names = [entry.replace('.css', '') for entry in os.listdir(theme_dir)
             if entry.endswith('.css')]
    self.write_message({'go:themes_list': {'themes': names}})
def send_client_files(self):
    """
    Sends the client our standard CSS and JS.
    """
    # Render and send the client our terminal.css
    terminal_css = resource_filename(
        'applications', '/templates/terminal.css')
    self.render_and_send_css(terminal_css, element_id="terminal.css")
    # Send each bundled JavaScript file in sorted order, wiring up the
    # load-order dependencies: terminal.js needs the CSS, terminal_input.js
    # needs terminal.js, and everything else needs terminal_input.js.
    special_requires = {
        'terminal.js': ["terminal.css"],
        'terminal_input.js': "terminal.js",
    }
    for fname in sorted(resource_listdir('applications', '/static/')):
        if not fname.endswith('.js'):
            continue
        js_file_path = resource_filename(
            'applications', '/static/%s' % fname)
        requires = special_requires.get(fname, 'terminal_input.js')
        self.ws.send_js(js_file_path, requires=requires)
    self.ws.send_plugin_static_files(
        'go_terminal_plugins',
        requires=["terminal_input.js"])
    # Send the client the 256-color style information and our printing CSS
    self.send_256_colors()
    self.send_print_stylesheet()
def enumerate_fonts(self):
    """
    Returns a JSON-encoded object containing the installed fonts.
    """
    from applications.woff_info import woff_info
    font_families = []
    for fname in resource_listdir('applications', '/static/fonts'):
        if not fname.endswith('.woff'):
            continue
        font_path = resource_filename(
            'applications', '/static/fonts/%s' % fname)
        info = woff_info(font_path)
        if "Font Family" not in info:
            # Bad font: its name table lacks a family entry; skip it.
            self.ws.logger.error(_(
                "Bad font in fonts dir (missing Font Family in name "
                "table): %s" % fname))
            continue
        # De-duplicate while preserving discovery order.
        if info["Font Family"] not in font_families:
            font_families.append(info["Font Family"])
    self.write_message({'terminal:fonts_list': {'fonts': font_families}})
#@require(policies('terminal'))
def enumerate_colors(self):
    """
    Returns a JSON-encoded object containing the installed text color
    schemes.
    """
    entries = resource_listdir('applications', '/templates/term_colors')
    # Keep only *.css entries and strip the extension for the client.
    schemes = [entry.replace('.css', '')
               for entry in entries if entry.endswith('.css')]
    self.write_message({'terminal:colors_list': {'colors': schemes}})
def get_genome_size_file(genome):
    # type: (str) -> str
    """Return the path of the bundled chromsizes file for *genome*.

    The lookup is case-insensitive. No try/except here, because get_egs
    would already have failed if the genome did not exist.
    """
    available = pkg_resources.resource_listdir("epic", "scripts/chromsizes")
    by_lower = {name.lower().replace(".chromsizes", ""): name
                for name in available}
    exact = by_lower[genome.lower()]
    return pkg_resources.resource_filename(
        "epic", "scripts/chromsizes/{}".format(exact))
def get_effective_genome_length(genome, read_length):
    # type: (str, int) -> float
    """Return the effective genome size (bp) for *genome*/*read_length*.

    Reads the effective genome fraction (EGF) from the packaged
    scripts/effective_sizes data and multiplies it by the genome length
    derived from the chromsizes data.

    Raises:
        KeyError: if *genome* has no bundled effective-size data (the
        available genomes are logged first).
    """
    genome_names = pkg_resources.resource_listdir("epic",
                                                  "scripts/effective_sizes")
    name_dict = {n.split("_")[0]: "".join(n.split("_")[:-1])
                 for n in genome_names}
    try:
        genome_exact = name_dict[genome.lower()]
        egf = pkg_resources.resource_string(  # type: ignore
            "epic", "scripts/effective_sizes/{}_{}.txt".format(
                genome_exact, read_length)).split()[-1].decode()
    except KeyError:
        genome_list = "\n".join(list(name_dict.keys()))
        logging.error(
            "Genome " + genome +
            " not found.\n These are the available genomes: " + genome_list +
            "\nIf yours is not there, please request it at github.com/endrebak/epic .")
        # BUG FIX: execution previously fell through after logging and
        # crashed with a NameError on the undefined 'egf'; re-raise the
        # lookup failure instead.
        raise
    genome_length = sum(create_genome_size_dict(genome).values())
    logging.info("Using an effective genome fraction of {}.".format(egf))
    assert float(
        egf) < 1, "Something wrong happened, effective genome fraction over 1!"
    egs = float(egf) * genome_length
    return egs
def _makeReference(output_path, type_suffix):
    """Collect every per-locus FASTA reference and write them to one file.

    Raises MissingReferenceException when a locus directory lacks its
    expected ``<locus>_<type_suffix>.fasta`` file.
    """
    records = []
    for locus in pkg.resource_listdir(_REF_DIR, ''):
        # Only subdirectories represent loci; plain files are skipped.
        if not pkg.resource_isdir(_REF_DIR, locus):
            continue
        expected_file = "{0}_{1}.fasta".format(locus, type_suffix)
        expected_path = op.join(_REF_PATH, locus, expected_file)
        if not op.exists(expected_path):
            raise MissingReferenceException('Missing expected reference file "{0}" for Locus "{1}"'.format(expected_file, locus))
        records += _readFasta(expected_path)
    _writeFasta(output_path, records)
    return True
def makeExonReference():
    """Build the per-locus exon reference map file and write it out.

    For each locus directory an existing ``<locus>_exons.map`` is used
    directly; otherwise one is generated. Raises MissingReferenceException
    if neither is possible.
    """
    data = {}
    for locus in pkg.resource_listdir(_REF_DIR, ''):
        # Only subdirectories represent loci; plain files are skipped.
        if not pkg.resource_isdir(_REF_DIR, locus):
            continue
        expected_file = "{0}_exons.map".format(locus)
        expected_path = op.join(_REF_PATH, locus, expected_file)
        # Short-circuit: reuse an existing map, else try to generate one.
        if op.exists(expected_path) or _make_exon_map(expected_path, locus):
            data[locus] = expected_path
        else:
            raise MissingReferenceException('Missing expected reference file "{0}" for Locus "{1}"'.format(expected_file, locus))
    _writeMap(_EXON_REF, data)
    return True
def _get_global_builders():
    """Find builders defined globally.

    Returns a dict mapping each entry under pkgpanda's docker/ resource
    directory to its filesystem path.
    """
    names = pkg_resources.resource_listdir('pkgpanda', 'docker/')
    return {name: pkg_resources.resource_filename('pkgpanda',
                                                  'docker/' + name)
            for name in names}
def __load_all_items():
    """Load every bundled item file under data/items into a dict.

    Returns a mapping of resource path -> tuple of parsed items, logging
    per-file and total counts along the way.
    """
    items = {}
    for fname in pkg_resources.resource_listdir(__name__, 'data/items'):
        fullname = os.path.join('data/items/', fname)
        Logger.info("Loading items from {}", fullname)
        stream = pkg_resources.resource_stream(__name__, fullname)
        loaded = tuple(_load_items(stream))
        items[fullname] = loaded
        Logger.info("Loaded {} items", len(loaded))
    Logger.info("Total items: {}", sum(len(v) for v in items.values()))
    return items
def recursive_copy(origin, destiny):
    """Copy directory from resource to destiny folder"""
    if not resource_isdir(__name__, origin):
        # Leaf resource: dump its raw bytes to the destination file.
        with open(destiny, "wb") as out:
            out.write(resource(origin))
        return
    if not exists(destiny):
        os.makedirs(destiny)
    for element in resource_listdir(__name__, origin):
        recursive_copy(join(origin, element), join(destiny, element))
def get_all_test_case_names():
    """Return bundled test input files: *.txt, excluding *.expected.txt."""
    return [name for name in pkg_resources.resource_listdir("tbget", "tests")
            if name.endswith(".txt") and not name.endswith(".expected.txt")]
def get_languages_supported_by_all(cls, root_egg):
    """Return the locale codes supported by every egg relevant to *root_egg*.

    Scans the 'reahl.translations' entry points for compiled .mo catalogues,
    collects per-domain sets of languages, and returns the intersection over
    all domains in use; 'en_gb' is always added to the result.
    """
    egg_interfaces = cls.get_all_relevant_interfaces(root_egg)
    default_languages = ['en_gb']  # always-supported fallback
    if not egg_interfaces:
        return default_languages
    # Translation domains we care about: one per relevant egg.
    domains_in_use = [e.name for e in egg_interfaces]
    languages_for_eggs = {}  # domain -> set of language codes seen for it
    for translation_entry_point in iter_entry_points('reahl.translations'):
        requirement = translation_entry_point.dist.as_requirement()
        egg_internal_path = cls.get_egg_internal_path_for(translation_entry_point)
        if resource_isdir(requirement, egg_internal_path):
            # Each non-dunder subdirectory is taken to be a language code.
            languages = [d for d in resource_listdir(requirement, egg_internal_path)
                         if (resource_isdir(requirement, '%s/%s' % (egg_internal_path, d)) and not d.startswith('__'))]
        else:
            logging.error('Translations of %s not found in %s' % (requirement, egg_internal_path))
            languages = []
        for language in languages:
            language_path = '%s/%s/LC_MESSAGES' % (egg_internal_path, language)
            # Catalogue domains are the .mo file names without extension.
            domains = [d[:-3] for d in resource_listdir(requirement, language_path) if d.endswith('.mo')]
            for domain in domains:
                if domain in domains_in_use:
                    # NOTE(review): this rebinds 'languages' (the list being
                    # iterated) to the per-domain set. Iteration continues
                    # over the original list object, so behavior is intact,
                    # but the shadowing looks accidental -- confirm intent.
                    languages = languages_for_eggs.setdefault(domain, set())
                    languages.add(language)
    if not languages_for_eggs.values():
        return default_languages
    # Keep only languages common to every domain, then add the defaults.
    languages = (list(languages_for_eggs.values()))[0].intersection(*languages_for_eggs.values())
    languages.update(default_languages)
    return list(languages)
def _install(package, src_dir, dst_dir, params, prefix_len=None, rec=None):
    """Interpolate source directory into target directory with params."""
    package_name = package.__name__
    if prefix_len is None:
        # Strip the top-level src_dir (plus its slash) from destination paths.
        prefix_len = len(src_dir) + 1
    for item in pkg_resources.resource_listdir(package_name, src_dir):
        resource_path = '/'.join([src_dir, item])
        dst_path = os.path.join(dst_dir, resource_path[prefix_len:])
        if pkg_resources.resource_isdir(package_name, resource_path):
            fs.mkdir_safe(dst_path)
            if rec:
                rec.write('%s/\n' % dst_path)
            # Recurse with the original prefix_len so nested destination
            # paths stay relative to the top-level src_dir.
            _install(package,
                     os.path.join(src_dir, item),
                     dst_dir,
                     params,
                     prefix_len=prefix_len,
                     rec=rec)
        else:
            if resource_path.endswith('.swp'):
                continue  # skip editor swap files
            _LOGGER.info('Render: %s => %s', resource_path, dst_path)
            resource_str = pkg_resources.resource_string(package_name,
                                                         resource_path)
            if rec:
                rec.write('%s\n' % dst_path)
            _update(dst_path, _render(resource_str.decode('utf-8'), params))
def main(target_file, include_dir):
    """Console script for phriky_units"""
    # BUG FIX: use the print() function so this also runs on Python 3
    # (the original used the Python 2 print statement).
    print(pkg_resources.resource_listdir('phriky_units.resources', ''))
    with open('delete_me.txt', 'w') as fp:
        # BUG FIX: resource_string() returns bytes on Python 3; decode
        # before writing to a text-mode file.
        fp.write(resource_string('phriky_units.resources.cppcheck',
                                 'std.cfg').decode('utf-8'))
def get_script(version):
    """
    Generate the script to get the database from *version* (the result of
    :func:`detect_version`) to the current version of the software. If
    *version* is ``None``, this is simply the contents of the
    :file:`sql/create_piwheels.sql` script. Otherwise, it is a concatenation
    of various update scripts.
    """
    if version is None:
        return resource_string(__name__, 'sql/create_piwheels.sql').decode('utf-8')
    # Map each upgradable version to (next version, script filename), built
    # from the update_piwheels_<from>_to_<to>.sql files in the sql/ directory.
    ver_regex = re.compile(r'update_piwheels_(?P<from>.*)_to_(?P<to>.*)\.sql$')
    upgrades = {}
    for filename in resource_listdir(__name__, 'sql'):
        match = ver_regex.match(filename)
        if match is not None:
            upgrades[match.group('from')] = (match.group('to'), filename)
    # Walk the chain from *version* up to __version__. NOTE: this is a
    # simple linear walk -- no branching or back-tracking -- so custom
    # versions or downgrade scripts in sql/ will probably break it.
    scripts = []
    this_version = version
    try:
        while this_version != __version__:
            this_version, filename = upgrades[this_version]
            scripts.append(resource_string(__name__, 'sql/' + filename))
    except KeyError:
        raise RuntimeError("Unable to find upgrade path from %s to %s" % (
            version, __version__))
    return ''.join(chunk.decode('utf-8') for chunk in scripts)
def _get_settings(self):
    """Return the registration settings files, honouring user overrides."""
    if isdefined(self.inputs.settings):
        NIWORKFLOWS_LOG.info('User-defined settings, overriding defaults')
        return self.inputs.settings
    # Default: every packaged JSON whose name matches the moving/flavor pair.
    prefix = '{}-mni_registration_{}_'.format(
        self.inputs.moving.lower(), self.inputs.flavor)
    candidates = sorted(
        name for name in pkgr.resource_listdir('niworkflows', 'data')
        if name.startswith(prefix) and name.endswith('.json'))
    return [pkgr.resource_filename('niworkflows.data', name)
            for name in candidates]
def load_package(self, pkg):
    """Collect the migrations bundled in *pkg* and store them by version.

    Scans the package's ``migrations`` resource directory for files whose
    stem matches MIGRATION_FILE_PATTERN and stores the resulting Migration
    objects on ``self.migrations``, sorted by version number.
    """
    found = []
    for resource_name in pkg_resources.resource_listdir(pkg, "migrations"):
        stem, _ = os.path.splitext(resource_name)
        match = MIGRATION_FILE_PATTERN.match(stem)
        if not match:
            continue
        # Defer opening the stream until the migration is actually applied.
        opener = functools.partial(pkg_resources.resource_stream, pkg,
                                   "migrations/" + resource_name)
        version = Version(int(match.group("version")), match.group("name"))
        found.append(Migration(version, opener))
    self.migrations = sorted(found, key=lambda m: m.version.version)
def _find_templates() -> Dict[str, str]:
    """
    Find all templates and return a map from short name to full name
    """
    lookup = OrderedDict()  # type: Dict[str, str]
    # Sort so the mapping is populated in a deterministic order.
    for filename in sorted(pkg_resources.resource_listdir("etl", "templates")):
        short_name = os.path.splitext(filename)[0]
        lookup[short_name] = os.path.join("templates", filename)
    return lookup
def parameters_from_yaml(name, input_key=None, expected_key=None):
    """Load test parameter tuples from the YAML resource(s) named *name*.

    *name* is '<package>.<resource>'; the resource may be a directory of
    .yml/.yaml files or a single file (the extension is added here).
    When *input_key* and *expected_key* are both given, each file yields one
    (expected, input) pair; otherwise the top-level mapping of each file is
    flattened into (key, expected, properties) or (key, properties) tuples.

    Raises:
        RuntimeError: if no matching YAML resource can be found.
    """
    package_name, resource_name = name.split('.', 1)
    resources = []
    if resource_isdir(package_name, resource_name):
        resources.extend([resource_name + '/' + r
                          for r in resource_listdir(package_name, resource_name) if r.endswith(('.yml', '.yaml'))])
    elif resource_exists(package_name, resource_name + '.yml'):
        resources.append(resource_name + '.yml')
    elif resource_exists(package_name, resource_name + '.yaml'):
        resources.append(resource_name + '.yaml')
    if not resources:
        raise RuntimeError('Not able to load any yaml file for {0}'.format(name))
    parameters = []
    for resource_name in resources:  # NOTE: shadows the outer resource_name
        with resource_stream(package_name, resource_name) as stream:
            # NOTE(review): serializer.YAMLLoader presumably registers
            # project-specific YAML tags -- confirm against the serializer
            # module.
            data = yaml.load(stream, Loader=serializer.YAMLLoader)
        if input_key and expected_key:
            parameters.append((data[expected_key], data[input_key]))
            continue
        for root_key, root_value in data.items():
            if isinstance(root_value, Mapping):
                for expected, data_input in root_value.items():
                    # Each input may be a single item or a list of them.
                    for properties in data_input if isinstance(data_input, (tuple, list)) else [data_input]:
                        parameters.append((root_key, expected, properties))
            else:
                for properties in root_value if isinstance(root_value, (tuple, list)) else [root_value]:
                    parameters.append((root_key, properties))
    return parameters