def load_config_spec(config_spec, config_sections, repl_vars, language):
config_split = config_spec.strip().split(':')
config_path = config_split[0]
if len(config_split) > 1:
config_sections = config_split[1].split('|')
with open(config_path) as config_file:
all_config_data = yaml.load(config_file, Loader=yaml.Loader)
# Make a list of the appropriate configuration sections (just the ones
# we are actually using) from the YAML file.
segments = [all_config_data[i] for i in config_sections]
segments.append(all_config_data.get(language, {}))
# Merge all of the segments of data into a single config dictionary.
config = merge(*segments)
# Perform final replacements.
return replace_vars(config, repl_vars)
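# Hedged usage sketch (not from the original source): load_config_spec
# expects a spec of the form "path[:section1|section2]"; sections given in
# the spec override the config_sections argument. The filename below is
# hypothetical, and merge()/replace_vars() are assumed to be the helper
# functions defined alongside this function.
#
#   config = load_config_spec(
#       'defaults.yml:common|dev',
#       config_sections=['common'],          # overridden by the spec
#       repl_vars={'HOME': '/home/user'},
#       language='python',                   # merged last, if present
#   )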
def generate_cwl_documentation(_):
cur_dir = os.path.abspath(os.path.dirname(__file__))
# find all cwl files
with WorkflowGenerator() as wf:
cwl_files = [step.run for step in wf.steps_library.steps.values()]
# sort alphabetically
cwl_files.sort()
tools_file = os.path.join(cur_dir, 'tools.rst')
tool_template = '\n{}\n{}\n\n{}\n'
with codecs.open(tools_file, 'wb', encoding='utf-8') as f:
f.write('Tools\n=====\n')
f.write('\n``nlppln`` contains the following tools:\n')
for cwl in cwl_files:
tool_name = os.path.basename(cwl)
plusses = '+'*len(tool_name)
with codecs.open(cwl) as c:
try:
cwl_yaml = yaml.load(c, Loader=yaml.RoundTripLoader)
doc = cwl_yaml.get('doc', 'No documentation')
f.write(tool_template.format(tool_name, plusses, doc))
except yaml.YAMLError:
pass
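# For reference, tool_template = '\n{}\n{}\n\n{}\n' renders each tool as an
# RST section: the tool name, a '+' underline of the same length, then the
# CWL 'doc' text. Illustrative output only (the name and doc are made up):
#
#   frog.cwl
#   ++++++++
#
#   Runs frog on a directory of text files.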
def from_yaml():
""" Load configuration from yaml source(s), cached to only run once """
default_yaml_str = snippets.get_snippet_content('hatchery.yml')
ret = yaml.load(default_yaml_str, Loader=yaml.RoundTripLoader)
for config_path in CONFIG_LOCATIONS:
config_path = os.path.expanduser(config_path)
if os.path.isfile(config_path):
with open(config_path) as config_file:
config_dict = yaml.load(config_file, Loader=yaml.RoundTripLoader)
if config_dict is None:
continue
for k, v in config_dict.items():
if k not in ret.keys():
raise ConfigError(
'found garbage key "{}" in {}'.format(k, config_path)
)
ret[k] = v
return ret
def from_parmed(cls, path, *args, **kwargs):
"""
Try to load a file automatically with ParmEd. Not guaranteed to work, but
might be useful if it succeeds.
Arguments
---------
path : str
Path to file that ParmEd can load
"""
    st = parmed.load_file(path, structure=True, *args, **kwargs)
    # Pop these keys so they are not passed a second time to cls(...) below;
    # an explicit keyword argument takes precedence over the attribute
    # parsed from the file.
    box = kwargs.pop('box', getattr(st, 'box', None))
    velocities = kwargs.pop('velocities', getattr(st, 'velocities', None))
    positions = kwargs.pop('positions', getattr(st, 'positions', None))
return cls(master=st, topology=st.topology, positions=positions, box=box,
velocities=velocities, path=path, **kwargs)
def _pickle_load(path):
"""
Loads pickled topology. Careful with Python versions though!
"""
_, ext = os.path.splitext(path)
topology = None
if sys.version_info.major == 2:
if ext == '.pickle2':
with open(path, 'rb') as f:
topology = pickle.load(f)
        elif ext in ('.pickle3', '.pickle'):
            with open(path, 'rb') as f:
                # pickle.load() takes no protocol argument; the protocol
                # is autodetected from the stream. Note that Python 2
                # cannot unpickle protocol-3 data and will raise here.
                topology = pickle.load(f)
elif sys.version_info.major == 3:
if ext == '.pickle2':
with open(path, 'rb') as f:
topology = pickle.load(f)
elif ext in ('.pickle3', '.pickle'):
with open(path, 'rb') as f:
topology = pickle.load(f)
if topology is None:
raise ValueError('File {} is not compatible with this version'.format(path))
return topology
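# Hedged companion sketch (not in the original source): the extension
# convention above implies the files were written with a matching pickle
# protocol. The protocol is chosen at dump time; pickle.load() autodetects
# it from the stream, which is why the loader never needs a protocol
# argument.
import pickle

def _pickle_dump(topology, path):
    """Write `topology` with the protocol implied by the file extension."""
    protocol = 2 if path.endswith('.pickle2') else 3
    with open(path, 'wb') as f:
        pickle.dump(topology, f, protocol=protocol)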
def from_json(cls, json_string=None, filename=None,
encoding="utf-8", errors="strict", **kwargs):
"""
Transform a json object string into a Box object. If the incoming
json is a list, you must use BoxList.from_json.
:param json_string: string to pass to `json.loads`
:param filename: filename to open and pass to `json.load`
:param encoding: File encoding
:param errors: How to handle encoding errors
:param kwargs: parameters to pass to `Box()` or `json.loads`
:return: Box object from json data
"""
bx_args = {}
for arg in kwargs.copy():
if arg in BOX_PARAMETERS:
bx_args[arg] = kwargs.pop(arg)
data = _from_json(json_string, filename=filename,
encoding=encoding, errors=errors, **kwargs)
if not isinstance(data, dict):
raise BoxError('json data not returned as a dictionary, '
'but rather a {0}'.format(type(data).__name__))
return cls(data, **bx_args)
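# Hedged usage sketch: assuming this is python-box's Box.from_json, both
# call styles below should work; Box-specific keyword arguments (e.g.
# frozen_box) are split out via BOX_PARAMETERS and the rest are forwarded
# to json.loads. The filename is hypothetical.
#
#   b = Box.from_json('{"a": {"b": 1}}', frozen_box=True)
#   b = Box.from_json(filename='settings.json', encoding='utf-8')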
def from_yaml(cls, yaml_string=None, filename=None,
encoding="utf-8", errors="strict",
**kwargs):
"""
Transform a yaml object string into a Box object.
:param yaml_string: string to pass to `yaml.load`
:param filename: filename to open and pass to `yaml.load`
:param encoding: File encoding
:param errors: How to handle encoding errors
:param kwargs: parameters to pass to `Box()` or `yaml.load`
:return: Box object from yaml data
"""
bx_args = {}
for arg in kwargs.copy():
if arg in BOX_PARAMETERS:
bx_args[arg] = kwargs.pop(arg)
data = _from_yaml(yaml_string=yaml_string, filename=filename,
encoding=encoding, errors=errors, **kwargs)
if not isinstance(data, dict):
        raise BoxError('yaml data not returned as a dictionary, '
                       'but rather a {0}'.format(type(data).__name__))
return cls(data, **bx_args)
def main():
parser = setup_parser()
options = parser.parse_args()
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.DEBUG, format=log_format)
logging.getLogger('botocore').setLevel(logging.WARNING)
with open(options.config) as fh:
config = yaml.load(fh.read(), Loader=yaml.SafeLoader)
jsonschema.validate(config, CONFIG_SCHEMA)
setup_defaults(config)
tester = MailerTester(
options.MESSAGE_FILE, config, msg_plain=options.plain,
json_dump_file=options.json_dump_file
)
tester.run(options.dry_run, options.print_only)
def _ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
"""
Ordered yaml loader
    Use this instead of yaml.Loader/yaml.SafeLoader to get an OrderedDict.
    :param stream: stream to read from
    :param Loader: yaml loader to use
    :param object_pairs_hook: mapping type to construct (default: OrderedDict)
:return: OrderedDict structure
"""
# usage example: ordered_load(stream, yaml.SafeLoader)
class OrderedLoader(Loader):
pass
def construct_mapping(loader, node):
loader.flatten_mapping(node)
return object_pairs_hook(loader.construct_pairs(node))
OrderedLoader.add_constructor(
yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
construct_mapping)
return yaml.load(stream, OrderedLoader)
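# Usage sketch for _ordered_load, mirroring the example in the docstring;
# key order from the document is preserved in the result:
#
#   import io
#   _ordered_load(io.StringIO('b: 1\na: 2\nc: 3\n'), yaml.SafeLoader)
#   # -> OrderedDict([('b', 1), ('a', 2), ('c', 3)])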
def yaml_load_roundtrip(filename):
"""
    Load contents of a yaml file into a dict structure for editing (using the Roundtrip Loader)
:param filename: name of the yaml file to load
:return: data structure loaded from file
"""
if not EDITING_ENABLED:
return None
y = None
try:
with open(filename+YAML_FILE, 'r') as stream:
sdata = stream.read()
sdata = sdata.replace('\n', '\n\n')
y = yaml.load(sdata, yaml.RoundTripLoader)
except Exception as e:
logger.error("yaml_load_roundtrip: YAML-file load error: '%s'" % (e))
y = {}
return y
def writeBackToFile(filename, itempath, itemattr, value):
"""
write the value of an item's attribute back to the yaml-file
:param filename: name of the yaml-file (without the .yaml extension!)
:param itempath: path of the item to modify
:param itemattr: name of the item's attribute to modify
:param value: new value for the attribute
"""
itemyamlfile = yamlfile(filename)
if os.path.isfile(filename+YAML_FILE):
itemyamlfile.load()
itemyamlfile.setleafvalue(itempath, itemattr, value)
itemyamlfile.save()
# ==================================================================================
# class yamlfile (for editing multiple entries at a time)
#
def yaml_save(filename, data):
"""
    *** Converter Special ***
Save contents of an OrderedDict structure to a yaml file
:param filename: name of the yaml file to save to
:param data: OrderedDict to save
"""
sdata = convert_yaml(data)
print(", saving to '{}'".format(os.path.basename(filename)+'.yaml'))
    if store_raw_output:
with open(filename+'_raw.yaml', 'w') as outfile:
outfile.write( sdata )
# Test if roundtrip gives the same result
data = yaml.load(sdata, yaml.RoundTripLoader)
_yaml_save_roundtrip(filename, data)
def load_settings(default_settings_file, override_settings_files):
yaml = ruamel.yaml.YAML()
yaml.allow_duplicate_keys = False
try:
log("Loading common default settings from: " + DEFAULT_COMMON_SETTINGS_FILE)
        # Use context managers so the settings files are closed promptly.
        with open(DEFAULT_COMMON_SETTINGS_FILE) as f:
            settings = dict(yaml.load(f))
        log("Loading default settings from: " + default_settings_file)
        with open(default_settings_file) as f:
            settings.update(yaml.load(f))
        for settings_fpath in override_settings_files:
            log("Loading settings from: " + settings_fpath)
            with open(settings_fpath) as f:
                settings.update(yaml.load(f))
log("Loaded settings.")
except ruamel.yaml.constructor.DuplicateKeyError as ex:
log(red(ex))
log(red("Aborting!"))
exit(1)
return settings
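# Note on the layering above: dict.update() is a *shallow* merge, so an
# override file replaces a nested mapping wholesale instead of merging
# into it. A minimal illustration with plain dicts (not code from the
# original project):
#
#   settings = {'db': {'host': 'localhost', 'port': 5432}}
#   settings.update({'db': {'host': 'prod'}})
#   # settings == {'db': {'host': 'prod'}}  -- 'port' is gone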
def import_cwl(self, cwl_path):
"""
Load content of cwl into the :class:`cwlgen.CommandLineTool` object.
:param cwl_path: Path of the CWL tool to be loaded.
:type cwl_path: STRING
:return: CWL tool content in cwlgen model.
:rtype: :class:`cwlgen.CommandLineTool`
"""
with open(cwl_path) as yaml_file:
cwl_dict = ryaml.load(yaml_file, Loader=ryaml.Loader)
tool = self._init_tool(cwl_dict)
for key, element in cwl_dict.items():
try:
getattr(self, '_load_{}'.format(key))(tool, element)
except AttributeError:
logger.warning(key + " content is not processed (yet).")
return tool
def main(*args):
"""Main method of artman."""
# If no arguments are sent, we are using the entry point; derive
# them from sys.argv.
if not args:
args = sys.argv[1:]
# Get to a normalized set of arguments.
flags = parse_args(*args)
user_config = read_user_config(flags)
_adjust_root_dir(flags.root_dir)
pipeline_name, pipeline_kwargs = normalize_flags(flags, user_config)
if flags.local:
try:
pipeline = pipeline_factory.make_pipeline(pipeline_name, False,
**pipeline_kwargs)
            # Hardcoded to run the pipeline in the serial engine, though
            # other engines would also work.
engine = engines.load(
pipeline.flow, engine='serial', store=pipeline.kwargs)
engine.run()
        except Exception:
logger.fatal(traceback.format_exc())
finally:
_change_owner(flags, pipeline_name, pipeline_kwargs)
else:
support.check_docker_requirements(flags.image)
# Note: artman currently won't work if input directory doesn't contain
# shared configuration files (e.g. gapic/packaging/dependencies.yaml).
# This will make artman less useful for non-Google APIs.
# TODO(ethanbao): Fix that by checking the input directory and
# pulling the shared configuration files if necessary.
logger.info('Running artman command in a Docker instance.')
_run_artman_in_docker(flags)
def read_user_config(flags):
"""Read the user config from disk and return it.
Args:
flags (argparse.Namespace): The flags from sys.argv.
Returns:
dict: The user config.
"""
# Load the user configuration if it exists and save a dictionary.
user_config = {}
user_config_file = os.path.realpath(os.path.expanduser(flags.user_config))
if os.path.isfile(user_config_file):
with io.open(user_config_file) as ucf:
user_config = yaml.load(ucf.read(), Loader=yaml.Loader) or {}
# Sanity check: Is there a configuration? If not, abort.
if not user_config:
setup_logging(INFO)
logger.critical('No user configuration found.')
        logger.warning('This is probably your first time running Artman.')
        logger.warning('Run `configure-artman` to get yourself set up.')
sys.exit(64)
# Done; return the user config.
return user_config
def execute(self, gapic_code_dir, grpc_code_dir, proto_code_dir, gapic_api_yaml):
with open(gapic_api_yaml[0]) as f:
gapic_config = yaml.load(f, Loader=yaml.Loader)
package_name = gapic_config.get('language_settings').get('csharp').get('package_name')
package_root = '{0}/{1}'.format(gapic_code_dir, package_name)
prod_dir = '{0}/{1}'.format(package_root, package_name)
# Copy proto/grpc .cs files into prod directory
self.exec_command(['sh', '-c', 'cp {0}/*.cs {1}'.format(proto_code_dir, prod_dir)])
self.exec_command(['sh', '-c', 'cp {0}/*.cs {1}'.format(grpc_code_dir, prod_dir)])
def execute(self, src_proto_path, import_proto_path, common_protos_yaml,
organization_name):
self._organization_name = organization_name
with io.open(common_protos_yaml) as file_:
common_protos_data = yaml.load(file_, Loader=yaml.Loader)
# Treat google.protobuf, google.iam as a common proto package, even
# though they are not included in the common-protos we generate.
#
# TODO (geigerj): remove 'google.iam' when it is included in the common
# protos package.
common_protos = ['google.protobuf', 'google.iam']
for package in common_protos_data['packages']:
common_protos.append('google.' + package['name'].replace('/', '.'))
tmpdir = os.path.join(
tempfile.gettempdir(), 'artman-python', str(int(time.time())))
new_proto_dir = os.path.join(tmpdir, 'proto')
new_src_path = set()
new_import_path = [new_proto_dir]
self._copy_and_transform_directories(
src_proto_path, new_proto_dir, common_protos, paths=new_src_path)
self._copy_and_transform_directories(
import_proto_path, new_proto_dir, common_protos)
# Update src_proto_path, import_proto_path
return list(new_src_path), new_import_path
def read_header(path):
path = Path(path)
data = yload(path.text(encoding='utf8'))
return data['header']
def _parse_data(self):
with open(self.filename, 'r') as fin:
data = json.load(fin)
return data
def __init__(self, data=None, converts_none_to_str=True):
"""Could be a JSON or a YAML file
:param str filename: filename to a config file in json or YAML format.
SEQUANA config files must have some specific fields::
input_directory
input_samples...
"""
# Create a dummy YAML code to hold data in case the input is a json
# or a dictionary structure. We use a CommentedMap that works like
# a dictionary. Be aware that the update method will lose the comments
if data is None:
self.config = AttrDict()
self._yaml_code = comments.CommentedMap()
elif isinstance(data, str): # else is it a filename ?
if os.path.exists(data):
if data.endswith(".yaml") or data.endswith(".yml"):
with open(data, "r") as fh:
self._yaml_code = ruamel.yaml.load(
fh.read(), ruamel.yaml.RoundTripLoader)
else:
# read a JSON
import yaml
with open(data, "r") as fh:
self._yaml_code = yaml.load(json.dumps(
json.loads(fh.read())))
config = load_configfile(data)
else:
raise IOError("input string must be an existing file (%s)" % data)
self.config = AttrDict(**config)
elif isinstance(data, SequanaConfig): # else maybe a SequanaConfig ?
self.config = AttrDict(**data.config)
self._yaml_code = comments.CommentedMap(self.config.copy())
else: # or a pure dictionary ?
self.config = AttrDict(**data)
self._yaml_code = comments.CommentedMap(self.config.copy())
self.cleanup_config()
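# Usage sketch (hedged; filenames are hypothetical): the branches above
# accept several input types --
#
#   SequanaConfig('config.yaml')                  # YAML file, roundtrip-loaded
#   SequanaConfig('config.json')                  # JSON file
#   SequanaConfig({'input_directory': 'data/'})   # plain dictionary
#   SequanaConfig(other_sequana_config)           # copy constructor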
def add_stats_summary_json(json_list, parser):
if not parser.stats:
return
for jfile in json_list:
with open(jfile, 'r') as fp:
jdict = json.load(fp)
jdict['stats'] = parser.stats
j = json.dumps(jdict)
with open(jfile, 'w') as fp:
print(j, file=fp)
def load_config(config_path, loader=yaml.Loader, verify_version=True):
if not os.path.exists(config_path):
system_log.error(_("config.yml not found in {config_path}").format(config_path))
return False
with codecs.open(config_path, encoding="utf-8") as stream:
config = yaml.load(stream, loader)
if verify_version:
config = config_version_verify(config, config_path)
return config
def load_config(config_path, loader=yaml.Loader):
if config_path is None:
return {}
if not os.path.exists(config_path):
system_log.error(_(u"config.yml not found in {config_path}").format(config_path))
return False
with codecs.open(config_path, encoding="utf-8") as stream:
config = yaml.load(stream, loader)
return config
def __init__(self, filename):
self.filename = filename
with open(filename, 'r') as f:
self.data = yaml.load(f, Loader=yaml.RoundTripLoader)
def load_meas_file(filename=None):
global LogDir, KernelDir, AWGDir, meas_file
if filename:
meas_file = filename
else:
meas_file = find_meas_file()
with open(meas_file, 'r') as fid:
Loader.add_constructor('!include', Loader.include)
load = Loader(fid)
code = load.get_single_data()
load.dispose()
# Get the config values out of the measure_file.
    if 'config' not in code:
raise KeyError("Could not find config section of the yaml file.")
if 'AWGDir' in code['config'].keys():
AWGDir = os.path.abspath(code['config']['AWGDir'])
else:
raise KeyError("Could not find AWGDir in the YAML config section")
if 'KernelDir' in code['config'].keys():
KernelDir = os.path.abspath(code['config']['KernelDir'])
else:
raise KeyError("Could not find KernelDir in the YAML config section")
if 'LogDir' in code['config'].keys():
LogDir = os.path.abspath(code['config']['LogDir'])
else:
raise KeyError("Could not find LogDir in the YAML config section")
# Create directories if necessary
for d in [KernelDir, LogDir]:
if not os.path.isdir(d):
os.mkdir(d)
return code
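# The snippet above assumes a custom Loader class with an `include`
# constructor wired up via Loader.add_constructor('!include', Loader.include).
# A minimal sketch of such a loader follows; this is an assumption about
# its shape, not the project's actual implementation:
import os
import yaml

class Loader(yaml.SafeLoader):
    """YAML loader that resolves `!include other.yml` tags."""

    def __init__(self, stream):
        # Remember the including file's directory for relative includes.
        self._root = os.path.dirname(getattr(stream, 'name', '.'))
        super(Loader, self).__init__(stream)

    def include(self, node):
        filename = os.path.join(self._root, self.construct_scalar(node))
        with open(filename) as f:
            return yaml.load(f, Loader=Loader)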
async def _init():
parallelism = config.get('parallelism', 1)
logger.info('Using parallelism: %d', parallelism)
for sandbox in await create_sandboxes(parallelism):
_sandbox_pool.put_nowait(sandbox)
try:
with open(_LANGS_FILE) as file:
langs_config = yaml.load(file, Loader=yaml.RoundTripLoader)
except FileNotFoundError:
logger.error('Language file %s not found.', _LANGS_FILE)
exit(1)
for lang_name, lang_config in langs_config.items():
if lang_config['type'] == 'compiler':
compiler = Compiler(lang_config['compiler_file'],
shlex.split(lang_config['compiler_args']),
lang_config['code_file'],
lang_config['execute_file'],
shlex.split(lang_config['execute_args']))
_langs[lang_name] = partial(
_compiler_build, compiler,
time_limit_ns=lang_config.get('time_limit_ms', DEFAULT_TIME_MS) * 1000000,
memory_limit_bytes=lang_config.get('memory_limit_kb', DEFAULT_MEM_KB) * 1024,
process_limit=lang_config.get('process_limit', PROCESS_LIMIT))
elif lang_config['type'] == 'interpreter':
interpreter = Interpreter(lang_config['code_file'],
lang_config['execute_file'],
shlex.split(lang_config['execute_args']))
_langs[lang_name] = partial(_interpreter_build, interpreter)
else:
logger.error('Unknown type %s', lang_config['type'])
def _load_config():
try:
with open(_CONFIG_FILE, encoding='utf-8') as file:
return yaml.load(file, Loader=yaml.RoundTripLoader)
except FileNotFoundError:
logger.error('Config file %s not found.', _CONFIG_FILE)
exit(1)
def load(f):
return ruamel.yaml.load(f, ruamel.yaml.RoundTripLoader)
def load(stream, Loader=yaml.SafeLoader, object_pairs_hook=OrderedDict):
class OrderedLoader(Loader):
pass
def construct_mapping(loader, node):
loader.flatten_mapping(node)
return object_pairs_hook(loader.construct_pairs(node))
OrderedLoader.add_constructor(
yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
construct_mapping)
return yaml.load(stream, OrderedLoader)