def _get_translation_directories(self, enabled_plugins):
    """Collect the translation directories to load.

    The list always starts with the relative ``'translations'`` directory.
    For every entry point of the ``wazo_admin_ui.plugins`` group whose name
    is in *enabled_plugins*, the filesystem path of the plugin module's
    ``TRANSLATION_DIRECTORY`` resource is appended, provided that resource
    is a directory.

    :param enabled_plugins: container of enabled plugin names (only
        membership tests are performed on it).
    :returns: list of translation directory paths.
    """
    directories = ['translations']
    for plugin in iter_entry_points(group='wazo_admin_ui.plugins'):
        # Plugins that are installed but not enabled contribute nothing.
        if plugin.name not in enabled_plugins:
            continue
        # A plugin without a translation directory is silently skipped.
        if not resource_isdir(plugin.module_name, TRANSLATION_DIRECTORY):
            continue
        directories.append(
            resource_filename(plugin.module_name, TRANSLATION_DIRECTORY))
    return directories
# Example source code for the Python function resource_isdir()
def resource_walk(package_or_requirement, resource_name):
    """Generate the file names in a resource tree.

    Parameters
    ----------
    package_or_requirement : str or Requirement
        Package or requirement that contains the resource.
    resource_name : str
        Name of the resource.

    Yields
    ------
    tuple
        For each directory in the tree rooted at the given resource a 3-tuple
        ``(dirpath, dirnames, filenames)`` is yielded. *dirpath* is a string,
        the path to the directory starting with *resource_name*. *dirnames* is
        a list of the names of subdirectories in *dirpath*. *filenames* is a
        list of names of non-directory files in *dirpath*.

    Notes
    -----
    Directories are taken from the end of the pending list, so the most
    recently discovered subdirectory is visited next.
    """
    # Resource names are '/'-separated regardless of the host OS, so they
    # must not be built with os.path.join (which uses '\\' on Windows).
    import posixpath

    pending = [resource_name]
    while pending:
        dirpath = pending.pop()
        dirnames = []
        filenames = []
        for name in resource_listdir(package_or_requirement, dirpath):
            fullpath = posixpath.join(dirpath, name)
            if resource_isdir(package_or_requirement, fullpath):
                dirnames.append(name)
                pending.append(fullpath)
            else:
                filenames.append(name)
        yield dirpath, dirnames, filenames
def _makeReference( output_path, type_suffix ):
    """Merge the per-locus FASTA references of one type into a single file.

    Every directory resource directly under ``_REF_DIR`` is treated as a
    locus and must contain a file named ``<locus>_<type_suffix>.fasta``
    under ``_REF_PATH``. The records read from those files are accumulated
    and handed to ``_writeFasta`` together with *output_path*.

    :param output_path: destination passed through to ``_writeFasta``.
    :param type_suffix: suffix used to build each expected file name.
    :returns: ``True`` once the combined reference has been written.
    :raises MissingReferenceException: if a locus directory lacks its
        expected FASTA file.
    """
    records = []
    for locus in pkg.resource_listdir(_REF_DIR, ''):
        # Plain files at the top level are not loci.
        if not pkg.resource_isdir(_REF_DIR, locus ):
            continue
        fasta_name = "{0}_{1}.fasta".format(locus, type_suffix)
        fasta_path = op.join(_REF_PATH, locus, fasta_name)
        if not op.exists( fasta_path ):
            raise MissingReferenceException('Missing expected reference file "{0}" for Locus "{1}"'.format(fasta_name, locus))
        records.extend( _readFasta( fasta_path ) )
    _writeFasta( output_path, records )
    return True
def makeExonReference():
    """Build the locus -> exon-map index and write it to ``_EXON_REF``.

    Every directory resource directly under ``_REF_DIR`` is treated as a
    locus whose exon map is expected at
    ``_REF_PATH/<locus>/<locus>_exons.map``. If that file does not exist,
    ``_make_exon_map`` is asked to produce it; a falsy result from that
    call is reported as a missing reference.

    :returns: ``True`` once the index has been passed to ``_writeMap``.
    :raises MissingReferenceException: if a locus has no exon map and
        ``_make_exon_map`` returns a falsy value for it.
    """
    exon_maps = {}
    for locus in pkg.resource_listdir(_REF_DIR, ''):
        if not pkg.resource_isdir(_REF_DIR, locus ):
            continue
        map_name = "{0}_exons.map".format(locus)
        map_path = op.join(_REF_PATH, locus, map_name)
        # Short-circuit: generation is attempted only when no file exists.
        available = op.exists( map_path ) or _make_exon_map( map_path, locus )
        if not available:
            raise MissingReferenceException('Missing expected reference file "{0}" for Locus "{1}"'.format(map_name, locus))
        exon_maps[locus] = map_path
    _writeMap( _EXON_REF, exon_maps )
    return True
def recursive_copy(origin, destiny):
    """Copy a resource of this module's package to the filesystem.

    When *origin* is a resource directory, *destiny* is created if needed
    and every entry is copied into it recursively. Otherwise *destiny* is
    opened for binary writing and filled with the value of
    ``resource(origin)`` (presumably the resource's bytes -- confirm
    against the ``resource`` helper).

    :param origin: resource name, resolved relative to ``__name__``.
    :param destiny: filesystem path to create or overwrite.
    """
    if not resource_isdir(__name__, origin):
        # Leaf: (over)write a single file.
        with open(destiny, "wb") as target:
            target.write(resource(origin))
        return

    if not exists(destiny):
        os.makedirs(destiny)
    for child in resource_listdir(__name__, origin):
        recursive_copy(join(origin, child), join(destiny, child))
def get_languages_supported_by_all(cls, root_egg):
    """Return the languages for which every domain in use has a translation.

    The domains in use are the names of the interfaces returned by
    ``cls.get_all_relevant_interfaces(root_egg)``. Each
    ``reahl.translations`` entry point is scanned for language directories
    (names not starting with ``'__'``) and, inside each one, for ``.mo``
    files under ``LC_MESSAGES``; the file name without ``.mo`` is the
    domain. The result is the intersection of the language sets of all
    domains found, with ``'en_gb'`` always added.

    :param root_egg: passed through to ``cls.get_all_relevant_interfaces``.
    :returns: list of language names; ``['en_gb']`` when there are no
        relevant interfaces or no translations for any domain in use.
    """
    fallback = ['en_gb']
    interfaces = cls.get_all_relevant_interfaces(root_egg)
    if not interfaces:
        return fallback

    wanted_domains = [interface.name for interface in interfaces]
    languages_by_domain = {}

    for entry_point in iter_entry_points('reahl.translations'):
        requirement = entry_point.dist.as_requirement()
        base_path = cls.get_egg_internal_path_for(entry_point)

        if not resource_isdir(requirement, base_path):
            # A missing translations directory is logged, not fatal.
            logging.error('Translations of %s not found in %s' % (requirement, base_path))
            available = []
        else:
            available = [entry for entry in resource_listdir(requirement, base_path)
                         if (resource_isdir(requirement, '%s/%s' % (base_path, entry))
                             and not entry.startswith('__'))]

        for language in available:
            messages_path = '%s/%s/LC_MESSAGES' % (base_path, language)
            for file_name in resource_listdir(requirement, messages_path):
                if not file_name.endswith('.mo'):
                    continue
                domain = file_name[:-3]
                if domain in wanted_domains:
                    languages_by_domain.setdefault(domain, set()).add(language)

    if not languages_by_domain:
        return fallback

    language_sets = list(languages_by_domain.values())
    # intersection() returns a new set, so the update below does not
    # touch the per-domain sets.
    common = language_sets[0].intersection(*language_sets)
    common.update(fallback)
    return list(common)
def _install(package, src_dir, dst_dir, params, prefix_len=None, rec=None):
    """Interpolate source directory into target directory with params.

    Walks the resource directory *src_dir* of *package* recursively. Each
    subdirectory is created under *dst_dir* with ``fs.mkdir_safe``; each
    file is read, decoded as UTF-8, passed through ``_render`` with
    *params* and handed to ``_update`` with the destination path. Files
    whose name ends in ``.swp`` are skipped.

    :param package: module object; its ``__name__`` locates the resources.
    :param src_dir: '/'-separated resource directory to install from.
    :param dst_dir: filesystem directory to install into.
    :param params: passed through to ``_render``.
    :param prefix_len: number of leading characters stripped from each
        resource path to obtain the path relative to *dst_dir*; computed
        from the top-level *src_dir* on the first call and then passed
        unchanged to recursive calls.
    :param rec: optional object with a ``write`` method; receives one line
        per created directory (with trailing ``/``) and per written file.
    """
    package_name = package.__name__
    contents = pkg_resources.resource_listdir(package_name, src_dir)

    if prefix_len is None:
        prefix_len = len(src_dir) + 1

    for item in contents:
        # Resource names are always '/'-separated, never os.sep.
        resource_path = '/'.join([src_dir, item])
        dst_path = os.path.join(dst_dir, resource_path[prefix_len:])

        if pkg_resources.resource_isdir(package_name, resource_path):
            fs.mkdir_safe(dst_path)
            if rec:
                rec.write('%s/\n' % dst_path)

            # Recurse with the same '/'-joined name used above so that
            # prefix_len keeps lining up with the resource path and the
            # name stays a valid resource name on every platform.
            _install(package,
                     resource_path,
                     dst_dir,
                     params,
                     prefix_len=prefix_len,
                     rec=rec)
            continue

        if resource_path.endswith('.swp'):
            continue

        _LOGGER.info('Render: %s => %s', resource_path, dst_path)
        resource_str = pkg_resources.resource_string(package_name,
                                                     resource_path)
        if rec:
            rec.write('%s\n' % dst_path)
        _update(dst_path, _render(resource_str.decode('utf-8'), params))
def parameters_from_yaml(name, input_key=None, expected_key=None):
    """Build a list of parameter tuples from packaged YAML resources.

    *name* is split at its first ``'.'`` into a package name and a resource
    name. If the resource is a directory, every ``.yml``/``.yaml`` entry in
    it is loaded; otherwise ``<resource>.yml`` is tried, then
    ``<resource>.yaml``.

    When both *input_key* and *expected_key* are truthy, each document
    contributes one ``(data[expected_key], data[input_key])`` tuple.
    Otherwise each top-level key contributes, per value:

    * ``(root_key, expected, properties)`` when the value is a mapping,
      once per item of each ``expected`` entry;
    * ``(root_key, properties)`` otherwise.

    A value that is not a tuple or list is treated as a one-element list.

    :raises RuntimeError: if no YAML resource is found for *name*.
    :returns: list of tuples as described above.
    """
    def _as_items(value):
        # Scalars are wrapped so callers can always iterate.
        return value if isinstance(value, (tuple, list)) else [value]

    package_name, resource_name = name.split('.', 1)

    if resource_isdir(package_name, resource_name):
        resources = [resource_name + '/' + entry
                     for entry in resource_listdir(package_name, resource_name)
                     if entry.endswith(('.yml', '.yaml'))]
    else:
        resources = []
        # First matching extension wins, '.yml' before '.yaml'.
        for extension in ('.yml', '.yaml'):
            if resource_exists(package_name, resource_name + extension):
                resources.append(resource_name + extension)
                break

    if not resources:
        raise RuntimeError('Not able to load any yaml file for {0}'.format(name))

    parameters = []
    for yaml_resource in resources:
        with resource_stream(package_name, yaml_resource) as stream:
            data = yaml.load(stream, Loader=serializer.YAMLLoader)

        if input_key and expected_key:
            parameters.append((data[expected_key], data[input_key]))
        else:
            for root_key, root_value in data.items():
                if isinstance(root_value, Mapping):
                    for expected, data_input in root_value.items():
                        for properties in _as_items(data_input):
                            parameters.append((root_key, expected, properties))
                else:
                    for properties in _as_items(root_value):
                        parameters.append((root_key, properties))

    return parameters
def copy_resource_dir(src, dest):
    """
    To copy package data directory to destination

    The base name of *src* is appended to *dest*, so the copied entry ends
    up inside *dest*. Directories are created as needed and walked
    recursively. A file is handed to ``copy_resource_file`` only if the
    destination file does not already exist and the source extension is
    not ``.pyc``.

    :param src: '/'-separated resource name inside the ``mocha`` package.
    :param dest: filesystem directory to copy into.
    """
    package_name = "mocha"
    dest = (dest + "/" + os.path.basename(src)).rstrip("/")
    if pkg_resources.resource_isdir(package_name, src):
        if not os.path.isdir(dest):
            os.makedirs(dest)
        # List the directory from the same package that resource_isdir
        # checked; using __name__ here could resolve a different location.
        for res in pkg_resources.resource_listdir(package_name, src):
            copy_resource_dir(src + "/" + res, dest)
    elif not os.path.isfile(dest) and os.path.splitext(src)[1] not in [".pyc"]:
        copy_resource_file(src, dest)
def copy_dir(source, dest, variables, out_=sys.stdout, i=0):
    """
    Copies the ``source`` directory to the ``dest`` directory, where
    ``source`` is some tuple representing an installed package and a
    subdirectory in the package, e.g.,

    ('pecan', os.path.join('scaffolds', 'base'))
    ('pecan_extension', os.path.join('scaffolds', 'scaffold_name'))

    ``variables``: A dictionary of variables to use in any substitutions.
    Substitution is performed via ``string.Template``.

    ``out_``: File object to write to (default is sys.stdout).

    ``i``: Nesting depth; each progress message is indented by two spaces
    per level.

    If ``dest`` already exists, a message is written and nothing is
    copied. Entries whose destination name ends in ``_tmpl`` have that
    suffix removed and their content passed through ``render_template``;
    a ``None`` result from rendering skips the file.
    """
    def out(msg):
        out_.write('%s%s' % (' ' * (i * 2), msg))
        out_.write('\n')
        out_.flush()

    names = sorted(pkg_resources.resource_listdir(source[0], source[1]))
    if os.path.exists(dest):
        out('%s already exists' % dest)
        return
    out('Creating %s' % dest)
    makedirs(dest)

    for name in names:
        # Resource names are '/'-separated; destination paths use os.path.
        full = '/'.join([source[1], name])
        dest_full = os.path.join(dest, substitute_filename(name, variables))

        sub_file = False
        if dest_full.endswith('_tmpl'):
            dest_full = dest_full[:-5]
            sub_file = True

        if pkg_resources.resource_isdir(source[0], full):
            out('Recursing into %s' % os.path.basename(full))
            copy_dir((source[0], full), dest_full, variables, out_, i + 1)
            continue

        content = pkg_resources.resource_string(source[0], full)
        if sub_file:
            content = render_template(content, variables)
            if content is None:
                continue  # pragma: no cover

        out('Copying %s to %s' % (full, dest_full))

        # The context manager closes the file even if the write fails.
        with open(dest_full, 'wb') as f:
            f.write(content)