def pull_stats(config, storage_dir, loop, executor):
    """
    Launch coroutines for pulling statistics from UNIX sockets.

    This is a delegating routine: it builds one ``get`` coroutine for every
    (UNIX socket file, command in ``CMDS``) pair, waits for all of them with
    ``asyncio.wait`` and cancels whatever is still pending once the
    configured timeout expires. It suspends with ``yield from`` and therefore
    has to be driven by an event loop.

    Arguments:
        config (obj): A configParser object which holds configuration. The
            'socket-dir' and 'pull-timeout' options of the 'pull' section
            are read here.
        storage_dir (str): The absolute directory path to save the
            statistics.
        loop (obj): A base event loop.
        executor(obj): A ThreadPoolExecutor object.

    Returns:
        True if statistics from *all* UNIX sockets are fetched, False
        otherwise (no UNIX socket found, a task reported a failure or a
        task was still pending when the timeout expired).
    """
    results = []  # stores the result of finished tasks
    # absolute directory path which contains UNIX socket files.
    socket_dir = config.get('pull', 'socket-dir')
    pull_timeout = config.getfloat('pull', 'pull-timeout')
    # A non-positive value disables the timeout. Compare the float itself:
    # truncating with int() would also turn a sub-second timeout such as 0.5
    # into "wait forever".
    if pull_timeout <= 0:
        pull_timeout = None

    socket_files = [f for f in glob.glob(socket_dir + '/*')
                    if is_unix_socket(f)]
    if not socket_files:
        log.error("found zero UNIX sockets under %s to connect to", socket_dir)
        return False

    log.debug('pull statistics')
    coroutines = [get(socket_file, cmd, storage_dir, loop, executor, config)
                  for socket_file in socket_files
                  for cmd in CMDS]
    # Launch all connections.
    done, pending = yield from asyncio.wait(coroutines,
                                            timeout=pull_timeout,
                                            return_when=ALL_COMPLETED)
    for task in done:
        log.debug('task status: %s', task)
        # NOTE(review): result() re-raises an exception raised inside the
        # task - presumably get() reports failures by returning False;
        # confirm against get().
        results.append(task.result())

    succeeded = results.count(True)
    log.debug('task report, done:%s pending:%s succeed:%s failed:%s',
              len(done),
              len(pending),
              succeeded,
              results.count(False))

    # pending is non-empty only when the timeout expired, in which case
    # pull_timeout is a number and safe to format with %.2f.
    for task in pending:
        log.warning('cancelling task %s as it reached its timeout threshold of'
                    ' %.2f seconds', task, pull_timeout)
        task.cancel()

    # only when all tasks are finished successfully we claim success
    return not pending and bool(results) and succeeded == len(results)
# (Web page residue, not part of the code: "comment list" / "article table of contents".)