def plackett_luce(rankings, tolerance=1e-9, check_assumption=True, normalize=True, verbose=False):
'''This algorithm returns the MLE of the Plackett-Luce ranking parameters
over a given set of rankings. It requires that the set of players is unable
to be split into two disjoint sets where nobody from set A has beaten anyone from
set B. If this assumption fails, the algorithm will diverge. If the
assumption is checked and fails, the algorithm will short-circuit and
return None.
Input is a list of dictionaries, where each dictionary corresponds to an
individual ranking and contains the player : finish for that ranking.
Output is a dictionary containing player : plackett_luce_parameter keys
and values.
'''
players = set(key for ranking in rankings for key in ranking.keys())
rankings = [sorted(ranking.keys(),key=ranking.get) for ranking in rankings]
if verbose:
print('Using native Python implementation of Plackett-Luce.')
print('{:,} unique players found.'.format(len(players)))
print('{:,} rankings found.'.format(len(rankings)))
if check_assumption:
edges = [(source, dest) for ranking in rankings for source, dest in combinations(ranking, 2)]
scc_count = len(set(scc(edges).values()))
if verbose:
if scc_count == 1:
print ('No disjoint sets found. Algorithm convergence conditions are met.')
else:
print('{:,} disjoint sets found. Algorithm will diverge.'.format(scc_count))
if scc_count != 1:
return None
ws = Counter(name for ranking in rankings for name in ranking[:-1])
gammas = {player : 1.0 / len(players) for player in players}
gdiff = float('inf')
iteration = 0
start = time.perf_counter()
while gdiff > tolerance:
_gammas = gammas
gamma_sums = [list(accumulate(1 / s for s in reversed(list(accumulate(gammas[finisher] for finisher in reversed(ranking)))))) for ranking in rankings]
gammas = {player : ws[player] / sum(gamma_sum[min(ranking.index(player), len(ranking) - 2)]
for ranking, gamma_sum in zip(rankings, gamma_sums) if player in ranking)
for player in players}
if normalize:
gammas = {player : gamma / sum(gammas.values()) for player, gamma in gammas.items()}
pgdiff = gdiff
gdiff = sqrt(sum((gamma - _gammas[player]) ** 2 for player, gamma in gammas.items()))
iteration += 1
if verbose:
now = time.perf_counter()
print("%d %.2f seconds L2=%.2e" % (iteration, now-start, gdiff))
if gdiff > pgdiff:
print("Gamma difference increased, %.4e %.4e" % (gdiff, pgdiff))
start = now
return gammas
评论列表
文章目录