def merge_list(self, node):
    """Build a list from a YAML sequence node, flattening child lists one level.

    Each child of ``node`` is constructed; a child that constructs to a
    ``list`` has its elements spliced into the result, any other value is
    appended as a single element.

    Args:
        node: the YAML node carrying the tag this constructor is bound to.

    Returns:
        list: the flattened list.

    Raises:
        yaml.constructor.ConstructorError: if ``node`` is not a sequence node.
    """
    if not isinstance(node, yaml.SequenceNode):
        raise yaml.constructor.ConstructorError(
            None, None,
            "expected a sequence, but found %s" % node.id, node.start_mark
        )
    merged = []
    for item in node.value:
        # deep=True forces nested collections to be fully built right away.
        # With a shallow construct, PyYAML hands back an empty placeholder
        # list that is only filled in later, so extend() would copy nothing.
        obj = self.construct_object(item, deep=True)
        if isinstance(obj, list):
            merged.extend(obj)
        else:
            merged.append(obj)
    return merged
# python类SequenceNode()的实例源码 (example source code using the Python class SequenceNode())
def multi_constructor(loader, tag_suffix, node):
    """
    Deal with !Ref style function format

    The tag suffix is prefixed with ``FN_PREFIX`` unless it is listed in
    ``UNCONVERTED_SUFFIXES``; the node is then constructed according to its
    kind (``Fn::GetAtt`` has its own constructor).

    Args:
        loader: the YAML loader providing the ``construct_*`` methods.
        tag_suffix: the part of the tag following the registered prefix.
        node: the tagged YAML node.

    Returns:
        ODict: a single-entry mapping ``{tag_suffix: constructed_value}``.

    Raises:
        yaml.constructor.ConstructorError: if ``node`` is not a scalar,
            sequence or mapping node.
    """
    if tag_suffix not in UNCONVERTED_SUFFIXES:
        tag_suffix = "{}{}".format(FN_PREFIX, tag_suffix)

    if tag_suffix == "Fn::GetAtt":
        constructor = construct_getatt
    elif isinstance(node, yaml.ScalarNode):
        constructor = loader.construct_scalar
    elif isinstance(node, yaml.SequenceNode):
        constructor = loader.construct_sequence
    elif isinstance(node, yaml.MappingNode):
        constructor = loader.construct_mapping
    else:
        # Raising a plain string is itself a TypeError ("exceptions must
        # derive from BaseException"), which hid the real message.
        raise yaml.constructor.ConstructorError(
            None, None,
            "Bad tag: !{}".format(tag_suffix),
            getattr(node, "start_mark", None),
        )

    return ODict((
        (tag_suffix, constructor(node)),
    ))
def format_node(cls, mapping, metric):
    """Return a copy of a YAML node tree with scalar values formatted.

    Scalar values are passed through ``str.format(**metric)``. Which parts
    of the tree are formatted depends on the node's tag:

    * string / Bytes2Kibibytes / Number / StripExtraDash scalars: the value
      itself is formatted;
    * plain maps: every value is formatted recursively;
    * ArrayItem / ValueItem sequences: only values under a ``SELECT`` key
      are formatted, other values are reused unchanged;
    * MapValue maps: only values under a ``VALUE`` key are formatted.

    Nodes with any other tag are returned as-is.

    Args:
        mapping: the YAML node to process.
        metric: a mapping of format-field names to values.

    Returns:
        A new node of the same kind and tag, or ``mapping`` itself when the
        tag is not handled.
    """
    tag = mapping.tag

    scalar_tags = (
        'tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag,
        Number.yaml_tag, StripExtraDash.yaml_tag,
    )
    if tag in scalar_tags:
        return yaml.ScalarNode(tag, mapping.value.format(**metric))

    if tag == 'tag:yaml.org,2002:map':
        # Plain map: rebuild every key and format every value.
        pairs = [
            (yaml.ScalarNode(k.tag, k.value), cls.format_node(v, metric))
            for k, v in mapping.value
        ]
        return yaml.MappingNode(tag, pairs)

    if tag in (ArrayItem.yaml_tag, ValueItem.yaml_tag):
        # Sequence of maps: only the SELECT entries get formatted.
        rebuilt = []
        for entry in mapping.value:
            pairs = [
                (yaml.ScalarNode(k.tag, k.value),
                 cls.format_node(v, metric) if k.value == 'SELECT' else v)
                for k, v in entry.value
            ]
            rebuilt.append(yaml.MappingNode(entry.tag, pairs))
        return yaml.SequenceNode(tag, rebuilt)

    if tag in (MapValue.yaml_tag,):
        # Tagged map: only the VALUE entry gets formatted.
        pairs = [
            (yaml.ScalarNode(k.tag, k.value),
             cls.format_node(v, metric) if k.value == 'VALUE' else v)
            for k, v in mapping.value
        ]
        return yaml.MappingNode(tag, pairs)

    return mapping
def generic_object(loader, suffix, node):
    """Construct a tagged node with the loader class's default constructor.

    The constructor is chosen by node kind (scalar, sequence or mapping) and
    looked up on ``loader.__class__``. ``suffix`` is accepted but not used.

    Raises:
        ValueError: if ``node`` is none of the three supported node kinds.
    """
    loader_cls = loader.__class__
    # TODO(tailhook) wrap into some object?
    if isinstance(node, yaml.ScalarNode):
        return loader_cls.construct_scalar(loader, node)
    if isinstance(node, yaml.SequenceNode):
        return loader_cls.construct_sequence(loader, node)
    if isinstance(node, yaml.MappingNode):
        return loader_cls.construct_mapping(loader, node)
    raise ValueError(node)
def include(self, node):
    """Resolve an include node through ``self.extractFile``.

    A scalar node is treated as a single file name and the result of
    ``self.extractFile`` is returned directly. A sequence node is treated
    as a list of file names; each result is concatenated (``+=``) onto one
    list, which is returned.

    Args:
        node: the YAML node carrying the tag this constructor is bound to.

    Raises:
        yaml.constructor.ConstructorError: if ``node`` is neither a scalar
            nor a sequence node.
    """
    if isinstance(node, yaml.ScalarNode):
        return self.extractFile(self.construct_scalar(node))

    if isinstance(node, yaml.SequenceNode):
        result = []
        for filename in self.construct_sequence(node):
            result += self.extractFile(filename)
        return result

    # Report what was found and where, instead of an empty error.
    raise yaml.constructor.ConstructorError(
        None, None,
        "expected a scalar or a sequence, but found %s" % node.id,
        node.start_mark
    )
def from_yaml(cls, loader, node):
    """Turn a short-form tagged node into a single-entry dict.

    The key is the tag without its leading character; it is prefixed with
    ``Fn::`` unless the tag is ``!Ref`` or ``!Condition``. The value is the
    constructed node converted with ``str``.

    Raises:
        Exception: if ``node`` is not a scalar, sequence or mapping node.
    """
    bare = node.tag[1:]
    key = bare if node.tag in ('!Ref', '!Condition') else 'Fn::' + bare

    if isinstance(node, ScalarNode):
        built = loader.construct_scalar(node)
    elif isinstance(node, SequenceNode):
        built = loader.construct_sequence(node)
    elif isinstance(node, MappingNode):
        built = loader.construct_mapping(node)
    else:
        raise Exception("Unable to handle node: %r"%node)

    # NOTE(review): str() also stringifies list and dict values (e.g.
    # "['a', 'b']") - confirm this is intended for sequence/mapping nodes.
    return {str(key): str(built)}
def construct_odict(self, node):
    """Generator constructor building an ``OrderedDict`` from a sequence node.

    The node must be a sequence whose items are mappings with exactly one
    key/value pair each. The empty ``OrderedDict`` is yielded first and
    populated when the generator is resumed.

    Raises:
        yaml.constructor.ConstructorError: if ``node`` is not a sequence,
            an item is not a mapping, or an item does not hold exactly one
            pair. Raised on resumption, after the initial yield.
    """
    ordered = OrderedDict()
    # Yield the container before validating or filling it - presumably the
    # PyYAML two-step construction protocol; confirm against the loader.
    yield ordered

    if not isinstance(node, yaml.SequenceNode):
        raise yaml.constructor.ConstructorError(
            "while constructing an ordered map",
            node.start_mark,
            "expected a sequence, but found %s" % node.id, node.start_mark
        )

    for entry in node.value:
        if not isinstance(entry, yaml.MappingNode):
            raise yaml.constructor.ConstructorError(
                "while constructing an ordered map", node.start_mark,
                "expected a mapping of length 1, but found %s" % entry.id,
                entry.start_mark
            )
        pair_count = len(entry.value)
        if pair_count != 1:
            raise yaml.constructor.ConstructorError(
                "while constructing an ordered map", node.start_mark,
                "expected a single mapping item, but found %d items" % pair_count,
                entry.start_mark
            )
        key_node, value_node = entry.value[0]
        key = self.construct_object(key_node)
        ordered[key] = self.construct_object(value_node)
def from_yaml(cls, loader, node):
    """Expand a tagged YAML sequence into a list of constructed items.

    The sequence entries are scanned for three keys:

    * ``ITEM-DESC`` (mandatory, at most once): node template for one item;
    * ``INDEX-KEY`` (at most once): sequence of metric attribute names;
    * ``SELECT`` (any number): mappings passed to ``loader.collector.items``.

    The metrics returned by ``loader.collector.items`` are, when
    ``INDEX-KEY`` is given, reduced to a set of ``CollectdValue`` objects
    carrying only the index attributes. The template is then formatted once
    per metric via ``cls.format_node`` and constructed as a mapping.

    Returns:
        list: one constructed mapping per metric.

    Raises:
        AssertionError: if ``node`` is not a sequence node, a key is
            duplicated, or a mandatory key is missing.
    """
    logging.debug('{}:process(loader={}, node={})'.format(cls.__name__,
                                                          loader, node))
    # e.g.:
    # SequenceNode(tag=u'!ArrayItem', value=[
    #   MappingNode(tag=u'tag:yaml.org,2002:map', value=[
    #     (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'),
    #      MappingNode(tag=u'tag:yaml.org,2002:map', value=[
    #        (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'),
    #         , ...)
    #      ]), ...
    #     ), (key, value), ... ])
    #   , ... ])
    assert isinstance(node, yaml.SequenceNode), \
        "{} tag isn't YAML array".format(cls.__name__)

    selectors = []
    key_names = []
    description = None
    for entry in node.value:
        for key_node, value_node in entry.value:
            name = key_node.value
            if name == 'ITEM-DESC':
                assert description is None, "ITEM-DESC key already set"
                description = value_node
            elif name == 'INDEX-KEY':
                assert not key_names, "INDEX-KEY key already set"
                key_names = loader.construct_sequence(value_node)
            elif name == 'SELECT':
                selectors.append(loader.construct_mapping(value_node))

    # validate item description
    assert description is not None, "Mandatory ITEM-DESC key isn't set"
    assert selectors or key_names, \
        "Mandatory key (INDEX-KEY or SELECT) isn't set"

    metrics = loader.collector.items(selectors)

    # select metrics based on INDEX-KEY provided
    if key_names:
        unique = set()
        for metric in metrics:
            projected = CollectdValue()
            for attr in key_names:
                setattr(projected, attr, getattr(metric, attr))
            unique.add(projected)
        metrics = list(unique)

    # build items based on SELECT and/or INDEX-KEY criteria
    return [
        loader.construct_mapping(
            cls.format_node(description,
                            {'vl': metric, 'system': loader.system,
                             'config': loader.config}))
        for metric in metrics
    ]