def parallel_apply_method(method, nodes, sample_rate=1, duration=1, leaves_only=False):
    """
    Run the wrapped-method "method" over all of "nodes" in parallel, repeatedly.

    One "time slice" is a single parallel pass of "method" over every node.
    Slices are scheduled "sample_rate" times per second for "duration" seconds,
    i.e. int(duration * sample_rate) slices, slice i being delayed by
    i / sample_rate seconds.

    When "leaves_only" is true, nodes whose '#units' property equals "PathNode"
    are dropped before anything is scheduled.

    Returns the value of consume_queue() applied to the queue of per-slice
    results; per the original documentation this is a list with one entry per
    time slice, each made of wrapped-method result tuples
    (node, return value, exception).

    If there are no nodes to process, an empty dict is returned instead.
    NOTE(review): the empty case yields a dict while the normal result is
    described as a list -- confirm which one callers expect.
    """
    if leaves_only:
        nodes = [node for node in nodes if node.get_property('#units') != "PathNode"]

    # Nothing to run: bail out before creating a zero-sized pool.
    if not nodes:
        return {}

    # One worker per node, so a whole slice can run concurrently.
    with ThreadPool(len(nodes)) as worker_pool:
        pending_slices = queue.Queue()

        def run_one_slice():
            # map_async returns immediately; its async result handle is what
            # gets queued, not the finished values.
            pending_slices.put(worker_pool.map_async(method, nodes))

        slice_count = int(duration * sample_rate)
        timers = [
            threading.Timer(slice_index / sample_rate, run_one_slice)
            for slice_index in range(slice_count)
        ]

        # Presumably starts every timer and waits for all of them to fire --
        # TODO confirm against complete_all_threads.
        complete_all_threads(timers)

        # Drained while the pool is still open (inside the "with" block).
        return consume_queue(pending_slices)
# Comment list
# Table of contents