def save_current_file():
    """Serialize ``current_docs`` to ``current_file`` as a multi-document YAML file.

    Each document is rebuilt as an OrderedDict so keys are written in a
    stable order; missing ``info``/``streams`` sub-keys are filled with "".

    Returns:
        True on success, False on any failure (traceback is printed).

    NOTE(review): depends on module-level ``current_file``, ``current_docs``,
    ``info_keys`` and ``stream_keys`` defined elsewhere in this file.
    """
    print("Saving current file: {}".format(current_file))

    def order_dict(d):
        # Fix the key order for deterministic YAML output; .get(key, "")
        # replaces the double-lookup `key in d[...] / d[...][key]` idiom.
        return OrderedDict([
            ("title", d["title"]),
            ("type", d["type"]),
            ("has_source", d["has_source"]),
            ("info", OrderedDict(
                (key, d["info"].get(key, "")) for key in info_keys
            )),
            ("streams", OrderedDict(
                (key, d["streams"].get(key, "")) for key in stream_keys
            )),
        ])

    try:
        sorted_docs = [order_dict(doc) for doc in current_docs]
        with open(current_file, "w", encoding="UTF-8") as f:
            yaml.dump_all(sorted_docs, f, default_flow_style=False,
                          indent=4, allow_unicode=True)
    except Exception:
        # Was a bare `except:` — keep the best-effort semantics but no
        # longer swallow SystemExit/KeyboardInterrupt.
        from traceback import print_exc
        print_exc()
        return False
    return True
Example Python source code using dump_all()
def dump( data, dst=unicode, safe=False,
        force_embed=False, vspacing=None, string_val_style=None, **pyyaml_kws ):
    """Pretty-dump *data* as YAML.

    dst selects the output form: ``bytes`` returns UTF-8 bytes, ``unicode``
    (the default) returns text, and any stream object is written to
    directly.  *safe* picks PrettyYAMLDumper over UnsafePrettyYAMLDumper;
    *vspacing*, *force_embed* and *string_val_style* tune the formatting.
    Extra keyword args are forwarded to ``yaml.dump_all``.
    """
    buff = io.BytesIO()
    Dumper = PrettyYAMLDumper if safe else UnsafePrettyYAMLDumper
    Dumper = ft.partial(Dumper, force_embed=force_embed, string_val_style=string_val_style)
    yaml.dump_all( [data], buff, Dumper=Dumper,
        default_flow_style=False, allow_unicode=True, encoding='utf-8', **pyyaml_kws )
    if vspacing is not None:
        dump_add_vspacing(buff, vspacing)
    buff = buff.getvalue()
    if dst is bytes:
        return buff
    elif dst is unicode:
        return buff.decode('utf-8')
    else:
        # Probe whether dst is a byte- or text-stream: writing b'' to a
        # text stream raises TypeError.  Was a bare `except:` which would
        # also have masked KeyboardInterrupt/SystemExit.
        try:
            dst.write(b'')
        except TypeError:
            dst.write(buff.decode('utf-8'))
        else:
            dst.write(buff)
def test_represent_scapy(self):
    """A scapy IP/UDP packet must serialize cleanly through OSafeDumper."""
    packet = IP() / UDP()
    yaml.dump_all([packet], Dumper=OSafeDumper)
def generate_config(args):
    """Generate the chore configuration and write it out as YAML.

    Optionally seeds generation from an existing config file
    (``args.existing``); writes the resulting (cfg, failed) pair to
    ``args.output`` as two YAML documents.
    """
    setup_yaml()
    if args.existing:
        # Only the first YAML document is used.  A with-block closes the
        # file handle — the original `open(...)` inside next() leaked it.
        with open(args.existing, 'r', encoding='utf-8') as f:
            existing = next(yaml.safe_load_all(f))
    else:
        existing = None
    cfg, failed = chores.generate_chore_config(args.db, args.bookmark, existing)
    with open(args.output, 'w', encoding='utf-8') as f:
        yaml.dump_all((cfg, failed), f, default_flow_style=False)
    logging.info('Done.')
def save(self, ostream):
    """
    Save model to the stream.

    Serializes every layer yielded by ``self.nextLayer()`` as a
    multi-document YAML stream written to *ostream*.
    """
    # list() replaces the manual append loop (same iteration order).
    yaml.dump_all(list(self.nextLayer()), ostream)
def write_content(self, content=None):
    """Write all stored YAML documents back to the underlying file.

    If *content* is truthy it replaces the current document (at
    ``self.__document_id``) before dumping.  Registers a custom dict
    representer so keys starting with '!' are emitted as YAML tags.
    """
    if content:
        self.content = content
    self.__documents[self.__document_id] = self.content

    def representer(dumper, data):
        """Represents a dict key started with '!' as a YAML tag
        Assumes that there is only one !tag in the dict at the
        current indent.
        Python object:
        {"!unknown_tag": ["some content", ]}
        Resulting yaml:
        !unknown_tag
        - some content
        """
        # next(iter(data)) works on Python 2 and 3; the original
        # `data.keys()[0]` raises TypeError on Python 3 (dict views are
        # not subscriptable).
        key = next(iter(data))
        if key.startswith("!"):
            value = data[key]
            if type(value) is dict:
                node = dumper.represent_mapping(key, value)
            elif type(value) is list:
                node = dumper.represent_sequence(key, value)
            else:
                node = dumper.represent_scalar(key, value)
        else:
            node = dumper.represent_mapping(u'tag:yaml.org,2002:map', data)
        return node

    # NOTE(review): this mutates global PyYAML state for all dict dumps.
    yaml.add_representer(dict, representer)
    with self.__get_file("w") as file_obj:
        yaml.dump_all(self.__documents, file_obj,
                      default_flow_style=self.default_flow_style,
                      default_style=self.default_style)
def create_season_config(config, db, output_file):
    """Fetch new shows from the primary source and write them to *output_file* as YAML."""
    info("Checking for new shows")
    new_shows = _get_primary_source_shows(config)
    debug("Outputting new shows")
    with open(output_file, "w", encoding="utf-8") as out:
        yaml.dump_all(new_shows, out, explicit_start=True, default_flow_style=False)
def _write(output_dir, docs):
    """Dump *docs* as multi-document YAML into <output_dir>/certificates.yaml."""
    out_path = os.path.join(output_dir, 'certificates.yaml')
    with open(out_path, 'w') as f:
        # Don't use safe_dump_all so we can block format certificate data.
        yaml.dump_all(docs, f, default_flow_style=False,
                      explicit_start=True, indent=2)
def write_yamls(data: List[Dict], file: str = None):
    """Serialize *data* as a multi-document YAML stream.

    Thin wrapper that delegates to ``__write_yaml`` with
    ``yaml.dump_all`` as the dump function; *file* is passed through
    (presumably the target path, None meaning return-as-text — TODO
    confirm against ``__write_yaml``).
    """
    return __write_yaml(data, yaml.dump_all, file)
def write_content(self, content=None):
    """Write all stored YAML documents back to the underlying file.

    If *content* is truthy it replaces the current document (at
    ``self.__document_id``) before dumping.  Registers a custom dict
    representer so keys starting with '!' are emitted as YAML tags.
    """
    if content:
        self.content = content
    self.__documents[self.__document_id] = self.content

    def representer(dumper, data):
        """Represents a dict key started with '!' as a YAML tag
        Assumes that there is only one !tag in the dict at the
        current indent.
        Python object:
        {"!unknown_tag": ["some content", ]}
        Resulting yaml:
        !unknown_tag
        - some content
        """
        # next(iter(data)) works on Python 2 and 3; the original
        # `data.keys()[0]` raises TypeError on Python 3 (dict views are
        # not subscriptable).
        key = next(iter(data))
        if key.startswith("!"):
            value = data[key]
            if type(value) is dict:
                node = dumper.represent_mapping(key, value)
            elif type(value) is list:
                node = dumper.represent_sequence(key, value)
            else:
                node = dumper.represent_scalar(key, value)
        else:
            node = dumper.represent_mapping(u'tag:yaml.org,2002:map', data)
        return node

    # NOTE(review): this mutates global PyYAML state for all dict dumps.
    yaml.add_representer(dict, representer)
    with self.__get_file("w") as file_obj:
        yaml.dump_all(self.__documents, file_obj,
                      default_flow_style=self.default_flow_style,
                      default_style=self.default_style)
def save_info(path, info, default_flow_style=False):
    """Write *info* to *path* as a multi-document YAML file."""
    with open(path, 'w') as stream:
        _yaml.dump_all(info, stream, default_flow_style=default_flow_style)
def dump(data, stream=None, safe=False, many=False, **kwargs):
    """Dump *data* to YAML (block style by default).

    With many=True, *data* is treated as an iterable of documents;
    otherwise it is wrapped as a single document.  safe=True selects
    SafeCustomDumper.
    """
    kwargs.setdefault('default_flow_style', False)
    dumper_cls = SafeCustomDumper if safe else CustomDumper
    documents = data if many else [data]
    return yaml.dump_all(documents, stream, dumper_cls, **kwargs)
def safe_dump(data, **kwargs):
    """Return *data* dumped as YAML text via PrettyPrinterDumper, forcing block style."""
    options = dict(kwargs, default_flow_style=False)
    return yaml.dump_all([data], None, Dumper=PrettyPrinterDumper, **options)
def safe_dump(data, stream=None, **kw):
    """
    Safely dump to a yaml file the specified data.
    """
    documents = [data]
    return yaml.dump_all(documents, stream, Dumper=OSafeDumper, **kw)
def main(args=None):
    """yq entry point: filter YAML documents through a ``jq`` subprocess.

    Converts YAML input to JSON, pipes it to jq, and (with --yaml-output)
    converts jq's JSON output back to YAML on stdout.  Exits with jq's
    return code; any failure is reported via ``parser.exit``.
    """
    args, jq_args = parser.parse_known_args(args=args)
    if sys.stdin.isatty() and not args.files:
        return parser.print_help()
    try:
        # Note: universal_newlines is just a way to induce subprocess to make stdin a text buffer and encode it for us
        jq = subprocess.Popen(["jq"] + jq_args + [args.jq_filter],
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE if args.yaml_output else None,
                              universal_newlines=True)
    except OSError as e:
        msg = "yq: Error starting jq: {}: {}. Is jq installed and available on PATH?"
        parser.exit(msg.format(type(e).__name__, e))
    try:
        input_streams = args.files if args.files else [sys.stdin]
        if args.yaml_output:
            # TODO: enable true streaming in this branch (with asyncio, asyncproc, a multi-shot variant of
            # subprocess.Popen._communicate, etc.)
            # See https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
            input_docs = []
            for input_stream in input_streams:
                input_docs.extend(yaml.load_all(input_stream, Loader=OrderedLoader))
            input_payload = "\n".join(json.dumps(doc, cls=JSONDateTimeEncoder) for doc in input_docs)
            jq_out, jq_err = jq.communicate(input_payload)
            json_decoder = json.JSONDecoder(object_pairs_hook=OrderedDict)
            yaml.dump_all(decode_docs(jq_out, json_decoder), stream=sys.stdout,
                          Dumper=OrderedDumper, width=args.width,
                          allow_unicode=True, default_flow_style=False)
        else:
            for input_stream in input_streams:
                for doc in yaml.load_all(input_stream, Loader=OrderedLoader):
                    json.dump(doc, jq.stdin, cls=JSONDateTimeEncoder)
                    jq.stdin.write("\n")
            jq.stdin.close()
            jq.wait()
        for input_stream in input_streams:
            input_stream.close()
        # sys.exit instead of the site-injected builtin exit(), which is
        # absent under `python -S` or in frozen interpreters.  Both raise
        # SystemExit, which the `except Exception` below does not catch.
        sys.exit(jq.returncode)
    except Exception as e:
        parser.exit("yq: Error running jq: {}: {}.".format(type(e).__name__, e))