def utilization_queue(self, free_capacity, visitor=None):
    """Yield the combined utilization queue of this alloc and its sub-allocs.

    The queue of every sub-allocation and this allocation's private queue
    are merged in their standard order, and the utilization of each entry
    is then recalculated against the total reserved capacity of this alloc
    and its sub-allocs combined.

    The function maintains the invariant that any app (in self or inside a
    sub-alloc) with utilization < 1 will remain with utilization < 1.

    :param free_capacity:
        Free capacity; added to the total reserved capacity to form the
        available capacity, and passed unchanged to every sub-allocation.
    :param visitor:
        Optional callable invoked as ``visitor(self, entry)`` for each
        entry right before it is yielded. It is also handed down to the
        sub-allocations' ``utilization_queue`` calls.
    :returns:
        Generator of ``(rank, util, pending, order, app)`` tuples.
    """
    reserved = self.total_reserved()

    # One queue per sub-allocation, plus this allocation's private queue.
    sources = [
        child.utilization_queue(free_capacity, visitor)
        for child in self.sub_allocations.values()
    ]
    sources.append(self.priv_utilization_queue())

    demand_so_far = zero_capacity()
    # Machine epsilon is added on top of reserved + free capacity
    # (presumably to keep the value strictly positive -- TODO confirm
    # against utilization()).
    available = reserved + free_capacity + np.finfo(float).eps

    # FIXME: heapq.merge has an overhead of comparison
    for rank, _old_util, pending, order, app in heapq.merge(*sources):
        demand_so_far = demand_so_far + app.demand
        recomputed = utilization(demand_so_far, reserved, available)
        # Apps with priority 0 always get the maximum utilization value.
        util = _MAX_UTILIZATION if app.priority == 0 else recomputed

        # Entry layout determines the sort order:
        # - lower rank allocations take precedence.
        # - for same rank, utilization takes precedence
        # - False < True, so for apps with same utilization we prefer
        #   those that already running (False == not pending)
        # - Global order
        entry = (rank, util, pending, order, app)
        if visitor:
            visitor(self, entry)
        yield entry
# Comment list
# Article table of contents