def _observe_mdns(reader, output: io.TextIOBase, verbose: bool):
"""Process the given `reader` for `avahi-browse` events.
IO is mostly isolated in this function; the transformation functions
`_observe_all_in_full` and `_observe_resolver_found` can be tested without
having to deal with IO.
:param reader: A context-manager yielding a `io.TextIOBase`.
"""
if verbose:
observer = _observe_all_in_full
else:
observer = _observe_resolver_found
with reader as infile:
events = _extract_mdns_events(infile)
for event in observer(events):
print(json.dumps(event), file=output, flush=True)
评论列表
文章目录