def get_parsed_defs(self):
    """Return the parsed YAML definitions, parsing them when not cached.

    ``self._parsed_defs`` is used as a cache. When it is falsy, the raw
    YAML in ``self.data`` is parsed with ``yaml.safe_load`` and stored
    before being returned.
    """
    if not self._parsed_defs:
        # Nothing usable cached yet: parse the raw YAML and remember it.
        self._parsed_defs = yaml.safe_load(self.data)
    return self._parsed_defs
# Example source code using Python's safe_load()
def __init__(self, *args, **kwargs):
    """Set up the definition from the packaged ``ceph.yaml`` resource.

    All positional and keyword arguments are forwarded unchanged to the
    parent class initialiser.
    """
    self._defs = {}
    super(Definition, self).__init__(*args, **kwargs)

    # Keep both the raw resource bytes and their parsed form.
    self.data = pkg_resources.resource_string(
        __name__,
        "ceph.yaml",
    )
    self._parsed_defs = yaml.safe_load(self.data)

    self.value = "_NS/integrations/ceph/definitions"
def get_parsed_defs(self):
    """Return the cached parsed definitions, loading them on first need.

    If ``self._parsed_defs`` is falsy, ``self.data`` is parsed with
    ``yaml.safe_load`` and the result is cached on the instance.
    """
    cached = self._parsed_defs
    if cached:
        return cached

    # Cache miss: parse the raw YAML and store the result for next time.
    self._parsed_defs = yaml.safe_load(self.data)
    return self._parsed_defs
def run_step(context):
    """Read a yaml file and merge its top-level mapping into the context.

    Keys loaded from the file replace keys of the same name that are
    already in the context. For example, if the file contains
    {'eggs': 'boiled'} and the context holds {'eggs': 'fried'}, then
    context['eggs'] ends up as 'boiled'.

    Args:
        context: pypyr.context.Context. Mandatory.
                 The following context key must exist
                - fetchYamlPath. path-like. Path to file on disk.

    Returns:
        None. The context argument is updated in place.

    Raises:
        FileNotFoundError: the file at fetchYamlPath could not be opened.
        TypeError: the yaml top level is not a mapping.
        pypyr.errors.KeyNotInContextError: fetchYamlPath missing in context.
        pypyr.errors.KeyInContextHasNoValueError: fetchYamlPath exists but is
                                                  None.
    """
    logger.debug("started")

    context.assert_key_has_value(key='fetchYamlPath', caller=__name__)
    file_path = context.get_formatted('fetchYamlPath')

    logger.debug(f"attempting to open file: {file_path}")
    with open(file_path) as source:
        parsed = yaml.safe_load(source)

    if isinstance(parsed, MutableMapping):
        logger.debug("yaml file loaded. Merging into pypyr context. . .")
        context.update(parsed)
        logger.info(
            f"yaml file merged into pypyr context. Count: {len(parsed)}")
        logger.debug("done")
        return

    # Only a mapping can be merged into the context; lists and scalars
    # at the top level are rejected.
    raise TypeError("yaml input should describe a dictionary at the top "
                    "level. You should have something like "
                    "\n'key1: value1'\n key2: value2'\n"
                    "in the yaml top-level, not \n'- value1\n - value2'")
def render_yaml_file(filename):
    """Parse a yaml file and return its content as a JSON web response.

    ``filename`` is resolved relative to the parent directory of ``HERE``.
    """
    yaml_path = os.path.join(HERE, "..", filename)
    with open(yaml_path) as handle:
        parsed = yaml.safe_load(handle)
    return web.json_response(parsed)
async def check_yaml_resource(cli, url, filename):
    """Assert that a GET on ``url`` returns JSON equal to a yaml file.

    The function awaits the client, so it has to be a coroutine; a plain
    ``def`` containing ``await`` does not compile.

    Args:
        cli: client object exposing an awaitable ``get(url)``.
        url: URL requested through ``cli``.
        filename: yaml file name, resolved under ``HERE/../pollbot``.

    Raises:
        AssertionError: the decoded JSON body differs from the yaml content.
    """
    with open(os.path.join(HERE, "..", "pollbot", filename)) as stream:
        content = yaml.safe_load(stream)
    resp = await cli.get(url)
    assert await resp.json() == content
def test_oas_spec():
    """Load ``pollbot/api.yaml`` and run it through ``validate_spec``."""
    spec_path = os.path.join(HERE, "..", "pollbot", "api.yaml")
    with open(spec_path, 'r') as handle:
        parsed_spec = yaml.safe_load(handle)
    # example for swagger spec v2.0
    validate_spec(parsed_spec)
def read_config(path='aetros.yml', logger=None):
    """Return the default configuration merged with the yaml file at ``path``.

    Args:
        path: location of the yaml configuration file. ``~`` is expanded and
            the path is normalised. A missing file is not an error.
        logger: optional logger; when given, the resolved path of a loaded
            file is reported at debug level.

    Returns:
        dict: the default settings, overridden by any keys found in the file.
        A ``storage_dir`` key in the file is ignored.
    """
    path = os.path.normpath(os.path.expanduser(path))

    config = {
        'dockerfile': None,
        'command': None,
        'install': None,
        'ignore': None,
        'image': None,
        'server': None,
        'parameters': {},
        'servers': None,
        'before_command': [],
    }

    if os.path.exists(path):
        # Use a context manager so the file handle is always released.
        with open(path, 'r') as f:
            custom_config = yaml.safe_load(f)

        # An empty yaml document parses to None.
        if custom_config is None:
            custom_config = {}

        if 'storage_dir' in custom_config:
            del custom_config['storage_dir']

        config.update(custom_config)

        if logger:
            logger.debug('Config loaded from ' + os.path.realpath(path))

    if 'parameters' not in config:
        config['parameters'] = {}

    return config
def main(self, args):
    """Create a model and link it to the current directory via aetros.yml.

    Prints the help text and exits with status 1 when no model name is
    given, or when an existing ``aetros.yml`` already contains a ``model``
    key. Otherwise the model is created through ``api.create_model`` and
    ``aetros.yml`` is written with the resulting name.

    Args:
        args: list of command-line arguments to parse.
    """
    import aetros.const

    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter,
                                     prog=aetros.const.__prog__ + ' run')
    parser.add_argument('name', nargs='?', help="Model name")
    parser.add_argument('--private', action='store_true', help="Make the model private. Example: aetros init my-model --private")

    home_config = read_home_config()
    parsed_args = parser.parse_args(args)
    if not parsed_args.name:
        parser.print_help()
        sys.exit(1)

    if os.path.exists('aetros.yml'):
        # Read inside a context manager so the handle is closed promptly.
        with open('aetros.yml', 'r') as existing:
            config = yaml.safe_load(existing)
        if isinstance(config, dict) and 'model' in config:
            print("failed: aetros.yml already exists with a linked model to " + config['model'])
            sys.exit(1)

    name = api.create_model(parsed_args.name or (os.path.basename(os.getcwd())), parsed_args.private)

    with open('aetros.yml', 'w') as f:
        f.write('model: ' + name)

    print("aetros.yml created linked with model " + name + ' in ' + os.getcwd())
    print("Open AETROS Trainer to see the model at https://" + home_config['host'] + '/model/' + name)
def load_config(self, key: str) -> dict:
    """Load the configuration section stored under ``key``.

    ``self.load_from`` is first tried as a package containing a
    ``whither.yml`` resource. If anything in that attempt fails, it is
    treated as a path to a yaml file on disk instead.

    Args:
        key: top-level key of the section to return; also passed to
            ``self._filter_data`` along with the raw text.

    Returns:
        dict: a shallow copy of the selected section.
    """
    try:
        data = pkg_resources.resource_string(self.load_from, 'whither.yml').decode('utf-8')
        data = self._filter_data(key, data)
        config = yaml.safe_load(data)
        config = config[key]
    except Exception:
        # Deliberate broad fallback: any failure with the packaged resource
        # means load_from is read as a plain file path instead.
        with open(self.load_from, 'r') as config_file:
            data = config_file.read()
        data = self._filter_data(key, data)
        config = yaml.safe_load(data)
        config = config[key]

    return dict(config.items())
def load(path: str) -> Dict:
    """Read the yaml file at ``path``, validate it and return the result.

    The parsed content is passed to ``validate`` before being returned.
    """
    with open(path, 'rt') as handle:
        raw_text = handle.read()

    config = yaml_load(raw_text)
    validate(config)
    return config
def setup_logging() -> None:
    """Configure the logging module from ``log_config.yaml``.

    The file is read from the current working directory, parsed with
    ``yaml_load`` and handed to ``logging.config.dictConfig``.
    """
    with open('log_config.yaml', 'r') as handle:
        raw_text = handle.read()

    logging.config.dictConfig(yaml_load(raw_text))
def __init__(self, *args, **kwargs):
    """Set up the definition from ``monitoring_integration.yaml``.

    All positional and keyword arguments are forwarded unchanged to the
    parent class initialiser.
    """
    self._defs = {}
    super(Definition, self).__init__(*args, **kwargs)

    # Keep both the raw resource bytes and their parsed form.
    resource_name = "monitoring_integration.yaml"
    self.data = pkg_resources.resource_string(__name__, resource_name)
    self._parsed_defs = yaml.safe_load(self.data)

    self.value = '_NS/monitoring/definitions'
def get_parsed_defs(self):
    """Return the parsed definitions, parsing ``self.data`` when needed.

    A truthy ``self._parsed_defs`` is returned as is. Otherwise the raw
    YAML is parsed with ``yaml.safe_load`` and cached on the instance.
    """
    parsed = self._parsed_defs
    if not parsed:
        # Nothing cached yet: parse now and remember the result.
        parsed = yaml.safe_load(self.data)
        self._parsed_defs = parsed
    return parsed
def __init__(self, *, loop=None, **options):
    """Initialise the instance and load its settings from ``config.yml``.

    Args:
        loop: forwarded to the parent initialiser as ``loop``.
        **options: further keyword options forwarded to the parent
            initialiser.
    """
    super().__init__(loop=loop, **options)
    self.banned_ids = []
    # Read inside a context manager so the file handle is closed once the
    # configuration has been parsed.
    with open("config.yml", "r") as config_file:
        self.config = yaml.safe_load(config_file)
    self.data_manager = DataManager()
def load_yaml(self, template_file):
    """Parse ``template_file`` as yaml and return the result.

    A yaml parsing error is printed and an empty list is returned instead
    of raising. Errors from opening the file are not caught.
    """
    with open(template_file) as handle:
        try:
            parsed = yaml.safe_load(handle)
        except yaml.YAMLError as err:
            print(err)
            parsed = []
    return parsed
def set_to_default(self):
    """Replace the current configuration with the parsed default one.

    The yaml returned by ``self.default()`` is parsed with
    ``yaml.safe_load`` and stored in ``self._config``.
    """
    default_yaml = self.default()
    self._config = yaml.safe_load(default_yaml)
def task_config():
    '''
    write config.yml -> .config.yml

    Returns a dict whose 'actions' copy CONFIG_YML to DOT_CONFIG_YML and
    then apply the chosen log level to the copy through _update_config.

    The log level defaults to 'WARNING', is overridden by the content of a
    LOG_LEVEL file next to this module when that file exists, and is then
    passed through get_var('LOG_LEVEL', ...).

    Raises UnknownLogLevelError when the level is not in LOG_LEVELS.
    '''
    # NOTE(review): fmt() appears to interpolate names such as {log_level}
    # from the caller's scope, so local names must stay as they are.
    log_level = 'WARNING'
    filename = '{0}/LOG_LEVEL'.format(os.path.dirname(__file__))
    if os.path.isfile(filename):
        # Context manager so the handle is closed after reading.
        with open(filename) as level_file:
            log_level = level_file.read().strip()
    log_level = get_var('LOG_LEVEL', log_level)
    if log_level not in LOG_LEVELS:
        raise UnknownLogLevelError(log_level)
    # The yaml must be nested: as a flat mapping the two 'level' keys
    # collide and 'logging', 'loggers', 'api', ... all parse to null.
    punch = fmt('''
logging:
    loggers:
        api:
            level: {log_level}
    handlers:
        console:
            level: {log_level}
''')
    return {
        'actions': [
            fmt('echo "cp {CONFIG_YML}\n-> {DOT_CONFIG_YML}"'),
            fmt('echo "setting LOG_LEVEL={log_level}"'),
            fmt('cp {CONFIG_YML} {DOT_CONFIG_YML}'),
            lambda: _update_config(DOT_CONFIG_YML, yaml.safe_load(punch)),
        ]
    }
def _load_config(cfgs):
    """Merge the given yaml config files into a single AttrDict.

    Each entry of ``cfgs`` has ``~`` expanded. Entries that are not
    existing files are skipped, as are files whose parsed content is
    falsy. Later files override keys from earlier ones.
    """
    merged = {}
    for candidate in cfgs:
        expanded = os.path.expanduser(candidate)
        if not os.path.isfile(expanded):
            continue
        with open(expanded, 'r') as handle:
            parsed = yaml.safe_load(handle)
            if parsed:
                merged.update(parsed)
    return AttrDict(merged)
def _load_config(filename=DOT_CONFIG_YML, roundtrip=False, fixup=True):
    """Load a yaml config file and return it wrapped in an AttrDict.

    Args:
        filename: path of the yaml file; when it is not an existing file an
            empty AttrDict is returned.
        roundtrip: parse with ``yaml.round_trip_load`` instead of
            ``yaml.safe_load``.
        fixup: pass the parsed content through ``_fixup``.

    Raises:
        ConfigLoadError: any exception raised while reading, parsing or
            fixing up the file; the original exception is printed first.
    """
    if not os.path.isfile(filename):
        return AttrDict({})

    try:
        with open(filename, 'r') as handle:
            loader = yaml.round_trip_load if roundtrip else yaml.safe_load
            loaded = loader(handle.read())
        if fixup:
            loaded = _fixup(loaded)
    except Exception as ex:
        print('ex =', ex)
        raise ConfigLoadError(filename, errors=[ex])

    return AttrDict(loaded)