async def main_loop(cfg: Config,
                    logger: Logger,
                    transport_cls: Generic[T],
                    continue_fn: callable,
                    loop: BaseEventLoop):
    """Run the periodic step/flush cycle until ``continue_fn`` says stop.

    Setup, performed once:
      * build a transport from ``cfg.riemann.host`` / ``cfg.riemann.port``
        and wrap it in a ``processor.QClient``;
      * create the agents from ``cfg.agents`` and register ``cfg.augments``
        on the client;
      * instantiate ``cfg.executor_class`` with ``cfg.executors_count``
        workers and install it as the default executor of ``loop``;
      * call ``init`` on the agents.

    Each iteration then awaits ``step``, reports timing and queue figures
    through ``instrumentation``, awaits ``processor.flush`` and finally
    either sleeps until the next cycle or stops.

    Args:
        cfg: Configuration object; this function reads ``riemann``,
            ``agents``, ``augments``, ``executor_class``,
            ``executors_count`` and ``interval`` from it.
        logger: Logger handed to the helpers and used for the stop message.
        transport_cls: Callable invoked as ``transport_cls(host, port)`` to
            build the transport.
        continue_fn: Zero-argument callable evaluated once per iteration; a
            falsy result ends the loop.
        loop: Event loop that receives the default executor and is passed
            on to ``step``.

    Returns:
        None. ``client.on_stop()`` is called once the loop has ended
        normally.
    """
    riemann = cfg.riemann
    transport = transport_cls(riemann.host, riemann.port)
    client = processor.QClient(transport)
    agents = create_agents(cfg.agents)
    register_augments(client, cfg.augments, logger)

    executor = cfg.executor_class(max_workers=cfg.executors_count)
    loop.set_default_executor(executor)

    init(agents)

    while True:
        ts = time()
        # The step is given 1.5x the configured interval as its timeout.
        _done, pending = await step(client,
                                    agents,
                                    timeout=cfg.interval * 1.5,
                                    loop=loop)
        td = time() - ts

        instrumentation(client,
                        logger,
                        cfg.interval,
                        td,
                        len(client.queue.events),
                        len(pending))

        await processor.flush(client, transport, logger)

        if not continue_fn():
            logger.info("Stopping Oshino")
            break

        # The explicit ``loop=`` keyword of asyncio.sleep was removed in
        # Python 3.10; without it the running loop is used (assumes this
        # coroutine is scheduled on ``loop`` -- confirm with the caller).
        # NOTE(review): int(td) drops the fractional seconds of the step
        # duration, so a cycle can last up to ~1s longer than
        # cfg.interval. Kept as-is to preserve the existing schedule.
        await asyncio.sleep(cfg.interval - int(td))

    client.on_stop()
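# Comment list  (navigation label left over from the scraped page; not code)
# Article table of contents  (navigation label left over from the scraped page; not code)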