__init__.py 文件源码

python
阅读 25 收藏 0 点赞 0 评论 0

项目:yq 作者: kislyuk 项目源码 文件源码
def main(args=None):
    """Convert YAML input to JSON, pipe it through ``jq``, and exit with jq's status.

    Arguments are split by ``parser.parse_known_args``: the recognized ones
    configure this function, and the unrecognized remainder is forwarded to
    ``jq`` ahead of ``args.jq_filter``.

    Two modes are supported:

    * ``args.yaml_output`` is true: every input document is loaded up front,
      the JSON is sent to ``jq`` in one ``communicate()`` call, and jq's
      captured output is decoded and dumped to ``sys.stdout`` as YAML.
    * otherwise: each document is written to jq's stdin as it is loaded,
      one JSON document per line, and jq writes to the inherited stdout.

    Args:
        args: Argument list handed to ``parser.parse_known_args``; defaults
            to ``None``.

    Returns:
        The result of ``parser.print_help()`` when stdin is a terminal and no
        input files were given. On every other path the function does not
        return normally.

    Raises:
        SystemExit: with jq's return code on success, or via ``parser.exit``
            with an error message when jq cannot be started or the
            conversion fails.
    """
    args, jq_args = parser.parse_known_args(args=args)
    if sys.stdin.isatty() and not args.files:
        return parser.print_help()

    try:
        # Note: universal_newlines is just a way to induce subprocess to make stdin a text buffer and encode it for us
        jq = subprocess.Popen(["jq"] + jq_args + [args.jq_filter],
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE if args.yaml_output else None,
                              universal_newlines=True)
    except OSError as e:
        msg = "yq: Error starting jq: {}: {}. Is jq installed and available on PATH?"
        # NOTE(review): the message is passed as the first positional argument
        # on purpose; presumably parser is an argparse.ArgumentParser, where
        # that slot is the exit status, so SystemExit carries the message.
        # Confirm before "fixing" this, callers may depend on it.
        parser.exit(msg.format(type(e).__name__, e))

    try:
        input_streams = args.files if args.files else [sys.stdin]
        if args.yaml_output:
            # TODO: enable true streaming in this branch (with asyncio, asyncproc, a multi-shot variant of
            # subprocess.Popen._communicate, etc.)
            # See https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
            input_docs = []
            for input_stream in input_streams:
                input_docs.extend(yaml.load_all(input_stream, Loader=OrderedLoader))
            input_payload = "\n".join(json.dumps(doc, cls=JSONDateTimeEncoder) for doc in input_docs)
            # stderr is not piped, so the second element of the result is
            # always None and is discarded.
            jq_out, _ = jq.communicate(input_payload)
            json_decoder = json.JSONDecoder(object_pairs_hook=OrderedDict)
            yaml.dump_all(decode_docs(jq_out, json_decoder), stream=sys.stdout, Dumper=OrderedDumper, width=args.width,
                          allow_unicode=True, default_flow_style=False)
        else:
            for input_stream in input_streams:
                for doc in yaml.load_all(input_stream, Loader=OrderedLoader):
                    json.dump(doc, jq.stdin, cls=JSONDateTimeEncoder)
                    jq.stdin.write("\n")
            # Closing stdin signals end of input so jq can finish.
            jq.stdin.close()
            jq.wait()
        for input_stream in input_streams:
            input_stream.close()
        # Use sys.exit rather than the bare exit() builtin: the latter is
        # injected by the site module and is not guaranteed to exist (e.g.
        # under ``python -S``). SystemExit is not an Exception subclass, so
        # it is not intercepted by the handler below.
        sys.exit(jq.returncode)
    except Exception as e:
        parser.exit("yq: Error running jq: {}: {}.".format(type(e).__name__, e))
评论列表


问题


面经


文章

微信
公众号

扫码关注公众号