def test_GMM_n_components(*data):
    '''
    Plot the adjusted Rand index of a Gaussian mixture against n_components.

    A ``GaussianMixture`` is fitted for every component count in 1..49 and the
    ARI between the true labels and the predicted labels is plotted.

    :param data: two positional values: the samples and their true labels
    :return: None
    '''
    X, labels_true = data
    component_counts = range(1, 50)

    def ari_for(n_components):
        # Fit a fresh mixture and score its hard assignments on the same data.
        model = mixture.GaussianMixture(n_components=n_components)
        model.fit(X)
        return adjusted_rand_score(labels_true, model.predict(X))

    scores = [ari_for(n_components) for n_components in component_counts]

    # Single panel: ARI as a function of the number of components.
    figure = plt.figure()
    axes = figure.add_subplot(1, 1, 1)
    axes.plot(component_counts, scores, marker="+")
    axes.set_xlabel("n_components")
    axes.set_ylabel("ARI")
    figure.suptitle("GMM")
    plt.show()
# Example source code for Python's adjusted_rand_score()
def test_AgglomerativeClustering_nclusters(*data):
    '''
    Plot the adjusted Rand index of agglomerative clustering against n_clusters.

    An ``AgglomerativeClustering`` is run for every cluster count in 1..49 and
    the ARI between the true labels and the predicted labels is plotted.

    :param data: two positional values: the samples and their true labels
    :return: None
    '''
    X, labels_true = data
    cluster_counts = range(1, 50)

    def ari_for(n_clusters):
        model = cluster.AgglomerativeClustering(n_clusters=n_clusters)
        return adjusted_rand_score(labels_true, model.fit_predict(X))

    scores = [ari_for(n_clusters) for n_clusters in cluster_counts]

    # Single panel: ARI as a function of the number of clusters.
    figure = plt.figure()
    axes = figure.add_subplot(1, 1, 1)
    axes.plot(cluster_counts, scores, marker="+")
    axes.set_xlabel("n_clusters")
    axes.set_ylabel("ARI")
    figure.suptitle("AgglomerativeClustering")
    plt.show()
def test_discretize(seed=8):
    """Check that ``discretize`` recovers labels from a noisy indicator matrix.

    For several sample sizes and class counts, random labels are drawn, turned
    into a one-hot indicator matrix, perturbed with Gaussian noise scaled by
    0.1, and passed to ``discretize``.  The adjusted Rand index between the
    drawn labels and the result must exceed 0.8.

    :param seed: seed for the ``numpy.random.RandomState`` used throughout
    :return: None
    """
    random_state = np.random.RandomState(seed)
    for n_samples in [50, 100, 150, 500]:
        for n_class in range(2, 10):
            # Random class labels in the inclusive range [0, n_class], which
            # is why the matrices below have n_class + 1 columns.  ``randint``
            # has an exclusive upper bound; it replaces ``random_integers``,
            # which is no longer available in current NumPy.
            y_true = random_state.randint(0, n_class + 1, n_samples)
            # The builtin ``float`` replaces the removed ``np.float`` alias.
            y_true = np.array(y_true, float)
            # noise class assignment matrix
            y_indicator = sparse.coo_matrix(
                (np.ones(n_samples), (np.arange(n_samples), y_true)),
                shape=(n_samples, n_class + 1))
            y_true_noisy = (y_indicator.toarray()
                            + 0.1 * random_state.randn(n_samples,
                                                       n_class + 1))
            y_pred = discretize(y_true_noisy, random_state)
            assert_greater(adjusted_rand_score(y_true, y_pred), 0.8)
def bench_k_means(estimator, name, data):
    """Fit ``estimator`` on ``data`` and print one row of clustering metrics.

    The row holds the name, the fit time in seconds, the inertia, then the
    homogeneity, completeness, V-measure, adjusted Rand, adjusted mutual
    information and silhouette scores.  The reference ``labels`` and the
    silhouette ``sample_size`` are not parameters; they are read from names
    defined outside this function.

    :param estimator: object exposing ``fit``, ``inertia_`` and ``labels_``
    :param name: text printed in the first column
    :param data: samples passed to ``fit`` and to the silhouette score
    :return: None
    """
    started = time()
    estimator.fit(data)
    # Only the fit itself is timed; the metrics below are excluded.
    elapsed = time() - started

    assigned = estimator.labels_
    row = (
        name,
        elapsed,
        estimator.inertia_,
        metrics.homogeneity_score(labels, assigned),
        metrics.completeness_score(labels, assigned),
        metrics.v_measure_score(labels, assigned),
        metrics.adjusted_rand_score(labels, assigned),
        metrics.adjusted_mutual_info_score(labels, assigned),
        metrics.silhouette_score(data, assigned,
                                 metric='euclidean',
                                 sample_size=sample_size),
    )
    print('% 9s %.2fs %i %.3f %.3f %.3f %.3f %.3f %.3f' % row)
def bench_k_means(labels, labels_, name, data):
    """Print label-agreement metrics and the cluster-size extremes.

    The first printed line holds the name followed by the homogeneity,
    completeness, V-measure, adjusted Rand and adjusted mutual information
    scores of ``labels_`` against ``labels``.  The second line holds the
    smallest and largest bin counts of a histogram of ``labels_`` that uses
    one bin per distinct label.

    :param labels: reference labels
    :param labels_: predicted cluster labels
    :param name: text printed in the first column
    :param data: not used by this function
    :return: None
    """
    print('%20s %.3f %.3f %.3f %.3f %.3f'
          % (name,
             metrics.homogeneity_score(labels, labels_),
             metrics.completeness_score(labels, labels_),
             metrics.v_measure_score(labels, labels_),
             metrics.adjusted_rand_score(labels, labels_),
             metrics.adjusted_mutual_info_score(labels, labels_)))
    nbins = len(set(labels_))
    vals, _ = np.histogram(labels_, bins=nbins)
    # A single pre-formatted string prints the same text under Python 2 and
    # Python 3; the original ``print a, b, c`` statement is Python 2 only.
    print('%s %s %s %s' % (20 * ' ', 'hist-min,max', np.min(vals), np.max(vals)))
def computeAdjustedEvaluations(self, labels_families, predicted_clusters):
    """Store the chance-adjusted agreement scores on the instance.

    Sets ``self.adjusted_rand_score`` and ``self.adjusted_mutual_info_score``.
    Both are set to 0 when no reference labels are supplied.

    :param labels_families: reference labels, or None when unavailable
    :param predicted_clusters: predicted cluster assignment
    :return: None
    """
    if labels_families is not None:
        self.adjusted_rand_score = metrics.adjusted_rand_score(
            labels_families, predicted_clusters)
        self.adjusted_mutual_info_score = metrics.adjusted_mutual_info_score(
            labels_families, predicted_clusters)
    else:
        # Without reference labels there is nothing to compare against.
        self.adjusted_rand_score = 0
        self.adjusted_mutual_info_score = 0
def toJson(self):
    """Return the stored evaluation scores as a plain dictionary.

    The result maps each of the five score names to the attribute of the
    same name on the instance.

    :return: dict with keys ``homogeneity``, ``completeness``, ``v_measure``,
        ``adjusted_rand_score`` and ``adjusted_mutual_info_score``
    """
    score_names = (
        'homogeneity',
        'completeness',
        'v_measure',
        'adjusted_rand_score',
        'adjusted_mutual_info_score',
    )
    return {score_name: getattr(self, score_name) for score_name in score_names}
def analyze_k_means(estimator, name, data):
    """Fit ``estimator`` on ``data`` and print one row of clustering metrics.

    The row holds the name, the fit time in seconds, the inertia, then the
    homogeneity, completeness, V-measure, adjusted Rand, adjusted mutual
    information and silhouette scores.  The reference ``labels`` and the
    silhouette sample size ``samples`` are read from names defined outside
    this function.

    :param estimator: object exposing ``fit``, ``inertia_`` and ``labels_``
    :param name: text printed in the first column
    :param data: samples passed to ``fit`` and to the silhouette score
    :return: None
    """
    started = time()
    estimator.fit(data)
    # Only the fit itself is timed; the metrics below are excluded.
    elapsed = time() - started

    assigned = estimator.labels_
    row = (
        name,
        elapsed,
        estimator.inertia_,
        metrics.homogeneity_score(labels, assigned),
        metrics.completeness_score(labels, assigned),
        metrics.v_measure_score(labels, assigned),
        metrics.adjusted_rand_score(labels, assigned),
        metrics.adjusted_mutual_info_score(labels, assigned),
        metrics.silhouette_score(data, assigned, metric='euclidean',
                                 sample_size=samples),
    )
    print(" %9s %.2fs %i %.3f %.3f %.3f %.3f %.3f %.3f" % row)
def column_average_ari(Zv, Zc, cc_state_object):
    """Return the adjusted Rand index averaged over columns.

    For every column index ``col`` the reference partition ``Zc[Zv[col]]`` is
    compared with the partition ``cc_state_object.views[...].Z`` selected by
    ``cc_state_object.Zv[col]``.

    :param Zv: sequence indexed by column; each entry is used as an index
        into ``Zc`` (presumably the true view assignment -- confirm with caller)
    :param Zc: sequence indexed by the entries of ``Zv``; each entry is a
        labelling passed to ``adjusted_rand_score``
    :param cc_state_object: object exposing ``Zv`` and ``views``, where each
        view has a ``Z`` attribute with a ``tolist`` method
    :return: mean ARI over all columns, as a float
    :raises ZeroDivisionError: if ``Zv`` is empty
    """
    from sklearn.metrics import adjusted_rand_score

    n_cols = len(Zv)
    ari = 0
    # ``range`` instead of ``xrange`` so the function also runs on Python 3.
    for col in range(n_cols):
        view_t = Zv[col]
        Zc_true = Zc[view_t]
        view_i = cc_state_object.Zv[col]
        Zc_inferred = cc_state_object.views[view_i].Z.tolist()
        ari += adjusted_rand_score(Zc_true, Zc_inferred)
    return ari / float(n_cols)
def compute_cluster_scores(labels, pred_labels, path):
    """Append the adjusted Rand and normalized mutual information scores to a file.

    One line with both scores, each formatted to four decimal places, is
    appended to ``path``.

    :param labels: reference labels
    :param pred_labels: predicted labels; must have the same length as ``labels``
    :param path: file opened in append mode
    :return: None
    """
    assert len(labels) == len(pred_labels)
    ari_value = metrics.adjusted_rand_score(labels, pred_labels)
    nmi_value = metrics.normalized_mutual_info_score(labels, pred_labels)
    with open(path, 'a') as report:
        report.write("%4.4f %4.4f\n" % (ari_value, nmi_value))
def ARI(labels_true, labels_pred):
    """Return ``adjusted_rand_score(labels_true, labels_pred)``.

    :param labels_true: reference labels
    :param labels_pred: predicted labels
    :return: the value returned by ``adjusted_rand_score``
    """
    score = adjusted_rand_score(labels_true, labels_pred)
    return score
def measure(predicted, true):
    """Print and return four agreement scores between two labellings.

    Each score is printed as soon as it has been computed.

    :param predicted: predicted labels
    :param true: reference labels
    :return: dict with keys ``NMI``, ``RAND``, ``HOMOGENEITY`` and
        ``COMPLETENESS``
    """
    nmi_value = normalized_mutual_info_score(true, predicted)
    print("NMI:" + str(nmi_value))

    rand_value = adjusted_rand_score(true, predicted)
    print("RAND:" + str(rand_value))

    homogeneity_value = homogeneity_score(true, predicted)
    print("HOMOGENEITY:" + str(homogeneity_value))

    completeness_value = completeness_score(true, predicted)
    print("COMPLETENESS:" + str(completeness_value))

    return {
        'NMI': nmi_value,
        'RAND': rand_value,
        'HOMOGENEITY': homogeneity_value,
        'COMPLETENESS': completeness_value,
    }
def performance(self, group_labels=None):
    """
    Computes performance metrics for clustering algorithm

    The group labels are mapped to consecutive integer codes (stored as
    floats) and compared with ``self.algorithm.labels_``.  Results are
    written to ``self.results`` under the keys ``homogeneity``,
    ``completeness``, ``v_measure``, ``adj_rand`` and ``adj_MI``; the integer
    codes are stored in ``self.clusters["true_int"]``.

    Parameters
    ----------
    group_labels : (optional) ndarray(shape=nsubjects)
        Labels for subject groups.  When omitted, every subject is treated
        as belonging to a single group (all codes are zero).
    """
    n_samples = len(self.algorithm.labels_)
    truelab = np.zeros(n_samples)
    if group_labels is not None:
        # Coerce to an array so the comparison below yields an element-wise
        # mask; with a plain list it would be a single ``False`` and no
        # entry of ``truelab`` would ever be set.
        labels_arr = np.asarray(group_labels)
        for i, label_i in enumerate(np.unique(labels_arr)):
            truelab[labels_arr == label_i] = i
        # Keep the caller's original object, not the coerced copy.
        self.clusters["true"] = group_labels
    self.clusters["true_int"] = truelab

    lab = self.algorithm.labels_
    self.results["homogeneity"] = homogeneity_score(truelab, lab)
    self.results["completeness"] = completeness_score(truelab, lab)
    self.results["v_measure"] = v_measure_score(truelab, lab)
    self.results["adj_rand"] = adjusted_rand_score(truelab, lab)
    self.results["adj_MI"] = adjusted_mutual_info_score(truelab, lab)
def test_GMM(*data):
    '''
    Fit a default Gaussian mixture and print its adjusted Rand index.

    :param data: two positional values: the samples and their true labels
    :return: None
    '''
    X, labels_true = data
    model = mixture.GaussianMixture()
    model.fit(X)
    score = adjusted_rand_score(labels_true, model.predict(X))
    print("ARI:{0}".format(score))
def test_GMM_cov_type(*data):
    '''
    Plot the adjusted Rand index of a Gaussian mixture for each covariance type.

    For every covariance type one curve of ARI against n_components (1..49)
    is drawn on a shared axis.

    :param data: two positional values: the samples and their true labels
    :return: None
    '''
    X, labels_true = data
    component_counts = range(1, 50)
    covariance_kinds = ['spherical', 'tied', 'diag', 'full']
    marker_symbols = "+o*s"

    figure = plt.figure()
    axes = figure.add_subplot(1, 1, 1)

    # One curve per covariance type, each with its own marker.
    for kind, symbol in zip(covariance_kinds, marker_symbols):
        scores = []
        for n_components in component_counts:
            model = mixture.GaussianMixture(n_components=n_components,
                                            covariance_type=kind)
            model.fit(X)
            assigned = model.predict(X)
            scores.append(adjusted_rand_score(labels_true, assigned))
        axes.plot(component_counts, scores, marker=symbol,
                  label="covariance_type:{0}".format(kind))

    axes.set_xlabel("n_components")
    axes.legend(loc="best")
    axes.set_ylabel("ARI")
    figure.suptitle("GMM")
    plt.show()
def test_DBSCAN(*data):
    '''
    Run a default DBSCAN and print its ARI and number of core samples.

    :param data: two positional values: the samples and their true labels
    :return: None
    '''
    X, labels_true = data
    model = cluster.DBSCAN()
    assigned = model.fit_predict(X)
    score = adjusted_rand_score(labels_true, assigned)
    print("ARI:%s" % score)
    core_count = len(model.core_sample_indices_)
    print("Core sample num:{0}".format(core_count))
def test_DBSCAN_epsilon(*data):
    '''
    Plot DBSCAN's ARI and core-sample count against eps.

    The eps values come from ``np.logspace(-1, 1.5)``; both panels use a
    logarithmic x axis.

    :param data: two positional values: the samples and their true labels
    :return: None
    '''
    X, labels_true = data
    eps_values = np.logspace(-1, 1.5)

    ari_scores = []
    core_counts = []
    for eps in eps_values:
        model = cluster.DBSCAN(eps=eps)
        assigned = model.fit_predict(X)
        ari_scores.append(adjusted_rand_score(labels_true, assigned))
        core_counts.append(len(model.core_sample_indices_))

    figure = plt.figure()

    # Left panel: ARI, clamped to [0, 1].
    ari_axes = figure.add_subplot(1, 2, 1)
    ari_axes.plot(eps_values, ari_scores, marker='+')
    ari_axes.set_xscale('log')
    ari_axes.set_xlabel(r"$\epsilon$")
    ari_axes.set_ylim(0, 1)
    ari_axes.set_ylabel('ARI')

    # Right panel: number of core samples.
    core_axes = figure.add_subplot(1, 2, 2)
    core_axes.plot(eps_values, core_counts, marker='o')
    core_axes.set_xscale('log')
    core_axes.set_xlabel(r"$\epsilon$")
    core_axes.set_ylabel('Core_Nums')

    figure.suptitle("DBSCAN")
    plt.show()
def test_Kmeans(*data):
    '''
    Fit a default KMeans and print its ARI and inertia.

    :param data: two positional values: the samples and their true labels
    :return: None
    '''
    X, labels_true = data
    model = cluster.KMeans()
    model.fit(X)
    score = adjusted_rand_score(labels_true, model.predict(X))
    print("ARI:{0}".format(score))
    print("Sum center distance {0}".format(model.inertia_))
def test_Kmeans_nclusters(*data):
    '''
    Plot KMeans' ARI and inertia against n_clusters.

    A ``KMeans`` is fitted for every cluster count in 1..49.

    :param data: two positional values: the samples and their true labels
    :return: None
    '''
    X, labels_true = data
    cluster_counts = range(1, 50)

    ari_scores = []
    inertias = []
    for n_clusters in cluster_counts:
        model = cluster.KMeans(n_clusters=n_clusters)
        model.fit(X)
        assigned = model.predict(X)
        ari_scores.append(adjusted_rand_score(labels_true, assigned))
        inertias.append(model.inertia_)

    figure = plt.figure()

    # Left panel: agreement with the true labels.
    ari_axes = figure.add_subplot(1, 2, 1)
    ari_axes.plot(cluster_counts, ari_scores, marker="+")
    ari_axes.set_xlabel("n_clusters")
    ari_axes.set_ylabel("ARI")

    # Right panel: the fitted model's inertia_.
    inertia_axes = figure.add_subplot(1, 2, 2)
    inertia_axes.plot(cluster_counts, inertias, marker='o')
    inertia_axes.set_xlabel("n_clusters")
    inertia_axes.set_ylabel("inertia_")

    figure.suptitle("KMeans")
    plt.show()
def test_AgglomerativeClustering(*data):
    '''
    Run a default AgglomerativeClustering and print its adjusted Rand index.

    :param data: two positional values: the samples and their true labels
    :return: None
    '''
    X, labels_true = data
    model = cluster.AgglomerativeClustering()
    assigned = model.fit_predict(X)
    score = adjusted_rand_score(labels_true, assigned)
    print("ARI:{0}".format(score))
def evaluate(path):
    """Score the system output stored under ``path`` against the gold clustering.

    Relies on names defined outside this function: ``systems``, ``gold``,
    ``lemmas``, ``total`` and ``args`` (``args.measure`` and ``args.average``).

    For every lemma the gold and system sense identifiers of its instances
    are mapped to integer cluster ids.  When ``args.measure`` is
    ``'vmeasure'`` the per-lemma score is the tuple (homogeneity,
    completeness, V-measure) and the V-measure is accumulated; otherwise the
    per-lemma score is the adjusted Rand index and that is accumulated.

    With ``args.average == 'instances'`` each lemma's contribution is
    multiplied by ``len(instances) / total``; with ``'words'`` the plain sum
    is divided by the number of lemmas at the end.

    :param path: key into ``systems`` selecting the system output to score
    :return: tuple ``(measure, scores)`` where ``scores`` is an OrderedDict
        keyed by lemma
    """
    system = systems[path]
    measure, scores = 0., OrderedDict()
    for lemma in lemmas:
        instances = sorted(gold[lemma].keys())
        # Map arbitrary sense identifiers to dense integer cluster ids.
        senses_gold = {sid: i for i, sid in enumerate(sorted(set(gold[lemma].values())))}
        senses_system = {sid: i for i, sid in enumerate(sorted(set(system[lemma].values())))}
        clusters_gold = [senses_gold[gold[lemma][instance]] for instance in instances]
        clusters_system = [senses_system[system[lemma][instance]] for instance in instances]
        if 'vmeasure' == args.measure:
            # Computed once and reused for both the running total and the
            # per-lemma tuple.
            v_measure = v_measure_score(clusters_gold, clusters_system)
            if 'instances' == args.average:
                measure += v_measure * len(instances) / total
            else:
                measure += v_measure
            scores[lemma] = (
                homogeneity_score(clusters_gold, clusters_system),
                completeness_score(clusters_gold, clusters_system),
                v_measure
            )
        else:
            scores[lemma] = adjusted_rand_score(clusters_gold, clusters_system)
            if 'instances' == args.average:
                measure += scores[lemma] * len(instances) / total
            else:
                measure += scores[lemma]
    if 'words' == args.average:
        measure /= len(lemmas)
    return measure, scores
def bench_k_means(estimator, name, data):
    """Fit ``estimator`` on ``data`` and print one row of clustering metrics.

    The row holds the name, the fit time in seconds, the inertia, then the
    homogeneity, completeness, V-measure, adjusted Rand, adjusted mutual
    information and silhouette scores.  The reference ``labels`` and the
    silhouette ``sample_size`` are not parameters; they are read from names
    defined outside this function.

    :param estimator: object exposing ``fit``, ``inertia_`` and ``labels_``
    :param name: text printed in the first column
    :param data: samples passed to ``fit`` and to the silhouette score
    :return: None
    """
    fit_start = time()
    estimator.fit(data)
    # Only the fit itself is timed; the metrics below are excluded.
    fit_seconds = time() - fit_start

    predicted = estimator.labels_
    columns = (
        name,
        fit_seconds,
        estimator.inertia_,
        metrics.homogeneity_score(labels, predicted),
        metrics.completeness_score(labels, predicted),
        metrics.v_measure_score(labels, predicted),
        metrics.adjusted_rand_score(labels, predicted),
        metrics.adjusted_mutual_info_score(labels, predicted),
        metrics.silhouette_score(data, predicted,
                                 metric='euclidean',
                                 sample_size=sample_size),
    )
    print('% 9s %.2fs %i %.3f %.3f %.3f %.3f %.3f %.3f' % columns)
def check_clustering(name, Alg):
    """Run generic sanity checks on a clustering estimator class.

    The estimator is fitted on 50 shuffled, standardized blob samples, both
    as an array and as nested lists.  Its ``labels_`` must have one entry per
    sample and reach an adjusted Rand index above 0.4 against the generated
    labels.  Except for ``SpectralClustering``, a second run through
    ``fit_predict`` must reproduce the same labels.

    :param name: estimator name, used to select estimator-specific handling
    :param Alg: estimator class; instantiated without arguments
    :return: None
    """
    X, y = make_blobs(n_samples=50, random_state=1)
    X, y = shuffle(X, y, random_state=7)
    X = StandardScaler().fit_transform(X)
    n_samples, _ = X.shape
    # catch deprecation and neighbors warnings
    with warnings.catch_warnings(record=True):
        alg = Alg()
    set_testing_parameters(alg)
    if hasattr(alg, "n_clusters"):
        alg.set_params(n_clusters=3)
    set_random_state(alg)
    if name == 'AffinityPropagation':
        alg.set_params(preference=-100)
        alg.set_params(max_iter=100)

    # fit
    alg.fit(X)
    # with lists
    alg.fit(X.tolist())

    assert_equal(alg.labels_.shape, (n_samples,))
    pred = alg.labels_
    assert_greater(adjusted_rand_score(pred, y), 0.4)
    # fit another time with ``fit_predict`` and compare results
    # Compare by value: ``is`` on a string literal depends on interning and
    # is not a reliable equality test.
    if name == 'SpectralClustering':
        # there is no way to make Spectral clustering deterministic :(
        return
    set_random_state(alg)
    with warnings.catch_warnings(record=True):
        pred2 = alg.fit_predict(X)
    assert_array_equal(pred, pred2)
def test_spectral_clustering_sparse():
    """Check spectral clustering on a sparse precomputed affinity matrix.

    Two tight blobs are generated, their RBF kernel is shifted down by 1e-4
    and clipped at zero, and the result is converted to a COO matrix.  The
    labels found must agree perfectly with the generated labels.
    """
    X, y = make_blobs(n_samples=20, random_state=0,
                      centers=[[1, 1], [-1, -1]], cluster_std=0.01)

    affinity = rbf_kernel(X, gamma=1)
    affinity = np.maximum(affinity - 1e-4, 0)
    affinity = sparse.coo_matrix(affinity)

    model = SpectralClustering(random_state=0, n_clusters=2,
                               affinity='precomputed')
    found = model.fit(affinity).labels_
    assert_equal(adjusted_rand_score(y, found), 1)
def ARI(y_true, y_pred):
    """Return ``metrics.adjusted_rand_score(y_true, y_pred)``.

    :param y_true: reference labels
    :param y_pred: predicted labels
    :return: the value returned by ``metrics.adjusted_rand_score``
    """
    score = metrics.adjusted_rand_score(y_true, y_pred)
    return score
def compute_affinity_propagation(preference_, X):
    """Cluster ``X`` with affinity propagation, print metrics and plot the result.

    When ``X`` is None, 300 samples are generated with ``make_blobs`` around
    three fixed centers and clustered instead.  The number of clusters and
    several agreement scores against ``labels_true`` are printed, then every
    cluster is drawn with its exemplar and a line from the exemplar to each
    member.

    :param preference_: value passed as ``preference`` to ``AffinityPropagation``
    :param X: samples to cluster, or None to use generated blobs; the plot
        reads columns 0 and 1
    :return: None
    """
    centers = [[1, 1], [-1, -1], [1, -1]]
    n_samples = 300
    # Make Blobs used for generating of labels_true array
    # ``is None`` rather than ``== None``: comparing an ndarray with ``==``
    # is element-wise and cannot be used as an ``if`` condition.
    if X is None:
        X, labels_true = make_blobs(n_samples=n_samples, centers=centers,
                                    cluster_std=1, random_state=0)
        print("Data is none!!!")
        print("Generating " + str(n_samples) + " samples")
    else:
        # NOTE(review): these reference labels come from freshly generated
        # blobs, not from the supplied X, so the scores printed below only
        # describe X if it was produced by the same make_blobs call --
        # confirm with callers.
        _, labels_true = make_blobs(n_samples=len(X), centers=centers,
                                    cluster_std=1, random_state=0)

    af = AffinityPropagation(preference=preference_).fit(X)
    cluster_centers_indices = af.cluster_centers_indices_
    labels = af.labels_
    n_clusters_ = len(cluster_centers_indices)

    print('Estimated number of clusters: %d' % n_clusters_)
    print("Homogeneity: %0.3f" % metrics.homogeneity_score(labels_true, labels))
    print("Completeness: %0.3f" % metrics.completeness_score(labels_true, labels))
    print("V-measure: %0.3f" % metrics.v_measure_score(labels_true, labels))
    print("Adjusted Rand Index: %0.3f" % metrics.adjusted_rand_score(labels_true, labels))
    print("Adjusted Mutual Information: %0.3f" % metrics.adjusted_mutual_info_score(labels_true, labels))
    print("Fowlkes Mallows Score: %0.3f" % metrics.fowlkes_mallows_score(labels_true, labels))

    plt.close('all')
    plt.figure(1)
    plt.clf()
    colors = cycle('bgrcmykbgrcmykbgrcmykbgrcmyk')
    for k, col in zip(range(n_clusters_), colors):
        class_members = labels == k
        cluster_center = X[cluster_centers_indices[k]]
        # Members as dots, the exemplar as a large circle, and one line from
        # the exemplar to every member.
        plt.plot(X[class_members, 0], X[class_members, 1], col + '.')
        plt.plot(cluster_center[0], cluster_center[1], 'o',
                 markerfacecolor=col, markeredgecolor='k', markersize=14)
        for x in X[class_members]:
            plt.plot([cluster_center[0], x[0]], [cluster_center[1], x[1]], col)
    plt.title('Estimated number of clusters: %d' % n_clusters_)
    plt.show()
def test_Kmeans_n_init(*data):
    '''
    Plot KMeans' ARI and inertia against n_init for two init strategies.

    For every n_init in 1..49 one model is fitted with ``init='k-means++'``
    and then one with ``init='random'``.

    :param data: two positional values: the samples and their true labels
    :return: None
    '''
    X, labels_true = data
    init_counts = range(1, 50)
    figure = plt.figure()

    strategies = ('k-means++', 'random')
    ari_by_init = {strategy: [] for strategy in strategies}
    inertia_by_init = {strategy: [] for strategy in strategies}

    # Both strategies are fitted back to back for each n_init value.
    for n_init in init_counts:
        for strategy in strategies:
            model = cluster.KMeans(n_init=n_init, init=strategy)
            model.fit(X)
            assigned = model.predict(X)
            ari_by_init[strategy].append(
                adjusted_rand_score(labels_true, assigned))
            inertia_by_init[strategy].append(model.inertia_)

    # Left panel: ARI, clamped to [0, 1].
    ari_axes = figure.add_subplot(1, 2, 1)
    ari_axes.plot(init_counts, ari_by_init['k-means++'], marker="+",
                  label="k-means++")
    ari_axes.plot(init_counts, ari_by_init['random'], marker="+",
                  label="random")
    ari_axes.set_xlabel("n_init")
    ari_axes.set_ylabel("ARI")
    ari_axes.set_ylim(0, 1)
    ari_axes.legend(loc='best')

    # Right panel: the fitted models' inertia_.
    inertia_axes = figure.add_subplot(1, 2, 2)
    inertia_axes.plot(init_counts, inertia_by_init['k-means++'], marker='o',
                      label="k-means++")
    inertia_axes.plot(init_counts, inertia_by_init['random'], marker='o',
                      label="random")
    inertia_axes.set_xlabel("n_init")
    inertia_axes.set_ylabel("inertia_")
    inertia_axes.legend(loc='best')

    figure.suptitle("KMeans")
    plt.show()
def test_affinities():
    """Exercise ``SpectralClustering`` with the supported kinds of affinity.

    Covers the nearest-neighbors affinity (which must warn that the graph is
    not fully connected), the default affinity with ``gamma=2``, every kernel
    reported by ``kernel_metrics()`` except ``additive_chi2``, two callable
    affinities, and an unknown affinity name (which must raise ValueError).
    """
    # Note: in the following, random_state has been selected to have
    # a dataset that yields a stable eigen decomposition both when built
    # on OSX and Linux
    X, y = make_blobs(n_samples=20, random_state=0,
                      centers=[[1, 1], [-1, -1]], cluster_std=0.01)

    # nearest neighbors affinity
    model = SpectralClustering(n_clusters=2, affinity='nearest_neighbors',
                               random_state=0)
    assert_warns_message(UserWarning, 'not fully connected', model.fit, X)
    assert_equal(adjusted_rand_score(y, model.labels_), 1)

    model = SpectralClustering(n_clusters=2, gamma=2, random_state=0)
    found = model.fit(X).labels_
    assert_equal(adjusted_rand_score(y, found), 1)

    # From here on only the shape of the returned labels is checked.
    X = check_random_state(10).rand(10, 5) * 10

    for kernel_name in kernel_metrics():
        # Additive chi^2 gives a negative similarity matrix which
        # doesn't make sense for spectral clustering
        if kernel_name == 'additive_chi2':
            continue
        model = SpectralClustering(n_clusters=2, affinity=kernel_name,
                                   random_state=0)
        found = model.fit(X).labels_
        assert_equal((X.shape[0],), found.shape)

    model = SpectralClustering(n_clusters=2, affinity=lambda x, y: 1,
                               random_state=0)
    found = model.fit(X).labels_
    assert_equal((X.shape[0],), found.shape)

    def histogram(x, y, **kwargs):
        # Histogram kernel implemented as a callable.
        assert_equal(kwargs, {})  # no kernel_params that we didn't ask for
        return np.minimum(x, y).sum()

    model = SpectralClustering(n_clusters=2, affinity=histogram,
                               random_state=0)
    found = model.fit(X).labels_
    assert_equal((X.shape[0],), found.shape)

    # raise error on unknown affinity
    model = SpectralClustering(n_clusters=2, affinity='<unknown>')
    assert_raises(ValueError, model.fit, X)
def compare_with_children(
        self, idea_id, post_ids, post_clusters, remainder, labels):
    """Compare a clustering of posts with their assignment to child ideas.

    Every post is assigned to the child ideas that contain it
    (``self.get_posts_of_idea``), or to ``idea_id`` itself when no child
    contains it.  When a post belongs to several ideas, they are ordered by
    how many posts of the same cluster they hold, and the first is used as
    the post's reference class.  The reference classes are then scored
    against ``labels``.

    :param idea_id: the parent idea; key into ``self.idea_children``
    :param post_ids: posts under consideration, in the same order as ``labels``
    :param post_clusters: iterable of clusters, each an iterable of post ids
    :param remainder: post ids not placed in any cluster; treated as one
        additional cluster
    :param labels: cluster label of every post in ``post_ids``
    :return: tuple ``(compare_with_ideas, all_idea_scores, ideas_of_post,
        children_remainder)``: a dict of scores (None when the idea has no
        children), one idea-to-count mapping per cluster, the ideas of every
        post, and the set of posts that belong to no child
    """
    # Compare to children classification
    compare_with_ideas = None
    all_idea_scores = []
    ideas_of_post = defaultdict(list)
    children_remainder = set(post_ids)
    children_ids = self.idea_children[idea_id]
    if len(children_ids):
        posts_of_children = {
            child_id: self.get_posts_of_idea(child_id)
            for child_id in children_ids}
        # The loop variables are deliberately not named ``idea_id``:
        # rebinding the parameter here would make the remainder posts below
        # belong to the last child instead of the parent idea.
        for child_id, c_post_ids in posts_of_children.items():
            for post_id in c_post_ids:
                ideas_of_post[post_id].append(child_id)
            children_remainder -= set(c_post_ids)
        for post_id in children_remainder:
            ideas_of_post[post_id] = [idea_id]
        # if many ideas to a post, choose one with the most ideas in same cluster.
        # A bit arbitrary but I need a single idea.
        for cluster in chain(post_clusters, (remainder,)):
            idea_score = defaultdict(int)
            all_idea_scores.append(idea_score)
            for post_id in cluster:
                for candidate_id in ideas_of_post[post_id]:
                    idea_score[candidate_id] += 1
            for post_id in cluster:
                if len(ideas_of_post[post_id]) > 1:
                    scores = [(idea_score[candidate_id], candidate_id)
                              for candidate_id in ideas_of_post[post_id]]
                    scores.sort(reverse=True)
                    ideas_of_post[post_id] = [score[1] for score in scores]
        idea_of_index = [ideas_of_post[post_id][0] for post_id in post_ids]
        compare_with_ideas = {
            "Homogeneity": metrics.homogeneity_score(idea_of_index, labels),
            "Completeness": metrics.completeness_score(idea_of_index, labels),
            "V-measure": metrics.v_measure_score(idea_of_index, labels),
            "Adjusted Rand Index": metrics.adjusted_rand_score(
                idea_of_index, labels),
            "Adjusted Mutual Information": metrics.adjusted_mutual_info_score(
                idea_of_index, labels)}
    else:
        # No children: every post belongs to the parent idea.
        for post_id in children_remainder:
            ideas_of_post[post_id] = [idea_id]
        for cluster in chain(post_clusters, (remainder,)):
            all_idea_scores.append({idea_id: len(cluster)})
    return (compare_with_ideas, all_idea_scores, ideas_of_post,
            children_remainder)