def update(self):
# ? ???? ?? drained???? test???.
if not self._prev_drained:
node = self._node
predecessors_status = [v.runner.status for v in node.graph.predecessors(node)]
prev_drained_test = predecessors_status.count(DRAINED) == len(predecessors_status)
if prev_drained_test:
self._prev_drained = True
if not self._sub_drained:
conn1_finished_test = self._conn1_send_count == sum(self._conn1_recv_count.values())
conn2_finished_test = self._conn2_recv_count == sum(self._conn2_send_count.values())
retry_test = self._retry_count >= 3
if conn1_finished_test and conn2_finished_test and retry_test:
self._sub_drained = True
for _ in range(self._nproc):
self._socket1.send(msgpack.dumps(b'END'))
if self._prev_drained and self._sub_drained:
self.status = DRAINED
else:
self.status = READY
评论列表
文章目录