def _clean_up_entries_from_shard(self, object_ids, task_ids, shard_index):
redis = self.state.redis_clients[shard_index]
# Clean up (in the future, save) entries for non-empty objects.
object_ids_locs = set()
object_ids_infos = set()
for object_id in object_ids:
# OL.
obj_loc = redis.zrange(OBJECT_LOCATION_PREFIX + object_id, 0, -1)
if obj_loc:
object_ids_locs.add(object_id)
# OI.
obj_info = redis.hgetall(OBJECT_INFO_PREFIX + object_id)
if obj_info:
object_ids_infos.add(object_id)
# Form the redis keys to delete.
keys = [TASK_TABLE_PREFIX + k for k in task_ids]
keys.extend([OBJECT_LOCATION_PREFIX + k for k in object_ids_locs])
keys.extend([OBJECT_INFO_PREFIX + k for k in object_ids_infos])
if not keys:
return
# Remove with best effort.
num_deleted = redis.delete(*keys)
log.info(
"Removed {} dead redis entries of the driver from redis shard {}.".
format(num_deleted, shard_index))
if num_deleted != len(keys):
log.warning(
"Failed to remove {} relevant redis entries"
" from redis shard {}.".format(len(keys) - num_deleted))
评论列表
文章目录