def load(self, istream):
"""
    Load the model from an input stream.
    reset() is called to clean up the network instance.
"""
self.reset()
allLayer = {}
    # NOTE: layers are serialized as custom Python objects, so a full
    # (unsafe) load is required here; yaml.safe_load_all() would reject them.
    for layer in yaml.load_all(istream):
        allLayer[layer.saveName] = layer
        if isinstance(layer, RawInput):
            self.setInput(layer, reload=True)
    # TODO: handle multiple input layers
    # TODO: branch and merge
    currentLayer = self.currentLayer
    # Follow the first output edge until a layer with no outputs is reached.
    while len(currentLayer.outputLayerName) > 0:
        nextLayer = allLayer[currentLayer.outputLayerName[0]]
        self.append(nextLayer, reload=True)
        currentLayer = nextLayer
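# Note: since PyYAML 5.1, calling yaml.load_all() without an explicit Loader
# emits a deprecation warning; for plain data, yaml.safe_load_all() is the
# safe equivalent. A minimal, hypothetical sketch of multi-document loading:
def demo_safe_load_all():
    import io
    import yaml
    stream = io.StringIO("name: first\n---\nname: second\n")
    # Each '---'-separated document is yielded lazily, one dict at a time.
    return [doc["name"] for doc in yaml.safe_load_all(stream)]
    # -> ["first", "second"]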
def test_update_dictionary_valid(self):
expected = "{}/templates/override-{}-expected.yaml".format(
self.basepath, '01')
merge = "{}/templates/override-{}.yaml".format(self.basepath, '01')
with open(self.base_manifest) as f, open(expected) as e, open(
merge) as m:
merging_values = list(yaml.safe_load_all(m.read()))
doc_obj = list(yaml.safe_load_all(f.read()))
doc_path = ['chart', 'blog-1']
ovr = Override(doc_obj)
ovr.update_document(merging_values)
ovr_doc = ovr.find_manifest_document(doc_path)
        expect_doc = list(yaml.safe_load_all(e.read()))[0]
self.assertEqual(ovr_doc, expect_doc)
def get_chart_templates(self, template_name, name, release_name, namespace,
chart, disable_hooks, values):
    # Render the chart via a dry-run install and return the template
    # whose metadata.name matches template_name.
LOG.info("Template( %s ) : %s ", template_name, name)
stub = ReleaseServiceStub(self.channel)
release_request = InstallReleaseRequest(
chart=chart,
dry_run=True,
values=values,
name=name,
namespace=namespace,
wait=False)
templates = stub.InstallRelease(
release_request, self.timeout, metadata=self.metadata)
    for template in yaml.load_all(
            getattr(templates.release, 'manifest', '')):
        # Skip empty documents and guard against a missing metadata block.
        if template and template_name == template.get(
                'metadata', {}).get('name'):
            LOG.info(template_name)
            return template
def get_content(self):
"""Return a single document from YAML"""
def multi_constructor(loader, tag_suffix, node):
"""Stores all unknown tags content into a dict
Original yaml:
!unknown_tag
- some content
Python object:
{"!unknown_tag": ["some content", ]}
"""
        if isinstance(node.value, list):
            if node.value and isinstance(node.value[0], tuple):
                return {node.tag: loader.construct_mapping(node)}
            return {node.tag: loader.construct_sequence(node)}
        return {node.tag: loader.construct_scalar(node)}
yaml.add_multi_constructor("!", multi_constructor)
with self.__get_file() as file_obj:
        self.__documents = list(yaml.load_all(file_obj))
return self.__documents[self.__document_id]
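# Usage sketch for the multi-constructor above (hypothetical input): once the
# "!" prefix handler is registered, documents with unknown tags load as dicts
# keyed by the tag instead of raising a ConstructorError.
def demo_unknown_tag():
    import yaml
    return yaml.load("!unknown_tag\n- some content\n", Loader=yaml.Loader)
    # -> {"!unknown_tag": ["some content"]}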
def load_camera_from_calibr(f):
    """Build a stereo camera pair from a Kalibr camchain YAML file."""
    s = open(f).read()
    y = list(yaml.load_all(s))[0]
    K0 = Camera.buildK(y['cam0']['intrinsics'])
    # cam0 is the reference camera: identity rotation, zero translation.
    C0 = Camera(K0, np.eye(3), np.zeros((3,)))
    K1 = Camera.buildK(y['cam1']['intrinsics'])
    # T_cn_cnm1 is the transform from the previous camera (cam0) to cam1;
    # take the top 3 rows and split into rotation R1 and translation t1.
    M = y['cam1']['T_cn_cnm1'][:3]
    R1 = np.asarray([k[:3] for k in M])
    t1 = np.asarray([k[3] for k in M])
    C1 = Camera(K1, R1, t1)
    dist0 = np.asarray(y['cam0']['distortion_coeffs'])
    dist1 = np.asarray(y['cam1']['distortion_coeffs'])
    return C0, C1, dist0, dist1
def parse_kubeconfig():
if not os.path.exists(os.path.expanduser("~/.kube/config")):
return ("", "", "")
with open(os.path.expanduser("~/.kube/config"), "r") as fd:
docs = yaml.load_all(fd)
for doc in docs:
current_context = doc.get("current-context", "")
contexts = doc.get("contexts")
if contexts:
for index, context in enumerate(contexts):
if context['name'] == current_context:
KubeConfig.current_context_index = index
KubeConfig.current_context_name = context['name']
if 'cluster' in context['context']:
KubeConfig.clustername = context['context']['cluster']
if 'namespace' in context['context']:
KubeConfig.namespace = context['context']['namespace']
if 'user' in context['context']:
KubeConfig.user = context['context']['user']
return (KubeConfig.clustername, KubeConfig.user, KubeConfig.namespace)
return ("", "", "")
def openCollection(self, fileName):
try:
f = open(unicode(fileName), 'r')
Mainframe.model = model.Model()
Mainframe.model.delete(0)
for data in yaml.load_all(f):
Mainframe.model.add(model.makeSafe(data), False)
f.close()
Mainframe.model.is_dirty = False
except IOError:
msgBox(Lang.value('MSG_IO_failed'))
Mainframe.model = model.Model()
except yaml.YAMLError as e:
msgBox(Lang.value('MSG_YAML_failed') % e)
Mainframe.model = model.Model()
else:
if len(Mainframe.model.entries) == 0:
Mainframe.model = model.Model()
Mainframe.model.filename = unicode(fileName)
finally:
Mainframe.sigWrapper.sigModelChanged.emit()
def read():
with open(Conf.file, 'r') as f:
Conf.values = yaml.load(f)
Conf.zoos = []
with open(Conf.zoo_file, 'r') as f:
for zoo in yaml.load_all(f):
Conf.zoos.append(zoo)
with open(Conf.keywords_file, 'r') as f:
Conf.keywords = yaml.load(f)
with open(Conf.popeye_file, 'r') as f:
Conf.popeye = yaml.load(f)
with open(Conf.chest_file, 'r') as f:
Conf.chest = yaml.load(f)
def display(self, f):
def handle_input(key):
if key in ('q', 'Q'):
raise urwid.ExitMainLoop()
elif key in ('right', 'j', ' '):
if self.slide_id < len(self.sd) - 1:
self.slide_id += 1
elif key in ('left', 'k'):
if self.slide_id > 0:
self.slide_id -= 1
self.update_display()
self.load_charset()
self.sd = list(yaml.load_all(f))
self.slide_id = 0
self.update_display()
txt = urwid.Text(u"Presenting...")
fill = urwid.Filler(txt, 'bottom')
urwid.MainLoop(fill, unhandled_input=handle_input).run()
def runTest(self):
os.environ['RANDOMSEED'] = '2'
options = get_options(['--samples', '10', '--output', 'ch13_r05_test.yaml'])
face_count = write_rolls(options.output_path, roll_iter(options.samples, options.seed))
self.assertDictEqual(
{8: 8, 7: 6, 10: 5, 4: 3, 6: 3, 9: 3, 2: 2, 3: 1, 5: 1, 11: 1, 12: 1},
face_count)
results = list(yaml.load_all(self.data_path.read_text()))
self.assertListEqual(
[[[1, 1]],
[[1, 3], [2, 6], [6, 3], [3, 5], [2, 5]],
[[1, 5], [6, 2], [4, 6], [4, 6], [5, 3], [5, 4], [5, 3], [1, 1], [3, 4]],
[[3, 4]],
[[4, 5], [2, 5]],
[[2, 2], [2, 1], [2, 3], [2, 2]],
[[5, 5], [3, 5], [6, 5], [2, 4], [4, 6]],
[[5, 3], [5, 3]],
[[3, 4]],
[[2, 4], [6, 6], [4, 6], [5, 2]]],
results)
def get_deployer_notes(stig_id):
"""Read deployer notes based on the STIG ID."""
filename = "{0}/rhel7/{1}.rst".format(METADATA_DIR, stig_id)
# Does this deployer note exist?
if not os.path.isfile(filename):
return False
# Read the note and parse it with YAML
with open(filename, 'r') as f:
rst_file = f.read()
# Split the RST into frontmatter and text
    # NOTE(mhayden): Can't use the standard yaml.load_all() here as it will
    # hit scanner errors in documents that contain colons (:).
yaml_boundary = re.compile(r'^-{3,}$', re.MULTILINE)
_, metadata, text = yaml_boundary.split(rst_file, 2)
# Assemble the metadata and the text from the deployer note.
post = yaml.safe_load(metadata)
post['content'] = text
return post
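# Sketch of the frontmatter split above on a hypothetical note file: the
# regex matches any line of three or more dashes, so splitting yields the
# (empty) prefix, the YAML frontmatter, and the RST body.
def demo_frontmatter_split():
    import re
    import yaml
    rst = "---\nid: V-71849\nstatus: implemented\n---\nDeployer text here.\n"
    boundary = re.compile(r'^-{3,}$', re.MULTILINE)
    _, metadata, text = boundary.split(rst, 2)
    return yaml.safe_load(metadata), text
    # -> ({'id': 'V-71849', 'status': 'implemented'}, '\nDeployer text here.\n')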
def get_api_objs(self, group, manifest, ctx=None):
if ctx is None:
ctx = {}
ctx = self.get_manifest_ctx(group, manifest, **ctx)
docs = yaml.load_all(
self.cluster.decode_manifest(
self.cluster.config["release"][group]["manifests"][manifest],
ctx,
)
)
objs = collections.defaultdict(list)
for doc in docs:
obj = getattr(pykube.objects, doc["kind"])(self.api, doc)
if obj.exists():
obj.reload()
# set the shadow object to the original doc enabling proper
# update handling if the object has changed in the manifest
obj.obj = doc
objs[doc["kind"]].append(obj)
return objs
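# Note on the dispatch above: class names in pykube.objects mirror Kubernetes
# kinds, so a manifest document with kind "Deployment" resolves to
# pykube.objects.Deployment, letting one loop handle heterogeneous manifests.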
def _check_if_pod_or_more(self, app_id, app_info):
only_pod = True
kind_list = []
app_dir = app_info['app_location']
app_folder_name = app_info['app_folder_name']
df_dir = app_dir + "/" + app_folder_name
app_yaml = app_info['app_yaml']
    with open(df_dir + "/" + app_yaml, "r") as stream:
        # Materialize the lazy generator before the file closes.
        docs = list(yaml.load_all(stream))
    for doc in docs:
        for k, v in doc.items():
if k == 'kind':
kind_list.append(v.strip())
if 'Service' in kind_list or 'Deployment' in kind_list:
only_pod = False
return only_pod
def validate_app_format(app_file_name):
valid = False
kind_set = []
try:
        with open(app_file_name, "r") as stream:
            docs = list(yaml.load_all(stream))
        for doc in docs:
            for k, v in doc.items():
if k == 'kind':
kind_set.append(v.strip())
if k == 'app':
valid = True
break
if len(kind_set) == 1 and 'Pod' in kind_set:
valid = True
    except Exception as e:
        print("Could not parse %s: %s" % (app_file_name, e))
exit()
return valid
def initialize():
f = cfg.CONF.drilldown.mappingfile
with open(f, 'r') as stream:
a = yaml.load_all(stream)
for dictio in a:
            for key, value in dictio.items():
for mylist in value:
obj = MappingFile(mylist['sourcelabel'], mylist['targetlabel'], mylist['returnfields'],
mylist['nextfields'], mylist['datasource'])
odict[mylist['sourcelabel']] = obj
def get_config():
if not len(sys.argv) > 1:
raise Exception('please give config file path')
if not os.path.exists(sys.argv[1]):
raise Exception("config file doesn't exists")
    with open(sys.argv[1], "r") as stream:
        # Return only the first YAML document from the config file.
        return next(yaml.load_all(stream), None)
def pull_yaml(args, source):
"""
Pull YAML resources from a file-like object.
"""
for dct in load_all(source):
for item in dct.items():
yield item
def to_dict(path):
"""
Normalize each resource as a dictionary for smarter comparison.
Relies on link sorting performed by the pull script.
"""
with open(path) as file_:
return {
key: value
for dct in load_all(file_)
for key, value in dct.items()
}
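# Note: if the same key appears in more than one YAML document, the dict
# comprehension above keeps the value from the last document that defines it.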
def fromObject(cls, obj, decode=None):
orig = Documentation.fromObject(obj, decode)
self = cls(orig.raw)
lines = self.raw.splitlines()
if '---' in lines:
n = lines.index('---')
this, that = '\n'.join(lines[:n]), '\n'.join(lines[n:])
self.yamlData = {}
for ydoc in yaml.load_all(that):
assert isinstance(ydoc, dict), "only dict-like structures allowed in yaml docstrings not %r" % type(ydoc)
self.yamlData.update(ydoc)
else:
this = '\n'.join(lines)
self.raw = this
return self
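# Sketch of a docstring the parser above would split (hypothetical):
#
#   """Summary text that stays as the documentation body.
#   ---
#   tags: [example]
#   owner: alice
#   """
#
# Everything from the first '---' line onward is fed to yaml.load_all() (the
# '---' itself doubles as the YAML document-start marker) and each resulting
# mapping is merged into self.yamlData.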
def regular_load(stream, loader=yaml.loader.Loader):
    # yaml.load() fails if the stream contains more than one document;
    # yaml.load_all() yields every document in the stream.
    return yaml.load_all(stream, Loader=loader)
def load_all(stream):
return yaml.load_all(stream, Loader=Loader)
def load_with_includes(filename):
with open(filename) as f:
docs = list(load_all(f))
base_dir = os.path.dirname(filename)
res = AttrDict()
for doc in docs:
if isinstance(doc, Includes):
for inc_file in doc.lst:
if not os.path.isabs(inc_file):
inc_file = os.path.join(base_dir, inc_file)
inc_res = load_with_includes(inc_file)
res._merge(inc_res)
else:
res._merge(doc)
return res
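# Usage sketch for load_with_includes (assuming Includes is a small holder
# produced by a custom YAML tag and AttrDict._merge performs a deep merge):
#
#   # config.yaml
#   !includes [base.yaml, extra.yaml]
#   ---
#   debug: true
#
# Loading config.yaml would first merge base.yaml and extra.yaml (resolved
# relative to config.yaml, recursively), then merge the inline document.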
def ds_spec(self):
if not os.path.isfile(self.ds_yaml_file):
return None
    with open(self.ds_yaml_file) as ds_conf:
        # Materialize the generator before the file is closed.
        return list(yaml.load_all(ds_conf))
def load_current_file():
print("Loading current file: {}".format(current_file))
global current_docs, current_doc
try:
with open(current_file, "r", encoding="UTF-8") as f:
current_docs = list(yaml.load_all(f))
current_doc = len(current_docs)
except FileNotFoundError:
pass
except yaml.YAMLError:
print("Failed to parse edit file")
def load_cases():
cases_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'cases')
    cases_files = [os.path.join(cases_path, f) for f in os.listdir(cases_path)
if os.path.isfile(os.path.join(cases_path, f))]
cases = []
for cases_file in cases_files:
with open(cases_file) as f:
cases.extend(list(yaml.load_all(f)))
return cases
def yaml(self):
if self._yaml is None:
        sio = six.BytesIO(self.raw())
        # Name the stream so YAML error messages reference the source file.
        setattr(sio, 'name', self._name)
        self._yaml = list(yaml.load_all(sio, yaml_loader.YamlLoader))
return self._yaml
def switch_to_next_cluster():
if not os.path.exists(os.path.expanduser("~/.kube/config")):
return
with open(os.path.expanduser("~/.kube/config"), "r") as fd:
docs = yaml.load_all(fd)
for doc in docs:
contexts = doc.get("contexts")
if contexts:
KubeConfig.current_context_index = (KubeConfig.current_context_index+1) % len(contexts)
cluster_name = contexts[KubeConfig.current_context_index]['name']
kubectl_config_use_context = "kubectl config use-context " + cluster_name
cmd_process = subprocess.Popen(kubectl_config_use_context, shell=True, stdout=subprocess.PIPE)
cmd_process.wait()
return
def test_summary_file_entries(self):
"""Verifies the output summary's file format.
This focuses on the format of the file instead of the content of
entries, which is covered in base_test_test.
"""
mock_test_config = self.base_mock_test_config.copy()
mock_ctrlr_config_name = mock_controller.MOBLY_CONTROLLER_CONFIG_NAME
my_config = [{
'serial': 'xxxx',
'magic': 'Magic1'
}, {
'serial': 'xxxx',
'magic': 'Magic2'
}]
mock_test_config.controller_configs[mock_ctrlr_config_name] = my_config
tr = test_runner.TestRunner(self.log_dir, self.test_bed_name)
tr.add_test_class(mock_test_config, integration_test.IntegrationTest)
tr.run()
summary_path = os.path.join(mock_test_config.log_path,
mock_test_config.test_bed_name, 'latest',
records.OUTPUT_FILE_SUMMARY)
with open(summary_path, 'r') as f:
summary_entries = list(yaml.load_all(f))
self.assertEqual(len(summary_entries), 4)
# Verify the first entry is the list of test names.
self.assertEqual(summary_entries[0]['Type'],
records.TestSummaryEntryType.TEST_NAME_LIST.value)
self.assertEqual(summary_entries[1]['Type'],
records.TestSummaryEntryType.RECORD.value)
def test_output(self):
"""Verifies the expected output files from a test run.
* Files are correctly created.
* Basic sanity checks of each output file.
"""
mock_test_config = self.base_mock_test_config.copy()
mock_ctrlr_config_name = mock_controller.MOBLY_CONTROLLER_CONFIG_NAME
my_config = [{
'serial': 'xxxx',
'magic': 'Magic1'
}, {
'serial': 'xxxx',
'magic': 'Magic2'
}]
mock_test_config.controller_configs[mock_ctrlr_config_name] = my_config
tr = test_runner.TestRunner(self.log_dir, self.test_bed_name)
tr.add_test_class(mock_test_config, integration_test.IntegrationTest)
tr.run()
output_dir = os.path.join(self.log_dir, self.test_bed_name, 'latest')
summary_file_path = os.path.join(output_dir,
records.OUTPUT_FILE_SUMMARY)
debug_log_path = os.path.join(output_dir,
records.OUTPUT_FILE_DEBUG_LOG)
info_log_path = os.path.join(output_dir, records.OUTPUT_FILE_INFO_LOG)
self.assertTrue(os.path.isfile(summary_file_path))
self.assertTrue(os.path.isfile(debug_log_path))
self.assertTrue(os.path.isfile(info_log_path))
summary_entries = []
with open(summary_file_path) as f:
for entry in yaml.load_all(f):
self.assertTrue(entry['Type'])
summary_entries.append(entry)
with open(debug_log_path, 'r') as f:
content = f.read()
self.assertIn('DEBUG', content)
self.assertIn('INFO', content)
with open(info_log_path, 'r') as f:
content = f.read()
self.assertIn('INFO', content)
self.assertNotIn('DEBUG', content)