def initialize_generation(db_path: str, scenario_id: int, generation: int, genotypes: List[Genome],
                          pair_selection_f: PAIR_SELECTION_F_T, fitness_f: FITNESS_F_T, neat_config: CPPNNEATConfig,
                          ca_config: CAConfig, innovation_archive: List[Genome]) -> None:
    """Assign distance-based fitness to every genotype and dispatch the generation.

    For each genotype, the distance (``hamming(..., normalized=True)``) between
    its serialized CPPN rule and that of every other genotype and every member
    of ``innovation_archive`` is collected. The genotype's ``fitness``
    attribute is then overwritten with the sum of the ``k`` smallest distances
    divided by ``k`` (``k`` is fixed at 15).

    One ``handle_individual`` task signature is created per genotype, and all
    of them are submitted as the header of a Celery chord whose callback is
    ``persist_results``.

    Args:
        db_path: Forwarded to ``persist_results``.
        scenario_id: Forwarded to each ``handle_individual`` task and to
            ``persist_results``.
        generation: Forwarded to each ``handle_individual`` task and to
            ``persist_results``.
        genotypes: Genomes of this generation; each one has its ``fitness``
            attribute mutated in place.
        pair_selection_f: Forwarded to ``persist_results``.
        fitness_f: Forwarded to each ``handle_individual`` task and to
            ``persist_results``.
        neat_config: Forwarded to ``persist_results``.
        ca_config: Used to serialize each CPPN rule; also forwarded to the
            tasks.
        innovation_archive: Additional genomes used only as comparison points
            for the distance computation.
    """
    # Kept function-local, as in the original.
    from celery import group, chord

    k = 15  # number of smallest distances folded into each fitness value
    serialized = {}  # genome -> second value returned by serialize_cppn_rule
    distances = {}  # (genome, genome) ordered by id() -> cached distance

    # Loop-invariant, so build the comparison pool once instead of per genotype.
    comparison_pool = genotypes + innovation_archive

    concurrent_tasks = []
    for i, gt in enumerate(genotypes):
        neighbour_distances = []
        for other_gt in comparison_pool:
            if other_gt is gt:
                continue

            # Ordering the pair by id() makes the key independent of which
            # genome is "gt" and which is "other_gt", so every unordered pair
            # is computed at most once.
            key = tuple(sorted((gt, other_gt), key=id))
            if key not in distances:
                # Serialize lazily and cache: a genome takes part in many pairs.
                for genome in (gt, other_gt):
                    if genome not in serialized:
                        pt = create_feed_forward_phenotype(genome)
                        _, serialized[genome] = serialize_cppn_rule(cppn=pt, ca_config=ca_config)
                distances[key] = hamming(serialized[gt], serialized[other_gt], normalized=True)

            neighbour_distances.append(distances[key])

        # NOTE(review): the divisor is always k, even when fewer than k
        # distances are available -- confirm this is the intended scaling.
        gt.fitness = sum(sorted(neighbour_distances)[:k]) / k

        concurrent_tasks.append(handle_individual.s(
            scenario_id=scenario_id,
            generation=generation,
            individual_number=i,
            genotype=gt,
            fitness_f=fitness_f,
            ca_config=ca_config,
        ))

    final_task = persist_results.subtask(
        args=(db_path, scenario_id, generation, fitness_f, pair_selection_f, neat_config, ca_config)
    )

    # Submit the per-individual tasks as the chord header with
    # persist_results as the chord callback.
    chord(group(concurrent_tasks), final_task)()
评论列表
文章目录