# Shared context assumed by the snippets below:
#   import numpy as np
#   from sklearn.metrics import normalized_mutual_info_score
def check_forward(self, x_data, c_data, gamma, T, y_star, y_pam):
    # Test helper for a structured clustering loss; assumes `chainer`,
    # `cuda`, `testing`, and `clustering_loss` are importable in the module.
    num_examples = len(x_data)
    x = chainer.Variable(x_data)
    c = chainer.Variable(c_data)
    loss = clustering_loss(x, c, gamma, T)
    # F(y_pam): negative sum of squared distances under the PAM assignment
    sq_distances_ij = []
    for i, j in zip(range(num_examples), y_pam):
        sqd_ij = np.sum((x_data[i] - x_data[j]) ** 2)
        sq_distances_ij.append(sqd_ij)
    f = -sum(sq_distances_ij)
    # F(y_star): the same energy under the oracle assignment
    sq_distances_ij = []
    for i, j in zip(range(num_examples), y_star):
        sqd_ij = np.sum((x_data[i] - x_data[j]) ** 2)
        sq_distances_ij.append(sqd_ij)
    f_tilde = -sum(sq_distances_ij)
    # Structured margin: 1 - NMI between ground truth and the PAM labels
    delta = 1.0 - normalized_mutual_info_score(cuda.to_cpu(c_data), y_pam)
    loss_expected = f + gamma * delta - f_tilde
    testing.assert_allclose(loss.data, loss_expected)
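
For orientation, the quantity the test reconstructs, F(y_pam) + gamma * delta - F(y_star), can be computed with plain NumPy and scikit-learn. The sketch below is illustrative only: the toy arrays and the facility_energy helper are invented for this example and are not part of the original test.

import numpy as np
from sklearn.metrics import normalized_mutual_info_score

def facility_energy(x, assignment):
    # F(y): negative sum of squared distances from each point to its medoid
    return -sum(np.sum((x[i] - x[j]) ** 2) for i, j in enumerate(assignment))

x = np.array([[0.0, 0.0], [0.1, 0.0], [5.0, 5.0], [5.1, 5.0]])
c = np.array([0, 0, 1, 1])          # ground-truth labels
y_pam = np.array([0, 0, 2, 2])      # medoid index chosen for each point
y_star = np.array([1, 1, 3, 3])     # oracle assignment

gamma = 1.0
delta = 1.0 - normalized_mutual_info_score(c, y_pam)
loss_expected = facility_energy(x, y_pam) + gamma * delta - facility_energy(x, y_star)
print(loss_expected)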
def test_pipeline_spectral_clustering(seed=36):
    # Test using a pipeline to do spectral clustering.  `S`, `true_labels`,
    # and `n_clusters` are module-level fixtures of the test file.
    random_state = np.random.RandomState(seed)
    se_rbf = SpectralEmbedding(n_components=n_clusters,
                               affinity="rbf",
                               random_state=random_state)
    se_knn = SpectralEmbedding(n_components=n_clusters,
                               affinity="nearest_neighbors",
                               n_neighbors=5,
                               random_state=random_state)
    for se in [se_rbf, se_knn]:
        km = KMeans(n_clusters=n_clusters, random_state=random_state)
        km.fit(se.fit_transform(S))
        assert_array_almost_equal(
            normalized_mutual_info_score(km.labels_, true_labels), 1.0, 2)
def evaluate_clustering(y_gt, y_assignment):
    return normalized_mutual_info_score(y_gt, y_assignment)
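
A minimal usage sketch (labels invented): NMI is invariant to the numbering of clusters, so an identical partition with permuted ids still scores 1.0.

from sklearn.metrics import normalized_mutual_info_score

y_gt = [0, 0, 1, 1, 2, 2]
y_assignment = [2, 2, 0, 0, 1, 1]   # same partition, permuted ids
print(normalized_mutual_info_score(y_gt, y_assignment))  # 1.0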
def mi_panel(self, *args, **kwargs):
    ranks = self.rank_panel(*args, **kwargs)
    panel = []
    for rank_vector in ranks:
        scores = []
        for rank in range(1, len(rank_vector)):
            # Recall ranks start at 1. The highest rank is uninteresting.
            scores.append(normalized_mi(self.dataset.y, rank_vector <= rank))
        panel.append(scores)
    return np.vstack(panel)
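
The inner loop scores how well each rank cutoff separates the labels: `rank_vector <= rank` is a boolean partition that is compared against `y` by NMI. A standalone sketch with invented data:

import numpy as np
from sklearn.metrics import normalized_mutual_info_score

y = np.array([1, 1, 1, 0, 0, 0])
rank_vector = np.array([1, 2, 3, 4, 5, 6])  # hypothetical ranks, 1 = best

for rank in range(1, len(rank_vector)):
    in_top = rank_vector <= rank            # boolean partition at this cutoff
    print(rank, normalized_mutual_info_score(y, in_top))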
########################################
# Plotting utilities
def calc(gr_truth, predicted):
    # precision, recall, fscore, _ = score(gr_truth, predicted, average='micro')
    # print('precision: {}'.format(precision))
    # print('recall: {}'.format(recall))
    # print('fscore: {}'.format(fscore))
    # print('jaccard: {}'.format(jaccard_similarity_score(gr_truth, predicted, normalize=True)))
    # print('mutual: {}'.format(mutual_info_score(gr_truth, predicted)))
    # print('mutual adj: {}'.format(adjusted_mutual_info_score(gr_truth, predicted)))
    # print('mutual norm: {}'.format(normalized_mutual_info_score(gr_truth, predicted)))
    return normalized_mutual_info_score(gr_truth, predicted)
def compute_cluster_scores(labels, pred_labels, path):
    # assumes: from sklearn import metrics
    assert len(labels) == len(pred_labels)
    rand_score = metrics.adjusted_rand_score(labels, pred_labels)
    nmi_score = metrics.normalized_mutual_info_score(labels, pred_labels)
    with open(path, 'a') as rr:
        rr.write("%4.4f %4.4f\n" % (rand_score, nmi_score))
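
Usage sketch (path and labels invented): each call appends one `rand nmi` line, so repeated calls accumulate a score log.

labels = [0, 0, 1, 1]
pred_labels = [1, 1, 0, 0]
compute_cluster_scores(labels, pred_labels, 'scores.txt')  # appends "1.0000 1.0000"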
def measure(predicted, true):
    NMI = normalized_mutual_info_score(true, predicted)
    print("NMI: " + str(NMI))
    RAND = adjusted_rand_score(true, predicted)
    print("RAND: " + str(RAND))
    HOMO = homogeneity_score(true, predicted)
    print("HOMOGENEITY: " + str(HOMO))
    COMPLETENESS = completeness_score(true, predicted)
    print("COMPLETENESS: " + str(COMPLETENESS))
    return {'NMI': NMI, 'RAND': RAND, 'HOMOGENEITY': HOMO,
            'COMPLETENESS': COMPLETENESS}
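
Usage sketch with invented labels, showing why the function reports several metrics: merging two true clusters leaves completeness at 1.0 while homogeneity drops, so the scores diagnose different failure modes.

true = [0, 0, 1, 1, 2, 2]
predicted = [0, 0, 1, 1, 1, 1]   # merges the two last true clusters
scores = measure(predicted, true)
# COMPLETENESS stays 1.0 (each class sits in one cluster);
# HOMOGENEITY drops (one cluster now mixes two classes).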
def test_spectral_embedding_two_components(seed=36):
    # Test spectral embedding with two components
    random_state = np.random.RandomState(seed)
    n_sample = 100
    affinity = np.zeros(shape=[n_sample * 2, n_sample * 2])
    # first component
    affinity[0:n_sample,
             0:n_sample] = np.abs(random_state.randn(n_sample, n_sample)) + 2
    # second component
    affinity[n_sample::,
             n_sample::] = np.abs(random_state.randn(n_sample, n_sample)) + 2
    # Test of internal _graph_connected_component before connection
    component = _graph_connected_component(affinity, 0)
    assert_true(component[:n_sample].all())
    assert_true(not component[n_sample:].any())
    component = _graph_connected_component(affinity, -1)
    assert_true(not component[:n_sample].any())
    assert_true(component[n_sample:].all())
    # connection
    affinity[0, n_sample + 1] = 1
    affinity[n_sample + 1, 0] = 1
    affinity.flat[::2 * n_sample + 1] = 0
    affinity = 0.5 * (affinity + affinity.T)
    true_label = np.zeros(shape=2 * n_sample)
    true_label[0:n_sample] = 1
    se_precomp = SpectralEmbedding(n_components=1, affinity="precomputed",
                                   random_state=np.random.RandomState(seed))
    embedded_coordinate = se_precomp.fit_transform(affinity)
    # Some numpy versions are touchy with types
    embedded_coordinate = \
        se_precomp.fit_transform(affinity.astype(np.float32))
    # thresholding on the first components using 0.
    label_ = np.array(embedded_coordinate.ravel() < 0, dtype="float")
    assert_equal(normalized_mutual_info_score(true_label, label_), 1.0)
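
The idea behind the assertion: with two densely connected blocks joined by one weak edge, the first nontrivial spectral coordinate separates the blocks, so thresholding it at zero recovers the two components. A condensed, hedged reproduction (smaller n, otherwise the same construction; NMI is invariant to the arbitrary sign of the eigenvector):

import numpy as np
from sklearn.manifold import SpectralEmbedding
from sklearn.metrics import normalized_mutual_info_score

rng = np.random.RandomState(36)
n = 30
affinity = np.zeros((2 * n, 2 * n))
affinity[:n, :n] = np.abs(rng.randn(n, n)) + 2
affinity[n:, n:] = np.abs(rng.randn(n, n)) + 2
affinity[0, n + 1] = affinity[n + 1, 0] = 1   # weak bridge between blocks
np.fill_diagonal(affinity, 0)
affinity = 0.5 * (affinity + affinity.T)

coord = SpectralEmbedding(n_components=1, affinity="precomputed",
                          random_state=36).fit_transform(affinity)
labels = (coord.ravel() < 0).astype(float)
true = np.repeat([1.0, 0.0], n)
print(normalized_mutual_info_score(true, labels))  # expected ~1.0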
def NMI(y_true, y_pred):
    return metrics.normalized_mutual_info_score(y_true, y_pred)
def cluster_nmi(Y_pred, Y):
    assert Y_pred.size == Y.size
    nmi = normalized_mutual_info_score(Y, Y_pred)
    return nmi
def loss_augmented_fit(self, X, y, loss_mult):
    """Fit K-Medoids to the provided data, augmenting the facility-location
    objective with an NMI-based clustering loss.

    Parameters
    ----------
    X : array-like or sparse matrix, shape=(n_samples, n_features)
    y : array-like, shape=(n_samples,)
        Ground-truth cluster labels used by the loss augmentation.
    loss_mult : float
        Weight of the NMI loss term.

    Returns
    -------
    self
    """
    self._check_init_args()
    # Check that the array is good and attempt to convert it to
    # Numpy array if possible
    X = self._check_array(X)
    # Apply distance metric to get the distance matrix
    D = self.distance_func(X)
    num_data = X.shape[0]
    candidate_ids = list(range(num_data))  # list so that `del` works in Python 3
    candidate_scores = np.zeros(num_data,)
    subset = []
    k = 0
    while k < self.n_clusters:
        candidate_scores = []
        for i in candidate_ids:
            # push i to subset
            subset.append(i)
            marginal_cost = np.sum(np.min(D[:, subset], axis=1))
            loss = normalized_mutual_info_score(y, self._get_cluster_ics(D, subset))
            candidate_scores.append(marginal_cost - loss_mult * loss)
            # remove i from subset
            subset.pop()
        # push i_star to subset
        i_star = candidate_ids[np.argmin(candidate_scores)]
        bisect.insort(subset, i_star)
        # remove i_star from candidate indices
        del candidate_ids[bisect.bisect_left(candidate_ids, i_star)]
        k = k + 1
    # print('|S|: %d, F(S): %f' % (k, np.min(candidate_scores)))
    # Expose labels_ which are the assignments of
    # the training data to clusters
    self.labels_ = self._get_cluster_ics(D, subset)
    # Expose cluster centers, i.e. medoids
    self.cluster_centers_ = X.take(subset, axis=0)
    # Expose indices of chosen cluster centers
    self.center_ics_ = subset
    return self
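
Stripped of the class scaffolding, the loop above is a greedy facility-location selection in which each candidate medoid set is scored by its assignment cost minus a weighted NMI against the ground truth. A self-contained sketch (helper name, toy data, and the cdist-based metric are invented for illustration):

import numpy as np
from scipy.spatial.distance import cdist
from sklearn.metrics import normalized_mutual_info_score

def greedy_loss_augmented_medoids(X, y, n_clusters, loss_mult):
    D = cdist(X, X)                              # pairwise distance matrix
    candidates = list(range(len(X)))
    subset = []
    for _ in range(n_clusters):
        scores = []
        for i in candidates:
            trial = subset + [i]
            cost = np.sum(np.min(D[:, trial], axis=1))
            labels = np.argmin(D[:, trial], axis=1)  # nearest-medoid labels
            nmi = normalized_mutual_info_score(y, labels)
            scores.append(cost - loss_mult * nmi)
        subset.append(candidates.pop(int(np.argmin(scores))))
    return sorted(subset)

X = np.array([[0.0, 0.0], [0.2, 0.0], [4.0, 4.0], [4.2, 4.0]])
y = np.array([0, 0, 1, 1])
print(greedy_loss_augmented_medoids(X, y, n_clusters=2, loss_mult=1.0))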
def assort(self, model):
    #if not source:
    #    data = self.data
    #    sim_source = self.similarity_matrix('cos')
    data = self.data
    N = self.data.shape[0]
    sim_source = self.similarity_matrix(sim='cos')
    y = model.generate(N)
    #y = np.triu(y) + np.triu(y, 1).T
    sim_learn = model.similarity_matrix(sim='cos')
    assert(N == y.shape[0])

    # Four-way indicator: (edge present?, similarity positive?)
    indic_source = ma.array(np.ones(sim_source.shape) * -1, mask=ma.masked)
    indic_source[(data == 1) & (sim_source > 0)] = 0
    indic_source[(data == 1) & (sim_source <= 0)] = 1
    indic_source[(data == 0) & (sim_source > 0)] = 2
    indic_source[(data == 0) & (sim_source <= 0)] = 3

    indic_learn = ma.array(np.ones(sim_learn.shape) * -1, mask=ma.masked)
    indic_learn[(y == 1) & (sim_learn > 0)] = 0
    indic_learn[(y == 1) & (sim_learn <= 0)] = 1
    indic_learn[(y == 0) & (sim_learn > 0)] = 2
    indic_learn[(y == 0) & (sim_learn <= 0)] = 3

    np.fill_diagonal(indic_learn, ma.masked)
    np.fill_diagonal(indic_source, ma.masked)
    indic_source[indic_source == -1] = ma.masked
    indic_learn[indic_learn == -1] = ma.masked

    ### Christine's homophily indicator
    homo_ind1_source = 1.0 * ((indic_source == 0).sum() + (indic_source == 3).sum()
                              - (indic_source == 1).sum() - (indic_source == 2).sum()) / (N * (N - 1))
    homo_ind1_learn = 1.0 * ((indic_learn == 0).sum() + (indic_learn == 3).sum()
                             - (indic_learn == 1).sum() - (indic_learn == 2).sum()) / (N * (N - 1))

    # AMI / NMI between the source and learned indicator matrices
    from sklearn import metrics
    AMI = metrics.adjusted_mutual_info_score(indic_source.compressed(), indic_learn.compressed())
    NMI = metrics.normalized_mutual_info_score(indic_source.compressed(), indic_learn.compressed())

    print('homo_ind1 source: %f' % (homo_ind1_source))
    print('homo_ind1 learn: %f' % (homo_ind1_learn))
    print('AMI: %f, NMI: %f' % (AMI, NMI))

    d = {'NMI': NMI, 'homo_ind1_source': homo_ind1_source,
         'homo_ind1_learn': homo_ind1_learn}
    return d
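
The homophily index counts sign agreements between edges and similarities: categories 0 (edge, similar) and 3 (non-edge, dissimilar) raise it, categories 1 and 2 lower it, normalized by the N(N-1) off-diagonal pairs. A compact sketch of the same quantity (function name and toy graph invented):

import numpy as np

def homophily_index(adj, sim):
    # Agreement = (edge & sim > 0) or (non-edge & sim <= 0), counted over
    # off-diagonal pairs; disagreements are subtracted.
    N = adj.shape[0]
    off = ~np.eye(N, dtype=bool)
    agree = ((adj == 1) & (sim > 0)) | ((adj == 0) & (sim <= 0))
    return (agree[off].sum() - (~agree)[off].sum()) / (N * (N - 1))

adj = np.array([[0, 1], [1, 0]])
sim = np.array([[1.0, 0.5], [0.5, 1.0]])
print(homophily_index(adj, sim))  # 1.0 for a perfectly homophilous toy graph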