def hugepage_support(user, group='hugetlb', nr_hugepages=256,
                     max_map_count=65536, mnt_point='/run/hugepages/kvm',
                     pagesize='2MB', mount=True, set_shmmax=False):
    """Enable hugepages on system.

    Creates the owning group, adds the user to it, writes the sysctl
    settings to /etc/sysctl.d/10-hugepage.conf, creates the mount point and
    (re)registers a hugetlbfs entry for it in fstab.

    Args:
    user (str)  -- Username to allow access to hugepages to
    group (str) -- Group name to own hugepages
    nr_hugepages (int) -- Number of pages to reserve
    max_map_count (int) -- Number of Virtual Memory Areas a process can own
    mnt_point (str) -- Directory to mount hugepages on
    pagesize (str) -- Size of hugepages
    mount (bool) -- Whether to Mount hugepages
    set_shmmax (bool) -- Whether to raise kernel.shmmax when the current
                         value is smaller than pagesize * nr_hugepages
    """
    gid = add_group(group).gr_gid
    add_user_to_group(user, group)

    # vm.max_map_count is never set below twice the number of reserved pages.
    max_map_count = max(max_map_count, 2 * nr_hugepages)

    settings = {
        'vm.nr_hugepages': nr_hugepages,
        'vm.max_map_count': max_map_count,
        'vm.hugetlb_shm_group': gid,
    }
    if set_shmmax:
        current_shmmax = int(check_output(['sysctl', '-n', 'kernel.shmmax']))
        required_shmmax = bytes_from_string(pagesize) * nr_hugepages
        # Only ever raise kernel.shmmax, never lower it.
        if required_shmmax > current_shmmax:
            settings['kernel.shmmax'] = required_shmmax
    sysctl.create(yaml.dump(settings), '/etc/sysctl.d/10-hugepage.conf')

    mkdir(mnt_point, owner='root', group='root', perms=0o755, force=False)

    # Replace any existing fstab entry for this mount point with a fresh one.
    table = fstab.Fstab()
    stale_entry = table.get_entry_by_attr('mountpoint', mnt_point)
    if stale_entry:
        table.remove_entry(stale_entry)
    mount_options = 'mode=1770,gid={},pagesize={}'.format(gid, pagesize)
    table.add_entry(
        table.Entry('nodev', mnt_point, 'hugetlbfs', mount_options, 0, 0))

    if mount:
        fstab_mount(mnt_point)
# Example source code using Python's dump()
def retrieve(cont, filename):
    """Return the value stored under key ``cont`` in a YAML file, UTF-8 encoded.

    Args:
        cont: Top-level key to look up in the loaded document.
        filename: Path of the YAML file to read.

    Returns:
        The result of ``.encode('utf-8')`` on the value found under ``cont``.

    Raises:
        KeyError: If ``cont`` is not present in the loaded document.
    """
    # ``open`` replaces the Python 2-only ``file`` builtin, and the context
    # manager makes sure the handle is closed even when parsing fails.
    with open(filename, 'r') as stream:
        # NOTE(review): yaml.load without an explicit Loader can construct
        # arbitrary Python objects; switch to yaml.safe_load if this file can
        # ever come from an untrusted source.
        data = yaml.load(stream)
    return data[cont].encode('utf-8')
def internal_data(filename, io, entry, cont, cont_in=None, cont_in2=None):
    """Read or write a value nested up to three containers deep in a YAML file.

    Args:
        filename: Path of the YAML file.
        io: ``'out'`` to read a value, ``'in'`` to store ``entry`` and rewrite
            the file. Any other value does nothing after loading the file.
        entry: Value to store when ``io == 'in'`` (ignored for ``'out'``).
        cont: Key of the outermost container.
        cont_in: Optional key inside ``cont``.
        cont_in2: Optional key inside ``cont_in``; only used when ``cont_in``
            is given.

    Returns:
        The value found when ``io == 'out'``; otherwise ``None``.
    """
    # Close the read handle before the same path is possibly reopened for
    # writing further down.
    with open(filename, 'r') as stream:
        # NOTE(review): yaml.load without an explicit Loader can construct
        # arbitrary Python objects; prefer yaml.safe_load for untrusted files.
        prof = yaml.load(stream)

    # Key path to the addressed value; cont_in2 only counts when cont_in is set.
    keys = [cont]
    if cont_in is not None:
        keys.append(cont_in)
        if cont_in2 is not None:
            keys.append(cont_in2)

    if io == 'out':
        val = prof
        for key in keys:
            val = val[key]
        return val

    if io == 'in':
        # Walk to the parent container, then assign the last key.
        parent = prof
        for key in keys[:-1]:
            parent = parent[key]
        parent[keys[-1]] = entry
        with open(filename, 'w') as yaml_file:
            yaml_file.write(yaml.dump(prof, default_flow_style=False))
def begin(self):
    """Build a saver that restores the variables under ``params["prefix"]``.

    Each variable found in that scope is mapped to a checkpoint name derived
    from its op name, the mapping is logged as YAML, and a ``tf.train.Saver``
    for it is stored on ``self._saver``.
    """
    scope_variables = tf.contrib.framework.get_variables(
        scope=self.params["prefix"])

    def checkpoint_name(op_name):
        """Strip the leading part of the prefix from a variable name."""
        # Everything but the last "/"-separated component of the prefix is
        # removed from the name.
        # NOTE(review): with a single-component prefix this strips every "/"
        # from the name -- confirm that is intended.
        leading_scope = "/".join(self.params["prefix"].split("/")[:-1])
        return op_name.replace(leading_scope + "/", "")

    checkpoint_names = [checkpoint_name(var.op.name) for var in scope_variables]
    restore_map = dict(zip(checkpoint_names, scope_variables))

    readable_map = {}
    for name, var in restore_map.items():
        readable_map[name] = var.op.name
    tf.logging.info("Restoring variables: \n%s", yaml.dump(readable_map))

    self._saver = tf.train.Saver(restore_map)
def test_invalid_form_data_unknown_url_name(self):
    """A redirect config naming 'not-existing-url' fails form validation.

    The only error key expected on the form is '__all__'.
    """
    redirect_config = yaml.dump({'to name': 'not-existing-url'})
    form_data = {
        'url': '/test/',
        'alias': 'test-page',
        'description': 'At vero eos et accusamus et iusto odio',
        'keywords': 'lorem ipsum dolor sit amet',
        'page_processor': 'powerpages.RedirectProcessor',
        'page_processor_config': redirect_config,
        'template': '<h1>{{ website_page.title }}</h1>\n',
        'title': 'De Finibus Bonorum et Malorum',
    }
    admin_form = PageAdminForm(form_data, instance=Page())
    self.assertFalse(admin_form.is_valid())
    error_fields = list(admin_form.errors.keys())
    self.assertEqual(error_fields, ['__all__'])
def test_invalid_form_data_unknown_parent_template(self):
    """A config naming a missing base template fails form validation.

    There is no parent page; the only error key expected on the form is
    '__all__'.
    """
    processor_config = yaml.dump({
        'base template': "this-template-does-not-exist.html"
    })
    form_data = {
        'url': '/test/',
        'alias': 'test-page',
        'description': 'At vero eos et accusamus et iusto odio',
        'keywords': 'lorem ipsum dolor sit amet',
        'page_processor': 'powerpages.DefaultPageProcessor',
        'page_processor_config': processor_config,
        'template': '<h1>{{ website_page.title }}</h1>\n',
        'title': 'De Finibus Bonorum et Malorum',
    }
    admin_form = PageAdminForm(form_data, instance=Page())
    self.assertFalse(admin_form.is_valid())
    error_fields = list(admin_form.errors.keys())
    self.assertEqual(error_fields, ['__all__'])
def edit_mkdocs_config(self):
    """
    Create mkdocs.yml file from metadata

    The file is written to MKDOCS_DIR and holds the site name, the
    'readthedocs' theme, the docs directory and the output directory.

    :return: None (nothing is returned explicitly; I/O errors propagate).
    """
    settings = {
        'site_name': self._wiki_name,
        'theme': 'readthedocs',
        'docs_dir': self._wiki_name,
        'site_dir': self._out_dir,
    }
    target_path = os.path.join(MKDOCS_DIR, "mkdocs.yml")
    with open(target_path, 'w') as config_file:
        yaml.dump(settings, config_file, default_flow_style=False)
def set_device_yaml():
    """
    Write the connected devices and their Android versions to a YAML file.

    For every device returned by ``get_device()`` a record with the keys
    ``platformVersion``, ``deviceName`` and ``platformName`` is collected;
    the list is dumped to the path configured under
    ``test_device`` / ``device`` in the ini file.

    :return: None
    """
    device_lst = []
    for device in get_device():
        adb = lib.adbUtils.ADB(device)
        # Query the version once and reuse it for the log line and the record.
        android_version = adb.get_android_version()
        U.Logging.success(
            'get device:{},Android version:{}'.format(
                device, android_version))
        device_lst.append({
            'platformVersion': android_version,
            'deviceName': device,
            'platformName': 'Android',
        })

    ini = U.ConfigIni()
    # The context manager closes the file; no explicit close() is needed.
    with open(ini.get_ini('test_device', 'device'), 'w') as f:
        yaml.dump(device_lst, f)
def __load_analysis(self):
    """
    Analyse the YAML at ``self.path_yaml`` and record the outcome.

    Steps, in order:
    1: save the Android log
    2: analyse the YAML file at ``self.path_yaml``
    3: write the resulting error message to the error-status file
    4: log the result
    5: save the screen file

    :return: whatever ``__save_screen_file()`` returns
    """
    U.Logging.success('read the yaml file')
    self.__save_android_log()
    error_msg = self.__analysis_yaml(self.path_yaml)
    # The context manager closes the file; no explicit close() is needed.
    with open(self.__save_error_status(), 'w') as f:
        yaml.dump({'error_msg': error_msg}, f)
    U.Logging.debug(str('results of the:%s' % error_msg))
    return self.__save_screen_file()
def convert(self, common):
    """
    Process all hardware profiles

    Every file name in ``common`` that contains 'profile-' is loaded,
    migrated and written to a sibling path in which 'profile-' is replaced
    by 'migrated-profile-'.
    """
    for pathname in common.keys():
        for filename in common[pathname]:
            # Only hardware profile files are migrated.
            if 'profile-' not in filename:
                continue

            with open(filename, "r") as source:
                content = yaml.safe_load(source)
            migrated = _migrate(content, filename)

            newfilename = re.sub('profile-', 'migrated-profile-', filename)
            _create_dirs(os.path.dirname(newfilename), self.pillar_dir)

            rendered = yaml.dump(migrated,
                                 Dumper=self.friendly_dumper,
                                 default_flow_style=False)
            with open(newfilename, "w") as target:
                target.write(rendered)
def _record_filter(args, base_dir):
    """
    Save the filter provided

    The filter file ``<base_dir>/.filter`` is created when missing, loaded,
    updated with the arguments for ``args['target']`` and written back.
    The 'target' key and keys starting with '__' are not recorded.
    """
    filter_file = '{}/.filter'.format(base_dir)
    if not isfile(filter_file):
        # do a touch filter_file
        open(filter_file, 'a').close()

    with open(filter_file) as filehandle:
        # NOTE(review): yaml.load without an explicit Loader can construct
        # arbitrary Python objects; prefer yaml.safe_load if the filter file
        # is not fully trusted.
        current_filter = yaml.load(filehandle)
    # An empty file loads as None.
    if current_filter is None:
        current_filter = {}

    pprint.pprint(current_filter)

    # filter a bunch of salt content and the target key before writing.
    # Strings are compared by value; an identity test ("is not") only works
    # by accident of interning.
    rec_args = {k: v for k, v in args.items()
                if k != 'target' and not k.startswith('__')}
    current_filter[args['target']] = rec_args

    with open(filter_file, 'w') as filehandle:
        yaml.dump(current_filter, filehandle, default_flow_style=False)
def _update_grains(self, content, filename="/etc/salt/grains"):
    """
    Update the yaml file without destroying other content

    ``content`` is dumped in block style, without anchors/aliases, to
    ``filename``; afterwards the grains are synced through
    ``saltutil.sync_grains``.
    """
    log.info("Updating {}".format(filename))

    # Keep yaml human readable/editable.  A local subclass is used so the
    # alias suppression does not leak into yaml.SafeDumper for every other
    # user of the library in this process.
    class FriendlyDumper(yaml.SafeDumper):
        """SafeDumper that never emits anchors or aliases."""

        def ignore_aliases(self, data):
            return True

    with open(filename, 'w') as minion_grains:
        minion_grains.write(yaml.dump(content,
                                      Dumper=FriendlyDumper,
                                      default_flow_style=False))
    log.info("Syncing grains")
    __salt__['saltutil.sync_grains']()
def roles():
    """
    Remove the roles from the cluster/*.sls files

    Every file in /srv/pillar/ceph/cluster is loaded, its 'roles' key (if
    any) is dropped, and the file is written back in block style.
    """
    # Keep yaml human readable/editable.  A local subclass is used so the
    # alias suppression does not leak into yaml.SafeDumper for every other
    # user of the library in this process.
    class FriendlyDumper(yaml.SafeDumper):
        """SafeDumper that never emits anchors or aliases."""

        def ignore_aliases(self, data):
            return True

    cluster_dir = '/srv/pillar/ceph/cluster'
    for filename in os.listdir(cluster_dir):
        pathname = "{}/{}".format(cluster_dir, filename)
        with open(pathname, "r") as sls_file:
            content = yaml.safe_load(sls_file)
        log.info("content {}".format(content))

        # An empty file loads as None; there is nothing to strip, and the
        # membership test below would raise TypeError.
        if content is None:
            continue

        if 'roles' in content:
            content.pop('roles')
        # NOTE(review): the file is rewritten even when it had no 'roles'
        # key -- confirm this matches the intended behavior.
        with open(pathname, "w") as sls_file:
            sls_file.write(yaml.dump(content, Dumper=FriendlyDumper,
                                     default_flow_style=False))
def default():
    """
    Remove the .../stack/defaults directory.  Preserve available_roles

    The 'available_roles' value of the ceph cluster.yml is read first, the
    whole stack/default tree is removed and recreated, a cluster.yml holding
    only 'available_roles' is written, and the recreated paths are chowned
    to the 'salt' user and group.
    """
    # Keep yaml human readable/editable.  A local subclass is used so the
    # alias suppression does not leak into yaml.SafeDumper for every other
    # user of the library in this process.
    class FriendlyDumper(yaml.SafeDumper):
        """SafeDumper that never emits anchors or aliases."""

        def ignore_aliases(self, data):
            return True

    stack_default = "/srv/pillar/ceph/stack/default"
    ceph_dir = "{}/{}".format(stack_default, 'ceph')
    pathname = "/srv/pillar/ceph/stack/default/{}/cluster.yml".format('ceph')

    # Read what must survive before the tree is deleted.
    with open(pathname, "r") as sls_file:
        content = yaml.safe_load(sls_file)
    preserve = {'available_roles': content['available_roles']}

    shutil.rmtree(stack_default)
    os.makedirs(ceph_dir)
    with open(pathname, "w") as sls_file:
        sls_file.write(yaml.dump(preserve, Dumper=FriendlyDumper,
                                 default_flow_style=False))

    uid = pwd.getpwnam("salt").pw_uid
    gid = grp.getgrnam("salt").gr_gid
    for path in [stack_default, ceph_dir, pathname]:
        os.chown(path, uid, gid)
def init(banner, hidden, backup):
    """Initialize a manage shell in current directory
    $ manage init --banner="My awesome app shell"
    initializing manage...
    creating manage.yml
    """
    # banner: optional banner message stored under shell.banner.message.
    # hidden: write HIDDEN_MANAGE_FILE instead of MANAGE_FILE.
    # backup: copy an existing manage file to '.bck_<name>' before rewriting.
    import copy

    manage_file = HIDDEN_MANAGE_FILE if hidden else MANAGE_FILE
    if os.path.exists(manage_file):
        if not click.confirm('Rewrite {0}?'.format(manage_file)):
            return
        if backup:
            bck = '.bck_{0}'.format(manage_file)
            with open(manage_file, 'r') as source, open(bck, 'w') as bck_file:
                bck_file.write(source.read())

    # Work on a copy so the module-level defaults are not modified, and
    # render before opening the file so a failure cannot leave it truncated.
    data = copy.deepcopy(default_manage_dict)
    if banner:
        data['shell']['banner']['message'] = banner
    rendered = yaml.dump(data, default_flow_style=False)

    with open(manage_file, 'w') as output:
        output.write(rendered)
def save_parameters(stack, params):
    """saves parameters to disk

    ``params`` is a sequence of dicts with 'ParameterKey' and
    'ParameterValue' entries; they are flattened into one mapping and
    written as YAML to stacks/<stack>/parameters.yaml.
    """
    # decode parameter dict
    flattened = {}
    for item in params:
        flattened[item['ParameterKey']] = item['ParameterValue']

    stack_dir = path.join('stacks', stack)
    param_path = path.join(stack_dir, 'parameters.yaml')

    # ensure paths are present, outermost directory first
    for directory in ('stacks', stack_dir):
        if not path.exists(directory):
            mkdir(directory)

    document = yaml.dump(flattened, default_flow_style=False,
                         explicit_start=True)
    with open(param_path, mode='w', encoding='utf-8') as handle:
        handle.write(document)
def return_config_overrides_yaml(self,
                                 config_overrides,
                                 resultant,
                                 list_extend=True,
                                 ignore_none_type=True):
    """Return config yaml.

    The YAML text in ``resultant`` is parsed, merged with
    ``config_overrides`` through ``self._merge_dict`` and dumped again.

    :param config_overrides: ``dict``
    :param resultant: ``str`` || ``unicode``
    :param list_extend: ``bool`` forwarded to ``self._merge_dict``
    :param ignore_none_type: ``bool`` accepted but not used in this method
    :returns: ``str``
    """
    parsed = yaml.safe_load(resultant)
    merged = self._merge_dict(
        base_items=parsed,
        new_items=config_overrides,
        list_extend=list_extend
    )
    dump_options = {
        'Dumper': IDumper,
        'default_flow_style': False,
        'width': 1000,
    }
    return yaml.dump(merged, **dump_options)
def test_add_and_remove_config(self):
    """Adding a catalog config prints it; removing it prints it no more."""
    self.init_with_remote_catalog()

    # Add the 'teszt' catalog and expect its YAML form in the output.
    add_argv = ["catalog", "config", "add", "teszt", "ssh://teszt.teszt/teszt"]
    with self.captured_output() as (out, err):
        StateHolder.skip_docker = True
        Poco(home_dir=self.tmpdir, argv=add_argv).run()
    self.assertEqual(0, len(err.getvalue()))

    expected = {
        "teszt": {
            "repositoryType": "git",
            "server": "ssh://teszt.teszt/teszt",
        }
    }
    expected_yaml = yaml.dump(expected, default_flow_style=False,
                              default_style='', indent=4)
    self.assertIn(expected_yaml.strip(), out.getvalue().strip())

    self.clean_states()

    # Remove it again and expect no trace of it in the output.
    remove_argv = ["catalog", "config", "remove", "teszt"]
    with self.captured_output() as (out, err):
        StateHolder.skip_docker = True
        Poco(home_dir=self.tmpdir, argv=remove_argv).run()
    self.assertEqual(0, len(err.getvalue()))
    self.assertNotIn("teszt", out.getvalue().strip())
def generate_test_accounts_file(tenant_id):
    """
    Add needed tenant and user params into test_accounts.yaml

    A single account entry (tenant name, tenant id, user name, password) is
    built from the tempest identity settings on CONST and written to
    TEST_ACCOUNTS_FILE in block style.

    :param tenant_id: id of the tenant; stored as a string
    """
    logger.debug("Add needed params into test_accounts.yaml...")
    # getattr() is the public way to look up an attribute by name; calling
    # the __getattribute__ dunder directly is unidiomatic and skips any
    # __getattr__ fallback.
    accounts_list = [
        {
            'tenant_name': getattr(CONST, 'tempest_identity_tenant_name'),
            'tenant_id': str(tenant_id),
            'username': getattr(CONST, 'tempest_identity_user_name'),
            'password': getattr(CONST, 'tempest_identity_user_password'),
        }
    ]

    with open(TEST_ACCOUNTS_FILE, "w") as f:
        yaml.dump(accounts_list, f, default_flow_style=False)
def post(self):
    """Update the site settings from the submitted form and persist them.

    Only users whose ``level`` is 0 may proceed. The new values are applied
    to the running settings, merged into the 'global' section of the config
    file, and the client is redirected to /ushio/setting.
    """
    if self.current_user['level'] != 0:
        self.custom_error()
        # Stop here: if custom_error() does not raise, an unprivileged
        # request must not fall through and rewrite the settings.
        return

    settings = {
        'init_money': int(self.get_body_argument('init_money')),
        'reg_type': self.get_body_argument('reg_type'),
        # An empty cookie_secret field keeps the current secret.
        'cookie_secret': self.get_body_argument('cookie_secret') or self.settings['cookie_secret'],
        'site': {
            'name': self.get_body_argument('sitename'),
            'keyword': self.get_body_argument('keyword'),
            'description': self.get_body_argument('description')
        }
    }
    self.settings.update(settings)

    with open(self.settings['config_file'], 'r') as f:
        # NOTE(review): yaml.load without an explicit Loader can construct
        # arbitrary Python objects; prefer yaml.safe_load if the config file
        # is not fully trusted.
        custom_settings = yaml.load(f)
    custom_settings['global'].update(settings)
    with open(self.settings['config_file'], 'w') as f:
        yaml.dump(custom_settings, f,
                  default_flow_style=False, default_style='"')
    self.redirect('/ushio/setting')
# test_nova_cc_hooks.py -- file source
# Project: charm-nova-cloud-controller
# Author: openstack
# Project source
# File source
# Views: 34
# Favorites: 0
# Likes: 0
# Comments: 0
def test_install_hook_git(self):
    """The install hook installs from git, installs packages, stops services."""
    self.git_install_requested.return_value = True
    self.determine_packages.return_value = ['foo', 'bar']
    self.determine_ports.return_value = [80, 81, 82]

    requirements_repo = {
        'name': 'requirements',
        'repository': 'git://git.openstack.org/openstack/requirements',  # noqa
        'branch': 'stable/juno',
    }
    nova_repo = {
        'name': 'nova',
        'repository': 'git://git.openstack.org/openstack/nova',
        'branch': 'stable/juno',
    }
    projects_yaml = yaml.dump({
        'repositories': [requirements_repo, nova_repo],
        'directory': '/mnt/openstack-git',
    })

    self.test_config.set('openstack-origin', 'cloud:trusty-juno')
    self.test_config.set('openstack-origin-git', projects_yaml)

    hooks.install()

    self.git_install.assert_called_with(projects_yaml)
    self.apt_install.assert_called_with(['foo', 'bar'], fatal=True)
    self.assertTrue(self.execd_preinstall.called)
    self.assertTrue(self.disable_services.called)
    self.cmd_all_services.assert_called_with('stop')
def main():
    """Print the value stored under a key of a config file.

    Command-line arguments: ``key`` (the key to fetch, "." for everything)
    and ``conf`` (path to the config file). Dicts and lists are printed as
    YAML, any other value is printed as is. Exits with status 1 when the
    key does not exist.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('key', help='key name to be fetched ("." to fetch all)')
    parser.add_argument('conf', help='path to a config file')
    ns = parser.parse_args()

    try:
        val = fetch_value_from_file(ns.key, ns.conf)
    except KeyError:
        log('no such key: ' + ns.key)
        sys.exit(1)

    # print(...) with a single argument behaves the same on Python 2 and 3;
    # the former print statement is a syntax error on Python 3.
    if isinstance(val, (dict, list)):
        print(yaml.dump(val))
    else:
        print(val)
def to_yaml(self):
    """
    Pretty print dump as YAML.

    The dict from ``to_safe_dict()`` is rendered with the safe dumper.
    """
    document = self.to_safe_dict()
    formatting = dict(
        # show every document in its own block
        default_flow_style=False,
        # start a new document (via "---") before every resource
        explicit_start=True,
        # follow (modern) PEP8 max line length and indent
        width=99,
        indent=4,
    )
    return dump(document, Dumper=SafeDumper, **formatting)
def test_basic_config():
    """The generated supervisor config holds the expected sshuttle command.

    ``testcfg`` is written to a temporary YAML file, loaded through
    ``ny.get_config``, and the supervisor config written by
    ``ny.write_supervisor_conf`` is parsed and checked.
    """
    fd, path = tempfile.mkstemp()
    try:
        # Closing the handle flushes the YAML to disk before it is read back.
        with os.fdopen(fd, 'w') as f:
            f.write(yaml.dump(testcfg))

        cfg = ny.get_config(path)
        ny.write_supervisor_conf()

        config = ConfigParser.ConfigParser()
        with open(cfg['supervisor.conf']) as supervisor_conf:
            config.readfp(supervisor_conf)

        command = config.get('program:testtunnel', 'command')
        print(command)
        assert 'sshuttle -r 1.1.1.1 2.2.2.2 -x 3.3.3.3' in command
    finally:
        # mkstemp() leaves removal to the caller.
        os.remove(path)
def fake_metta(matrix_dict, metadata):
    """Stores matrix and metadata in a metta-data-like form

    Args:
    matrix_dict (dict) of form { columns: values }.
        Expects an entity_id to be present which it will use as the index
    metadata (dict). Any metadata that should be set

    Yields:
        tuple of filenames for matrix and metadata

    Both files are temporary and are removed once the generator is resumed
    or closed after the yield.
    """
    matrix = pandas.DataFrame.from_dict(matrix_dict).set_index('entity_id')
    with tempfile.NamedTemporaryFile() as matrix_file:
        with tempfile.NamedTemporaryFile('w') as metadata_file:
            hdf = pandas.HDFStore(matrix_file.name)
            try:
                hdf.put('title', matrix, data_columns=True)
            finally:
                # Close the store so the data is on disk and the handle is
                # released before the file name is handed out.
                hdf.close()
            matrix_file.seek(0)

            yaml.dump(metadata, metadata_file)
            metadata_file.seek(0)
            yield (matrix_file.name, metadata_file.name)
def fake_metta(matrix_dict, metadata):
    """Stores matrix and metadata in a metta-data-like form

    Args:
    matrix_dict (dict) of form { columns: values }.
        Expects an entity_id to be present which it will use as the index
    metadata (dict). Any metadata that should be set

    Yields:
        tuple of filenames for matrix and metadata

    Both files are temporary and are removed once the generator is resumed
    or closed after the yield.
    """
    matrix = pd.DataFrame.from_dict(matrix_dict).set_index('entity_id')
    with tempfile.NamedTemporaryFile() as matrix_file:
        with tempfile.NamedTemporaryFile('w') as metadata_file:
            hdf = pd.HDFStore(matrix_file.name)
            try:
                hdf.put('title', matrix, data_columns=True)
            finally:
                # Close the store so the data is on disk and the handle is
                # released before the file name is handed out.
                hdf.close()
            matrix_file.seek(0)

            yaml.dump(metadata, metadata_file)
            metadata_file.seek(0)
            yield (matrix_file.name, metadata_file.name)
def publish_updating(self):
    """Publish update requests for the test NSD and VNFD descriptors.

    Each descriptor is loaded from test/test_descriptors, wrapped together
    with its UUID, dumped to YAML and sent asynchronously on its registry
    update topic; responses go to ``self._on_publish_ins_response``.
    """
    LOG.info("Sending updating request")
    # (descriptor path, message key, UUID, topic), sent in this order.
    requests = (
        ('test/test_descriptors/nsdu.yml', 'NSD',
         '937213ae-890b-413c-a11e-45c62c4eee3f',
         'specific.manager.registry.ssm.update'),
        ('test/test_descriptors/vnfdu.yml', 'VNFD',
         '754fe4fe-96c9-484d-9683-1a1e8b9a31a3',
         'specific.manager.registry.fsm.update'),
    )
    for descriptor_path, kind, instance_uuid, topic in requests:
        # The context manager closes the descriptor even if loading fails.
        with open(descriptor_path, 'r') as descriptor:
            # NOTE(review): yaml.load without an explicit Loader can
            # construct arbitrary Python objects; prefer yaml.safe_load.
            message = {kind: yaml.load(descriptor), 'UUID': instance_uuid}
        self.manoconn.call_async(self._on_publish_ins_response,
                                 topic,
                                 yaml.dump(message))
def publish_terminating(self):
    """Publish terminate requests for the test NSD and VNFD descriptors.

    Each descriptor is loaded from test/test_descriptors, wrapped together
    with its UUID, dumped to YAML and sent asynchronously on its registry
    terminate topic; responses go to ``self._on_publish_ins_response``.
    """
    # (descriptor path, message key, UUID, topic), sent in this order.
    requests = (
        ('test/test_descriptors/nsdt.yml', 'NSD',
         '937213ae-890b-413c-a11e-45c62c4eee3f',
         'specific.manager.registry.ssm.terminate'),
        ('test/test_descriptors/vnfdt1.yml', 'VNFD',
         'c32b731f-7eea-4afd-9c60-0b0d0ea37eed',
         'specific.manager.registry.fsm.terminate'),
        ('test/test_descriptors/vnfdt2.yml', 'VNFD',
         '754fe4fe-96c9-484d-9683-1a1e8b9a31a3',
         'specific.manager.registry.fsm.terminate'),
    )
    for descriptor_path, kind, instance_uuid, topic in requests:
        # The context manager closes the descriptor even if loading fails.
        with open(descriptor_path, 'r') as descriptor:
            # NOTE(review): yaml.load without an explicit Loader can
            # construct arbitrary Python objects; prefer yaml.safe_load.
            message = {kind: yaml.load(descriptor), 'UUID': instance_uuid}
        self.manoconn.call_async(self._on_publish_ins_response,
                                 topic,
                                 yaml.dump(message))
def publish_nsd(self):
    """Publish on-board requests for the test NSD and VNFD descriptors.

    Each descriptor is loaded from test/test_descriptors, wrapped under its
    message key, dumped to YAML and sent asynchronously on its registry
    on-board topic; responses go to ``self._on_publish_nsd_response``.
    """
    LOG.info("Sending onboard request")
    # (descriptor path, message key, topic), sent in this order.
    requests = (
        ('test/test_descriptors/nsd.yml', 'NSD',
         'specific.manager.registry.ssm.on-board'),
        ('test/test_descriptors/vnfd1.yml', 'VNFD',
         'specific.manager.registry.fsm.on-board'),
        ('test/test_descriptors/vnfd2.yml', 'VNFD',
         'specific.manager.registry.fsm.on-board'),
    )
    for descriptor_path, kind, topic in requests:
        # The context manager closes the descriptor even if loading fails.
        with open(descriptor_path, 'r') as descriptor:
            # NOTE(review): yaml.load without an explicit Loader can
            # construct arbitrary Python objects; prefer yaml.safe_load.
            message = {kind: yaml.load(descriptor)}
        self.manoconn.call_async(self._on_publish_nsd_response,
                                 topic,
                                 yaml.dump(message))
def publish_sid(self):
    """Publish instantiate requests for the test NSD and VNFD descriptors.

    Each descriptor is loaded from test/test_descriptors, wrapped together
    with its UUID, dumped to YAML and sent asynchronously on its registry
    instantiate topic; responses go to ``self._on_publish_sid_response``.
    """
    LOG.info("Sending instantiate request")
    # (descriptor path, message key, UUID, topic), sent in this order.
    requests = (
        ('test/test_descriptors/nsd.yml', 'NSD',
         '937213ae-890b-413c-a11e-45c62c4eee3f',
         'specific.manager.registry.ssm.instantiate'),
        ('test/test_descriptors/vnfd1.yml', 'VNFD',
         'c32b731f-7eea-4afd-9c60-0b0d0ea37eed',
         'specific.manager.registry.fsm.instantiate'),
        ('test/test_descriptors/vnfd2.yml', 'VNFD',
         '754fe4fe-96c9-484d-9683-1a1e8b9a31a3',
         'specific.manager.registry.fsm.instantiate'),
    )
    for descriptor_path, kind, instance_uuid, topic in requests:
        # The context manager closes the descriptor even if loading fails.
        with open(descriptor_path, 'r') as descriptor:
            # NOTE(review): yaml.load without an explicit Loader can
            # construct arbitrary Python objects; prefer yaml.safe_load.
            message = {kind: yaml.load(descriptor), 'UUID': instance_uuid}
        self.manoconn.call_async(self._on_publish_sid_response,
                                 topic,
                                 yaml.dump(message))