def find_by_id(self, id_):
    """Return the ServiceDef object for the given service.

    :param str id_: URI of the service definition
    """
    _, name, version = id_.rsplit('/', 2)
    rel_fname, abs_fname = self.get_fnames(name, version)
    if self.ss_dir.isfile(rel_fname):
        return ServiceDef.create_from_file(abs_fname)
    resp = self.connection.request(method='GET', path=id_)
    # Cache the fetched definition to disk as a YAML file
    with open(abs_fname, 'w+') as f:
        yaml.safe_dump(resp.json(), f)
    return ServiceDef.create_from_file(abs_fname)
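# A quick illustration (hypothetical id_ value) of the rsplit parsing above:
# rsplit('/', 2) splits from the right into at most three parts, so the last
# two path segments become the service name and version.
parts = 'api/servicedefs/flow/1.0'.rsplit('/', 2)
assert parts == ['api/servicedefs', 'flow', '1.0']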
def format_output(output, format_):
    if format_ == 'plain':
        if output is None:
            return ''
        if isinstance(output, text_type):
            if text_type is str:
                return output
            else:
                return output.encode('utf-8')
        format_ = 'json'
    # numbers, booleans, lists and dicts will be represented as JSON
    if format_ == 'json':
        return json.dumps(output)
    if format_ == 'yaml':
        # Usage of safe_dump here is crucial since PyYAML emits
        # "!!python/unicode" objects from unicode strings by default
        return yaml.safe_dump(output, default_flow_style=False)
    raise RuntimeError("Unknown format '{}'".format(format_))
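# Minimal sketch of what the formats above produce for the same object
# (standalone illustration, assuming only the stdlib json module and PyYAML):
import json
import yaml

data = {'name': 'demo', 'count': 3}
json.dumps(data)                                # '{"name": "demo", "count": 3}'
yaml.safe_dump(data, default_flow_style=False)  # 'count: 3\nname: demo\n'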
def update_release(channel):
    """
    Update release manifests.
    """
    if not os.path.exists("cluster.yml"):
        error("no cluster.yml found. Did you configure?")
    with open("cluster.yml") as fp:
        # safe_load avoids constructing arbitrary objects from the config file
        config = yaml.safe_load(fp)
    if channel is None:
        channel = config["release"]["channel"]
    current_version = config["release"]["version"]
    configure.release(config, channel)
    if current_version == config["release"]["version"]:
        click.echo("No updates available for {} channel".format(channel))
        sys.exit(0)
    with open("cluster.yml", "w") as fp:
        fp.write(yaml.safe_dump(config, default_flow_style=False))
    click.echo("Updated config to {} in {} channel".format(
        config["release"]["version"], config["release"]["channel"]))
def save(self):
    """Persist the services to disk"""
    try:
        logger.debug("Backing up services")
        if os.path.exists(self._db):
            shutil.copyfile(self._db, "{}.bak".format(self._db))
    except Exception:
        logger.exception("Failed to backup services")
        return
    try:
        logger.debug("Saving services")
        with open(self._db, 'w') as f:
            f.write(yaml.safe_dump(
                {k: v for k, v in self._services.items()
                 if k not in self.reserved_services},
                default_flow_style=False, explicit_start=True))
    except Exception:
        logger.exception("Failed to save services")
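# Sketch of the dump options used by the save() methods here (standalone
# illustration, not tied to the class above): explicit_start=True prepends
# the YAML document-start marker and default_flow_style=False forces block style.
import yaml

yaml.safe_dump({'web': {'port': 80}},
               default_flow_style=False, explicit_start=True)
# '---\nweb:\n  port: 80\n'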
def save(self):
    """Persist the addressbook to disk"""
    try:
        logger.debug("Backing up addressbook")
        if os.path.exists(self._db):
            shutil.copyfile(self._db, "{}.bak".format(self._db))
    except Exception:
        logger.exception("Failed to backup addressbook")
        return
    try:
        logger.debug("Saving addressbook")
        with open(self._db, 'w') as f:
            f.write(yaml.safe_dump(
                {k: v for k, v in self._addressbook.items()
                 if k not in self.reserved_addresses},
                default_flow_style=False, explicit_start=True))
    except Exception:
        logger.exception("Failed to save addressbook")
def save(self):
    """Persist the checks to disk"""
    try:
        logger.debug("Backing up checks")
        if os.path.exists(self._db):
            shutil.copyfile(self._db, "{}.bak".format(self._db))
    except Exception:
        logger.exception("Failed to backup checks")
        return
    try:
        logger.debug("Saving checks")
        with open(self._db, 'w') as f:
            f.write(yaml.safe_dump(self._checks,
                                   default_flow_style=False,
                                   explicit_start=True))
    except Exception:
        logger.exception("Failed to save checks")
def save(self):
    """Persist the chains to disk"""
    try:
        logger.debug("Backing up chains")
        if os.path.exists(self._db):
            shutil.copyfile(self._db, "{}.bak".format(self._db))
    except Exception:
        logger.exception("Failed to backup chains")
        return
    try:
        logger.debug("Saving chains")
        with open(self._db, 'w') as f:
            f.write(yaml.safe_dump(self._tables,
                                   default_flow_style=False,
                                   explicit_start=True))
    except Exception:
        logger.exception("Failed to save chains")
def save(self):
    """Persist the interfaces to disk"""
    try:
        logger.debug("Backing up interfaces")
        if os.path.exists(self._db):
            shutil.copyfile(self._db, "{}.bak".format(self._db))
    except Exception:
        logger.exception("Failed to backup interfaces")
        return
    try:
        logger.debug("Saving interfaces")
        with open(self._db, 'w') as f:
            f.write(yaml.safe_dump(
                {k: v for k, v in self._interfaces.items()
                 if k not in self.reserved_interfaces},
                default_flow_style=False, explicit_start=True))
    except Exception:
        logger.exception("Failed to save interfaces")
def _update_system_file(system_file, name, new_kvs):
    """Update the bcbio_system.yaml file with new resource information.
    """
    if os.path.exists(system_file):
        bak_file = system_file + ".bak%s" % datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        shutil.copyfile(system_file, bak_file)
        with open(system_file) as in_handle:
            config = yaml.safe_load(in_handle)
    else:
        utils.safe_makedir(os.path.dirname(system_file))
        config = {}
    new_rs = {}
    added = False
    for rname, r_kvs in config.get("resources", {}).items():
        if rname == name:
            for k, v in new_kvs.items():
                r_kvs[k] = v
            added = True
        new_rs[rname] = r_kvs
    if not added:
        new_rs[name] = new_kvs
    config["resources"] = new_rs
    with open(system_file, "w") as out_handle:
        yaml.safe_dump(config, out_handle, default_flow_style=False,
                       allow_unicode=False)
def _setUp(self):
    super(RealPolicyFixture, self)._setUp()
    self.policy_dir = self.useFixture(fixtures.TempDir())
    self.policy_file = os.path.join(self.policy_dir.path,
                                    'policy.yaml')
    # Load the fake_policy data and add the missing default rules.
    policy_rules = yaml.safe_load(fake_policy.policy_data)
    self.add_missing_default_rules(policy_rules)
    with open(self.policy_file, 'w') as f:
        yaml.safe_dump(policy_rules, f)
    policy_opts.set_defaults(CONF)
    self.useFixture(
        ConfPatcher(policy_dirs=[], policy_file=self.policy_file,
                    group='oslo_policy'))
    deckhand.policy.reset()
    deckhand.policy.init()
    self.addCleanup(deckhand.policy.reset)
    if self.verify:
        self._install_policy_verification_hook()
def process_response(self, req, resp, resource):
    """Converts responses to ``application/x-yaml`` content type."""
    if resp.status != '204 No Content':
        resp.set_header('Content-Type', 'application/x-yaml')
    for attr in ('body', 'data'):
        if not hasattr(resp, attr):
            continue
        resp_attr = getattr(resp, attr)
        try:
            resp_attr = json.loads(resp_attr)
        except (TypeError, ValueError):
            pass
        if isinstance(resp_attr, dict):
            setattr(resp, attr, yaml.safe_dump(resp_attr))
        elif isinstance(resp_attr, (list, tuple)):
            setattr(resp, attr, yaml.safe_dump_all(resp_attr))
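# Standalone sketch (hypothetical data) of the two dump calls above: safe_dump
# serializes a single document, while safe_dump_all joins several documents
# with '---' separators, which is why lists and tuples go through it.
import yaml

yaml.safe_dump({'status': 'ok'})            # 'status: ok\n'
yaml.safe_dump_all([{'id': 1}, {'id': 2}])  # 'id: 1\n---\nid: 2\n'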
def json_to_yaml(filename):
    '''
    Convert a JSON file to a YAML file.

    :param filename: base filename (without extension)
    :type filename: str
    :return: None
    '''
    try:
        with open(filename + ".json", "r") as json_file:
            json_data = json.load(json_file)
        with open(filename + ".yaml", "w") as yaml_file:
            yaml.safe_dump(json_data, yaml_file, default_flow_style=False)
    except FileNotFoundError:
        print("[Error] Bad Input File")
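# Hedged usage example for json_to_yaml (assumes the function above and a
# writable working directory; "settings" is a hypothetical base filename,
# i.e. the function reads settings.json and writes settings.yaml):
import json

with open("settings.json", "w") as f:
    json.dump({"debug": True, "retries": 3}, f)
json_to_yaml("settings")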
def test_run_heat_config(self):
    with self.write_config_file(self.data) as config_file:
        env = os.environ.copy()
        env.update({
            'HEAT_DOCKER_COMPOSE_WORKING': self.docker_compose_dir.join(),
            'HEAT_SHELL_CONFIG': config_file.name
        })
        returncode, stdout, stderr = self.run_cmd(
            [self.heat_config_docker_compose_path], env)
        self.assertEqual(0, returncode, stderr)
    compose_yml = self.docker_compose_dir.join(
        'abcdef001/docker-compose.yml')
    with open(compose_yml) as f:
        self.assertEqual(yaml.safe_dump(
            self.data[0].get('config'),
            default_flow_style=False), f.read())
def output(python_object, format="raw", pager=False):
    if format == 'yaml':
        output_string = yaml.safe_dump(python_object, default_flow_style=False,
                                       encoding='utf-8', allow_unicode=True)
    elif format == 'json':
        output_string = json.dumps(python_object, sort_keys=True, indent=4)
    elif format == 'raw':
        output_string = str(python_object)
    elif format == 'pformat':
        output_string = pprint.pformat(python_object)
    else:
        raise Exception("No valid output format provided. "
                        "Supported: 'yaml', 'json', 'raw', 'pformat'")
    if pager:
        click.echo_via_pager(output_string)
    else:
        click.echo(output_string)
def write_metadata(path, meta='.meta.yaml', **params):
    """Writes metadata for a dataset.

    Args:
        path (str): path to **dataset** (not meta file) whose metadata
            is to be written. If the meta file already exists, it will be
            overwritten.
        meta (str): suffix identifying the dataset's meta file
        **params: all other keyword arguments are treated as dataset
            attributes, and added to the meta file
    """
    if 'n_channels' in params:
        del params['n_channels']
    if 'n_samples' in params:
        del params['n_samples']
    if os.path.isdir(path):
        metafile = os.path.join(path, meta[1:])
    else:
        metafile = path + meta
    for k, v in params.items():
        if isinstance(v, (np.ndarray, np.generic)):
            params[k] = v.tolist()
    with codecs.open(metafile, 'w', encoding='utf-8') as yaml_file:
        yaml_file.write(yaml.safe_dump(params, default_flow_style=False))
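# Illustrative call (hypothetical path and attributes): ndarray and numpy
# scalar values are converted with .tolist() before dumping, and
# n_channels/n_samples are dropped because they are derived from the data.
import numpy as np

write_metadata('recordings/session1.dat',
               fs=np.float64(16000.0),           # numpy scalar -> plain float
               labels=np.array(['eeg', 'emg']))  # ndarray -> list of str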
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--old", help="old password file", required=True)
    parser.add_argument("--new", help="new password file", required=True)
    parser.add_argument("--final", help="merged password file", required=True)
    args = parser.parse_args()
    with open(args.old, "r") as old_file:
        old_passwords = yaml.safe_load(old_file)
    with open(args.new, "r") as new_file:
        new_passwords = yaml.safe_load(new_file)
    new_passwords.update(old_passwords)
    with open(args.final, "w") as destination:
        yaml.safe_dump(new_passwords, destination, default_flow_style=False)
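# Note on the merge direction above: new_passwords.update(old_passwords)
# overwrites freshly generated values with the old ones, so existing
# passwords are preserved and only keys absent from the old file keep
# their new values. A minimal sketch of that semantics:
old = {'db': 'keep-me'}
new = {'db': 'regenerated', 'cache': 'fresh'}
new.update(old)
assert new == {'db': 'keep-me', 'cache': 'fresh'}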
def serialized(obj, status=200):
    fmt = get_serializer()
    if fmt == 'json':
        ser = json.dumps
        ct = 'application/json'
    elif fmt == 'yaml':
        ser = yaml.safe_dump
        ct = 'text/plain+yaml'  # For interop with browsers
    elif fmt is None:
        return None
    else:
        abort(404)
    data = ser(obj)
    resp = make_response(data, status)  # honor the status argument
    resp.headers['Content-Type'] = ct
    return resp
# Authentication
def base_update_meta(self, meta_version, force=False):
    try:
        meta_version = normalize_meta_version(meta_version)
    except Exception as e:
        raise InvalidMetaVersion(e)
    if meta_version == self.meta_version and not force:
        return 'meta_version is already latest'
    meta = self.fetch_meta(meta_version)
    if not isinstance(meta, dict):
        return None
    self.check_giturl(meta, update=True)
    meta['giturl'] = self.giturl
    self.meta = yaml.safe_dump(meta, default_style='"')
    self.meta_version = meta_version
    if self.appname != meta['appname']:
        raise InvalidLainYaml("appname does not match: %s" % meta)
    self.save()
    return 'meta updated'
def _write_galaxy_install_info(self):
    """
    Writes a YAML-formatted file to the role's meta/ directory
    (named .galaxy_install_info) which contains some information
    we can use later for commands like 'list' and 'info'.
    """
    info = dict(
        version=self.version,
        install_date=datetime.datetime.utcnow().strftime("%c"),
    )
    info_path = os.path.join(self.path, self.META_INSTALL)
    with open(info_path, 'w+') as f:
        try:
            self._install_info = yaml.safe_dump(info, f)
        except Exception:
            return False
    return True
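# Caveat worth noting for the snippet above: when safe_dump is given a stream
# it writes to the stream and returns None, so self._install_info ends up as
# None (the file itself is still written). Without a stream it returns the
# serialized string:
import io
import yaml

assert yaml.safe_dump({'a': 1}) == 'a: 1\n'
buf = io.StringIO()
assert yaml.safe_dump({'a': 1}, buf) is None
assert buf.getvalue() == 'a: 1\n'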
def update_write_config(config_file, update_dict):
    """Update a given configuration file with updated values.

    If the configuration file does not exist, a new one is created.

    Args:
        config_file (str): the location of the config file to update
        update_dict (dict): the items to update in the config file
    """
    if not os.path.exists(config_file):
        with open(config_file, 'a'):
            pass
    with open(config_file, 'r') as f:
        config_dict = yaml.safe_load(f.read()) or {}
    for key, value in update_dict.items():
        loader = get_section_loader(key)
        loader.update(config_dict, value)
    with open(config_file, 'w') as f:
        yaml.safe_dump(config_dict, f)
def toYAML(self, **options):
    """ Serializes this Munch to YAML, using `yaml.safe_dump()` if
        no `Dumper` is provided. See the PyYAML documentation for more info.

        >>> b = Munch(foo=['bar', Munch(lol=True)], hello=42)
        >>> import yaml
        >>> yaml.safe_dump(b, default_flow_style=True)
        '{foo: [bar, {lol: true}], hello: 42}\\n'
        >>> b.toYAML(default_flow_style=True)
        '{foo: [bar, {lol: true}], hello: 42}\\n'
        >>> yaml.dump(b, default_flow_style=True)
        '!munch.Munch {foo: [bar, !munch.Munch {lol: true}], hello: 42}\\n'
        >>> b.toYAML(Dumper=yaml.Dumper, default_flow_style=True)
        '!munch.Munch {foo: [bar, !munch.Munch {lol: true}], hello: 42}\\n'
    """
    opts = dict(indent=4, default_flow_style=False)
    opts.update(options)
    if 'Dumper' not in opts:
        return yaml.safe_dump(self, **opts)
    else:
        return yaml.dump(self, **opts)
def write(self):
    ''' write to file '''
    if not self.filename:
        raise YeditException('Please specify a filename.')
    if self.backup and self.file_exists():
        shutil.copy(self.filename, self.filename + '.orig')
    tmp_filename = self.filename + '.yedit'
    try:
        with open(tmp_filename, 'w') as yfd:
            yml_dump = yaml.safe_dump(self.yaml_dict, default_flow_style=False)
            # Re-quote Jinja2 expressions so they survive the YAML round trip
            for line in yml_dump.strip().split('\n'):
                if '{{' in line and '}}' in line:
                    yfd.write(line.replace("'{{", '"{{').replace("}}'", '}}"') + '\n')
                else:
                    yfd.write(line + '\n')
    except Exception as err:
        raise YeditException(str(err))
    os.rename(tmp_filename, self.filename)
    return (True, self.yaml_dict)
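# The write() above follows a write-temp-then-rename pattern; a minimal
# standalone sketch of the same idea (hypothetical helper name), relying on
# os.rename replacing the target atomically on POSIX filesystems:
import os
import yaml

def atomic_yaml_write(filename, data):
    tmp = filename + '.tmp'
    with open(tmp, 'w') as f:
        yaml.safe_dump(data, f, default_flow_style=False)
    os.rename(tmp, filename)  # readers see either the old or the new file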
def generate_group_vars_all(self):
    path = 'result/{}/group_vars/all'.format(self.grid_name)
    variables = AutoDict()
    hosts_entries = AutoDict()
    with open('result/{}/infrastructure/terraform.tfstate'.format(
            self.grid_name), 'r') as json_file:
        json_data = json.load(json_file)
    for module in json_data['modules']:
        for resource, value in module['resources'].items():
            if value['type'] == 'google_compute_instance':
                host = '{}.node.{}'.format(
                    value['primary']['attributes']['name'], self.grid_name)
                ip = value['primary']['attributes']['network_interface.0.address']
                hosts_entries['hosts'][str(host)] = str(ip)
    variables['hosts'] = json.dumps(hosts_entries['hosts'])
    variables['grid_name'] = self.current_grid.name
    variables['terminal_ip'] = self._nameserver()
    self._generate_template(path, variables)
    vars_json = json.loads(self.current_config.vars)
    vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
    with open(path, "a") as yaml_file:
        yaml_file.write(vars_yaml)
def generate_group_vars_all(self):
    path = 'result/{}/group_vars/all'.format(self.grid_name)
    variables = AutoDict()
    hosts_entries = AutoDict()
    with open('result/{}/infrastructure/terraform.tfstate'.format(
            self.grid_name), 'r') as json_file:
        json_data = json.load(json_file)
    for module in json_data['modules']:
        for resource, value in module['resources'].items():
            if value['type'] == 'azure_instance':
                host = '{}.node.{}'.format(
                    value['primary']['attributes']['name'], self.grid_name)
                ip = value['primary']['attributes']['ip_address']
                hosts_entries['hosts'][str(host)] = str(ip)
    variables['hosts'] = json.dumps(hosts_entries['hosts'])
    variables['grid_name'] = self.current_grid.name
    variables['terminal_ip'] = self._nameserver()
    self._generate_template(path, variables)
    vars_json = json.loads(self.current_config.vars)
    vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
    with open(path, "a") as yaml_file:
        yaml_file.write(vars_yaml)
def generate_group_vars_all(self):
    path = 'result/{}/group_vars/all'.format(self.grid_name)
    variables = AutoDict()
    hosts_entries = AutoDict()
    for group in self.current_groups:
        for ip in group.groupips.split(','):
            hostname = ip.replace('.', '-')
            host = '{}.node.{}'.format(hostname, self.grid_name)
            hosts_entries['hosts'][str(host)] = str(ip)
    for ip in self.current_config.mastersips.split(','):
        hostname = ip.replace('.', '-')
        host = '{}.node.{}'.format(hostname, self.grid_name)
        hosts_entries['hosts'][str(host)] = str(ip)
    terminal_ip = self.current_config.terminalips.split(',')[1]
    terminal_hostname = terminal_ip.replace('.', '-')
    terminal_host = '{}.node.{}'.format(terminal_hostname, self.grid_name)
    hosts_entries['hosts'][str(terminal_host)] = str(terminal_ip)
    variables['hosts'] = json.dumps(hosts_entries['hosts'])
    variables['grid_name'] = self.current_grid.name
    variables['terminal_ip'] = self._nameserver()
    self._generate_template(path, variables)
    vars_json = json.loads(self.current_config.vars)
    vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
    with open(path, "a") as yaml_file:
        yaml_file.write(vars_yaml)
def generate_group_vars_all(self):
    path = 'result/{}/group_vars/all'.format(self.grid_name)
    variables = AutoDict()
    hosts_entries = AutoDict()
    with open('result/{}/infrastructure/terraform.tfstate'.format(
            self.grid_name), 'r') as json_file:
        json_data = json.load(json_file)
    for module in json_data['modules']:
        for resource, value in module['resources'].items():
            if value['type'] == 'aws_instance':
                hostname = value['primary']['attributes']['private_dns'].split('.')[0]
                host = '{}.node.{}'.format(hostname, self.grid_name)
                ip = value['primary']['attributes']['private_ip']
                hosts_entries['hosts'][str(host)] = str(ip)
    variables['hosts'] = json.dumps(hosts_entries['hosts'])
    variables['grid_name'] = self.current_grid.name
    variables['terminal_ip'] = self._nameserver()
    self._generate_template(path, variables)
    vars_json = json.loads(self.current_config.vars)
    vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
    with open(path, "a") as yaml_file:
        yaml_file.write(vars_yaml)
def generate_group_vars_all(self):
    path = 'result/{}/group_vars/all'.format(self.grid_name)
    variables = AutoDict()
    hosts_entries = AutoDict()
    with open('result/{}/infrastructure/terraform.tfstate'.format(
            self.grid_name), 'r') as json_file:
        json_data = json.load(json_file)
    for module in json_data['modules']:
        for resource, value in module['resources'].items():
            if value['type'] == 'google_compute_instance':
                host = '{}.node.{}'.format(
                    value['primary']['attributes']['name'], self.grid_name)
                ip = value['primary']['attributes']['network_interface.0.address']
                hosts_entries['hosts'][str(host)] = str(ip)
    variables['hosts'] = json.dumps(hosts_entries['hosts'])
    variables['grid_name'] = self.current_grid.name
    self._generate_template(path, variables)
    vars_json = json.loads(self.current_config.vars)
    vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
    with open(path, "a") as yaml_file:
        yaml_file.write(vars_yaml)
def generate_group_vars_all(self):
    path = 'result/{}/group_vars/all'.format(self.grid_name)
    variables = AutoDict()
    hosts_entries = AutoDict()
    with open('result/{}/infrastructure/terraform.tfstate'.format(
            self.grid_name), 'r') as json_file:
        json_data = json.load(json_file)
    for module in json_data['modules']:
        for resource, value in module['resources'].items():
            if value['type'] == 'azure_instance':
                host = '{}.node.{}'.format(
                    value['primary']['attributes']['name'], self.grid_name)
                ip = value['primary']['attributes']['ip_address']
                hosts_entries['hosts'][str(host)] = str(ip)
    variables['hosts'] = json.dumps(hosts_entries['hosts'])
    variables['grid_name'] = self.current_grid.name
    self._generate_template(path, variables)
    vars_json = json.loads(self.current_config.vars)
    vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
    with open(path, "a") as yaml_file:
        yaml_file.write(vars_yaml)
def generate_group_vars_all(self):
    path = 'result/{}/group_vars/all'.format(self.grid_name)
    variables = AutoDict()
    hosts_entries = AutoDict()
    for group in self.current_groups:
        for ip in group.groupips.split(','):
            hostname = ip.replace('.', '-')
            host = '{}.node.{}'.format(hostname, self.grid_name)
            hosts_entries['hosts'][str(host)] = str(ip)
    for ip in self.current_config.mastersips.split(','):
        hostname = ip.replace('.', '-')
        host = '{}.node.{}'.format(hostname, self.grid_name)
        hosts_entries['hosts'][str(host)] = str(ip)
    terminal_ip = self.current_config.terminalips.split(',')[1]
    terminal_hostname = terminal_ip.replace('.', '-')
    terminal_host = '{}.node.{}'.format(terminal_hostname, self.grid_name)
    hosts_entries['hosts'][str(terminal_host)] = str(terminal_ip)
    variables['hosts'] = json.dumps(hosts_entries['hosts'])
    variables['grid_name'] = self.current_grid.name
    self._generate_template(path, variables)
    vars_json = json.loads(self.current_config.vars)
    vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
    with open(path, "a") as yaml_file:
        yaml_file.write(vars_yaml)