import numpy as np
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def validate(data, labels):
    """
    Ten rounds of stratified shuffle-split validation
    (StratifiedShuffleSplit draws ten independent stratified
    train/test splits rather than ten disjoint folds).
    Assumes a classifier `clf` is defined at module scope.
    """
    accuracy_scores = []
    precision_scores = []
    recall_scores = []
    f1_scores = []
    sss = StratifiedShuffleSplit(n_splits=10)
    for train_index, test_index in sss.split(data, labels):
        x_train, x_test = data[train_index], data[test_index]
        y_train, y_test = labels[train_index], labels[test_index]
        clf.fit(x_train, y_train)
        y_pred = clf.predict(x_test)
        accuracy_scores.append(accuracy_score(y_test, y_pred))
        precision_scores.append(precision_score(y_test, y_pred))
        recall_scores.append(recall_score(y_test, y_pred))
        f1_scores.append(f1_score(y_test, y_pred))
    print('Accuracy', np.mean(accuracy_scores))
    print('Precision', np.mean(precision_scores))
    print('Recall', np.mean(recall_scores))
    print('F1-measure', np.mean(f1_scores))
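# A minimal usage sketch (assumed, not from the original project): bind the
# module-level classifier that validate() expects, then call it on toy data.
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification

clf = RandomForestClassifier(random_state=0)  # the module-level clf assumed by validate()
X, y = make_classification(n_samples=200, random_state=0)
validate(X, y)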
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import f1_score, precision_score, recall_score, confusion_matrix

def train_model_with_cv(model, params, X, y):
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20)
    # Use the training data for parameter selection in a grid search
    gs_clf = GridSearchCV(model, params, n_jobs=1, cv=5)
    gs_clf = gs_clf.fit(X_train, y_train)
    model = gs_clf.best_estimator_
    # Use the best model and the held-out test data for the final evaluation
    y_pred = model.predict(X_test)
    _f1 = f1_score(y_test, y_pred, average='micro')
    _confusion = confusion_matrix(y_test, y_pred)
    # micro-averaged to match the F1 above (the binary default would fail on multiclass labels)
    _precision = precision_score(y_test, y_pred, average='micro')
    _recall = recall_score(y_test, y_pred, average='micro')
    _statistics = {'f1_score': _f1,
                   'confusion_matrix': _confusion,
                   'precision': _precision,
                   'recall': _recall}
    return model, _statistics
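# A hedged usage sketch; the estimator and parameter grid below are
# illustrative stand-ins, not values from the original project.
from sklearn.svm import LinearSVC
from sklearn.datasets import load_iris

X, y = load_iris(return_X_y=True)
params = {'C': [0.1, 1.0, 10.0]}  # hypothetical grid
best_model, stats = train_model_with_cv(LinearSVC(), params, X, y)
print(stats['f1_score'], stats['precision'], stats['recall'])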
def test_data_ann_rnn(feats, target, groups, ann, rnn):
    """
    Take two already-trained models (a feed-forward ANN and an RNN),
    evaluate both on the input data, and return accuracy and F1 for each.
    """
    if target.ndim == 2:
        target = np.argmax(target, 1)
    cnn_pred = ann.predict_classes(feats, 1024, verbose=0)
    cnn_acc = accuracy_score(target, cnn_pred)
    cnn_f1 = f1_score(target, cnn_pred, average='macro')
    seqlen = rnn.input_shape[1]
    features_seq, target_seq, groups_seq = tools.to_sequences(feats, target, seqlen=seqlen, groups=groups)
    # Shift the sequence targets, presumably to compensate for the RNN's prediction offset.
    new_targ_seq = np.roll(target_seq, 4)
    rnn_pred = rnn.predict_classes(features_seq, 1024, verbose=0)
    rnn_acc = accuracy_score(new_targ_seq, rnn_pred)
    rnn_f1 = f1_score(new_targ_seq, rnn_pred, average='macro')
    confmat = confusion_matrix(new_targ_seq, rnn_pred)
    return [cnn_acc, cnn_f1, rnn_acc, rnn_f1, confmat, (rnn_pred, target_seq, groups_seq)]
def multilabel_classifier(X_train, Y_train, X_val, Y_val, X_test, Y_test, nb_epoch=200, batch_size=10, seed=7):
    clf = sigmoid_network(X_train.shape[1], Y_train.shape[1])
    clf.fit(X_train, Y_train,
            nb_epoch=nb_epoch,
            batch_size=batch_size,
            shuffle=True,
            validation_data=(X_val, Y_val),
            callbacks=[
                ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=0.01),
                EarlyStopping(monitor='val_loss', min_delta=1e-5, patience=5, verbose=0, mode='auto'),
            ])
    pred = clf.predict(X_test)
    pred = (pred > .5) * 1  # binarize the sigmoid outputs at 0.5
    macro_f1 = f1_score(Y_test, pred, average='macro')
    micro_f1 = f1_score(Y_test, pred, average='micro')
    return [macro_f1, micro_f1]
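# Macro-F1 averages the per-label F1 scores; micro-F1 pools every label
# decision before computing one F1. A toy illustration (arrays are made up):
import numpy as np
from sklearn.metrics import f1_score

Y_true = np.array([[1, 0, 1], [0, 1, 0]])
Y_hat = np.array([[1, 0, 0], [0, 1, 0]])
print(f1_score(Y_true, Y_hat, average='macro'))  # mean of per-label F1 -> 0.667
print(f1_score(Y_true, Y_hat, average='micro'))  # pooled decisions -> 0.8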
def f1_same_duplicates_score(x, y):
    """
    Given cluster labels x and y, compute the F1 score
    for whether the same elements are marked as duplicates.
    """
    import warnings
    import numpy as np
    from sklearn.metrics import f1_score
    from sklearn.exceptions import UndefinedMetricWarning
    if x.shape != y.shape:
        raise ValueError('x and y must have the same shape!')
    x_dup = _dbscan_unique2noisy(x)
    x_dup[x_dup > -1] = 1   # duplicates
    x_dup[x_dup == -1] = 0  # not duplicates
    y_dup = _dbscan_unique2noisy(y)
    y_dup[y_dup > -1] = 1   # duplicates
    y_dup[y_dup == -1] = 0  # not duplicates
    x_dup = np.abs(x_dup)
    y_dup = np.abs(y_dup)
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=UndefinedMetricWarning)
        score = f1_score(x_dup, y_dup)
    return score
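# _dbscan_unique2noisy is defined elsewhere in the project. A plausible
# stand-in, sketched purely as an assumption, relabels singleton clusters
# as DBSCAN-style noise (-1) so that only repeated labels count as duplicates.
import numpy as np

def _dbscan_unique2noisy_sketch(labels):
    labels = np.asarray(labels).copy()
    values, counts = np.unique(labels, return_counts=True)
    singletons = values[counts == 1]
    labels[np.isin(labels, singletons)] = -1  # hypothetical noise marking
    return labels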
def evaluate(path):
    true = [int(pair[1] is None or gold[pair]) for pair in resources[path]]
    pred = [int(pair[1] is not None) for pair in resources[path]]
    tn, fp, fn, tp = confusion_matrix(true, pred).ravel()
    return {
        'tn': tn,
        'fp': fp,
        'fn': fn,
        'tp': tp,
        'precision': precision_score(true, pred),
        'recall': recall_score(true, pred),
        'f1': f1_score(true, pred),
        'scores': scores(resources[path])
    }
def evaluate(path):
    G = resources[path]
    pred = [int(has_sense_path(G, *pair)) for pair in union]
    tn, fp, fn, tp = confusion_matrix(true, pred).ravel()
    return {
        'tn': tn,
        'fp': fp,
        'fn': fn,
        'tp': tp,
        'precision': precision_score(true, pred),
        'recall': recall_score(true, pred),
        'f1': f1_score(true, pred),
        'scores': scores(G)
    }
def analyzeResult_temp(data, model, DataVecs):
    predict = model.predict(DataVecs)
    data['predict'] = predict
    print("Accuracy: %f %%" % (100. * sum(data["label"] == data["predict"]) / len(data["label"])))
    answer1 = data[data["label"] == 1]
    answer2 = data[data["label"] == 0]
    print("Positive Accuracy: %f %%" % (100. * sum(answer1["label"] == answer1["predict"]) / len(answer1["label"])))
    print("Negative Accuracy: %f %%" % (100. * sum(answer2["label"] == answer2["predict"]) / len(answer2["label"])))
    try:
        result_auc = model.predict_proba(DataVecs)
        print("ROC:%f\nAUPR:%f\n" % (roc_auc_score(data["label"], result_auc[:, 1]),
                                     average_precision_score(data["label"], result_auc[:, 1])))
        print("Precision:%f\nRecall:%f\nF1score:%f\nMCC:%f\n" % (precision_score(data["label"], data["predict"]),
                                                                 recall_score(data["label"], data["predict"]),
                                                                 f1_score(data["label"], data["predict"]),
                                                                 matthews_corrcoef(data["label"], data["predict"])))
    except Exception:
        print("ROC unavailable")

# Performance evaluation and result analysis using adjusted thresholds
def analyzeResult(data, model, DataVecs, threshold):
    predict = model.predict_proba(DataVecs)[:, 1]
    data['predict'] = (predict > threshold).astype(int)
    print("Accuracy: %f %%" % (100. * sum(data["label"] == data["predict"]) / len(data["label"])))
    answer1 = data[data["label"] == 1]
    answer2 = data[data["label"] == 0]
    print("Positive Accuracy: %f %%" % (100. * sum(answer1["label"] == answer1["predict"]) / len(answer1["label"])))
    print("Negative Accuracy: %f %%" % (100. * sum(answer2["label"] == answer2["predict"]) / len(answer2["label"])))
    try:
        result_auc = model.predict_proba(DataVecs)
        print("ROC:%f\nAUPR:%f\n" % (roc_auc_score(data["label"], result_auc[:, 1]),
                                     average_precision_score(data["label"], result_auc[:, 1])))
        print("Precision:%f\nRecall:%f\nF1score:%f\nMCC:%f\n" % (precision_score(data["label"], data["predict"]),
                                                                 recall_score(data["label"], data["predict"]),
                                                                 f1_score(data["label"], data["predict"]),
                                                                 matthews_corrcoef(data["label"], data["predict"])))
    except Exception:
        print("ROC unavailable")
# Performance evaluation
def run_regression(train_embeds, train_labels, test_embeds, test_labels):
    np.random.seed(1)
    from sklearn.linear_model import SGDClassifier
    from sklearn.dummy import DummyClassifier
    from sklearn.metrics import f1_score
    from sklearn.multioutput import MultiOutputClassifier
    dummy = MultiOutputClassifier(DummyClassifier())
    dummy.fit(train_embeds, train_labels)
    log = MultiOutputClassifier(SGDClassifier(loss="log"), n_jobs=10)
    log.fit(train_embeds, train_labels)
    # Predict once, then report the per-column F1 scores.
    log_pred = log.predict(test_embeds)
    dummy_pred = dummy.predict(test_embeds)
    for i in range(test_labels.shape[1]):
        print("F1 score", f1_score(test_labels[:, i], log_pred[:, i], average="micro"))
    for i in range(test_labels.shape[1]):
        print("Random baseline F1 score", f1_score(test_labels[:, i], dummy_pred[:, i], average="micro"))
# Source: 04_sent.py, project Building-Machine-Learning-Systems-With-Python-Second-Edition, author PacktPublishing
def __grid_search_model(clf_factory, X, Y):
    # Ten shuffled 70/30 splits for model selection.
    cv = ShuffleSplit(n_splits=10, test_size=0.3, random_state=0)
    param_grid = dict(vect__ngram_range=[(1, 1), (1, 2), (1, 3)],
                      vect__min_df=[1, 2],
                      vect__smooth_idf=[False, True],
                      vect__use_idf=[False, True],
                      vect__sublinear_tf=[False, True],
                      vect__binary=[False, True],
                      clf__alpha=[0, 0.01, 0.05, 0.1, 0.5, 1],
                      )
    grid_search = GridSearchCV(clf_factory(),
                               param_grid=param_grid,
                               cv=cv,
                               scoring='f1',
                               verbose=10)
    grid_search.fit(X, Y)
    clf = grid_search.best_estimator_
    print(clf)
    return clf
# Source: 02_tuning.py, project Building-Machine-Learning-Systems-With-Python-Second-Edition, author PacktPublishing
def grid_search_model(clf_factory, X, Y):
    # Ten shuffled 70/30 splits for model selection.
    cv = ShuffleSplit(n_splits=10, test_size=0.3, random_state=0)
    param_grid = dict(vect__ngram_range=[(1, 1), (1, 2), (1, 3)],
                      vect__min_df=[1, 2],
                      vect__stop_words=[None, "english"],
                      vect__smooth_idf=[False, True],
                      vect__use_idf=[False, True],
                      vect__sublinear_tf=[False, True],
                      vect__binary=[False, True],
                      clf__alpha=[0, 0.01, 0.05, 0.1, 0.5, 1],
                      )
    grid_search = GridSearchCV(clf_factory(),
                               param_grid=param_grid,
                               cv=cv,
                               scoring='f1',
                               verbose=10)
    grid_search.fit(X, Y)
    clf = grid_search.best_estimator_
    print(clf)
    return clf
def bidirectional_gru(len_output):
    # sequence_input is a matrix of GloVe vectors (one for each input word)
    sequence_input = Input(
        shape=(MAX_SEQUENCE_LENGTH, EMBEDDING_DIM,), dtype='float32')
    l_lstm = Bidirectional(GRU(100))(sequence_input)
    # TODO look at call(input_at_t, states_at_t) method, returning (output_at_t, states_at_t_plus_1)
    # also look at switch(condition, then_expression, else_expression) for deciding when to feed previous state
    preds = Dense(len_output, activation='softmax')(l_lstm)
    model = Model(sequence_input, preds)
    model.compile(loss='categorical_crossentropy',
                  optimizer='rmsprop',
                  metrics=[utils.f1_score, 'categorical_accuracy'])
    return model
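# utils.f1_score above is a project-local Keras metric. A common batchwise
# implementation, sketched here as an assumption rather than the project's
# actual code, derives F1 from thresholded predictions:
from keras import backend as K

def f1_score_metric(y_true, y_pred):
    y_pred_bin = K.round(K.clip(y_pred, 0, 1))  # threshold at 0.5
    tp = K.sum(y_true * y_pred_bin)
    precision = tp / (K.sum(y_pred_bin) + K.epsilon())
    recall = tp / (K.sum(y_true) + K.epsilon())
    return 2 * precision * recall / (precision + recall + K.epsilon())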
# required, see values below
def score_icon_plain(ref_file, hyp_file, n_significance_tests=20):
    ref_tags = read_tag_file(ref_file)
    hyp_tags = read_tag_file(hyp_file)
    assert len(ref_tags) == len(hyp_tags), 'ref file and hyp file must have the same number of tags'
    for ref_line, hyp_line in zip(ref_tags, hyp_tags):
        assert len(ref_line) == len(hyp_line), 'ref line and hyp line must have the same number of tags'
    # flatten out tags
    flat_ref_tags = [t for l in ref_tags for t in l]
    flat_hyp_tags = [t for l in hyp_tags for t in l]
    actual_class_f1 = f1_score(flat_ref_tags, flat_hyp_tags, average=None)
    actual_average_f1 = weighted_fmeasure(flat_ref_tags, flat_hyp_tags)
    # END EVALUATION
    return [actual_class_f1, actual_average_f1]
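# weighted_fmeasure is defined elsewhere in the project. Assuming it is the
# support-weighted average F1, a minimal stand-in would be:
from sklearn.metrics import f1_score

def weighted_fmeasure(y_true, y_pred):
    return f1_score(y_true, y_pred, average='weighted')  # hypothetical helper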
def predict_labels(self, features, target):
    print("Predicting labels using {}...".format(self.classifier.__name__))
    start = time.time()
    y_pred = self.model.predict(features)
    end = time.time()
    prediction_time = end - start
    f1_score_output = f1_score(target, y_pred, average="binary")
    print("Predicting labels using {} with optimal parameters...".format(self.classifier.__name__))
    start = time.time()
    y_pred = self.optimal_model.predict(features)
    end = time.time()
    optimal_prediction_time = end - start
    f1_optimal_score_output = f1_score(target, y_pred, average="binary")
    return f1_score_output, prediction_time, \
        f1_optimal_score_output, optimal_prediction_time
def KMeansAccuracy():
    clusterer = KMeans(n_clusters=2, n_init=30)
    tdm = pickle.load(open(DATASET_PATH + "BOW.p", "rb"))
    predictions = clusterer.fit_predict(tdm)
    true_labels = pickle.load(open(OUTFILE_STANCE, "rb"))[0]
    # Cluster IDs are arbitrary, so score both label assignments and keep the better one.
    numerical_mapped_1 = [0 if i == "Israeli" else 1 for i in true_labels]
    numerical_mapped_2 = [1 if i == "Israeli" else 0 for i in true_labels]
    one = f1_score(numerical_mapped_1, predictions)
    two = f1_score(numerical_mapped_2, predictions)
    print("The F1 score of KMeans on BOW is: " + str(max(one, two)))
    clusterer = KMeans(n_clusters=2, n_init=30)
    predictions = clusterer.fit_predict(tdm)
    true_labels = pickle.load(open(OUTFILE_STANCE, "rb"))[0]
    accuracy = predict_accuracy(true_labels, predictions)
    print("The F1 score of KMeans on BOW (w/TF-IDF) is: " + str(accuracy))
def display_evaluation_metrics(true_labels, predicted_labels, positive_class=1):
    print('Accuracy:', np.round(
        metrics.accuracy_score(true_labels, predicted_labels), 2))
    print('Precision:', np.round(
        metrics.precision_score(true_labels, predicted_labels,
                                pos_label=positive_class,
                                average='binary'), 2))
    print('Recall:', np.round(
        metrics.recall_score(true_labels, predicted_labels,
                             pos_label=positive_class,
                             average='binary'), 2))
    print('F1 Score:', np.round(
        metrics.f1_score(true_labels, predicted_labels,
                         pos_label=positive_class,
                         average='binary'), 2))
def get_metrics(true_labels, predicted_labels):
    print('Accuracy:', np.round(
        metrics.accuracy_score(true_labels, predicted_labels), 2))
    print('Precision:', np.round(
        metrics.precision_score(true_labels, predicted_labels,
                                average='weighted'), 2))
    print('Recall:', np.round(
        metrics.recall_score(true_labels, predicted_labels,
                             average='weighted'), 2))
    print('F1 Score:', np.round(
        metrics.f1_score(true_labels, predicted_labels,
                         average='weighted'), 2))
# Source: new_data_mlp.py, project NVDM-For-Document-Classification, author cryanzpj
def thres_search(data, label, n):
    res = []
    for i in range(n):
        n_label = tf.cast(tf.reduce_sum(label[i]), tf.int32)
        temp = tf.multiply(data[i], label[i])
        temp = tf.reshape(tf.nn.top_k(temp, n_label + 1).values, [1, 1, -1, 1])
        # Candidate thresholds: width-2 average pooling over the sorted top scores.
        thres = tf.reshape(tf.contrib.layers.avg_pool2d(temp, [1, 2], [1, 1]), [-1, 1])
        predicts = tf.map_fn(lambda x: tf.cast(tf.greater_equal(data[i], x), tf.float32), thres)
        f1_scores = tf.map_fn(lambda x: f1(x, label[i]), predicts)
        thres_opt = thres[tf.cast(tf.argmax(f1_scores, 0), tf.int32)]
        res.append(thres_opt)
        # R = tf.map_fn(lambda x: tf.contrib.metrics.streaming_recall(x, label[i])[0], predicts)
        # P = tf.map_fn(lambda x: tf.contrib.metrics.streaming_precision(x, label[i])[0], predicts)
        # thres_opt = thres[np.argsort(map(lambda x: metrics.f1_score(x, sess.run(label[i]), average="macro"), predicts))[-1]]
    return tf.reshape(res, [-1])
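# The same per-example threshold search reads more easily in plain NumPy.
# A sketch (toy helper, not the project's TensorFlow graph): candidate
# thresholds are midpoints between adjacent top scores, and the candidate
# with the best F1 against the example's label vector wins. Assumes at
# least one positive label per example.
import numpy as np
from sklearn.metrics import f1_score

def thres_search_np(scores, labels):
    k = int(labels.sum())
    top = np.sort(scores * labels)[::-1][:k + 1]
    candidates = (top[:-1] + top[1:]) / 2.0  # width-2 average pooling
    return max(candidates, key=lambda t: f1_score(labels, (scores >= t).astype(int)))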
# Source: new_data_mlp.py, project NVDM-For-Document-Classification, author cryanzpj
def SVMbanchmark(X_train, y_train, X_test, y_test):
    # optimal C is 10.0, f1 = 0.52
    print("Training LinearSVC with l1-based feature selection")
    X_valid, y_valid = X_test[:10000], y_test[:10000]
    score_list = []
    CList = [0.1, 0.5, 1, 10, 50, 100]
    for c in CList:
        clf = OneVsRestClassifier(LinearSVC(C=c, penalty='l1', dual=False))
        clf.fit(X_train, y_train)
        pred = clf.predict(X_valid)
        score = metrics.f1_score(y_valid, pred, average="macro")
        score_list.append(score)
        print("f1-score: {:f}, c is {:f}".format(score, c))
    # Refit with the best C and evaluate on the full test set.
    clf = OneVsRestClassifier(LinearSVC(penalty='l1', dual=False, C=CList[np.argmax(score_list)]))
    clf.fit(X_train, y_train)
    pred = clf.predict(X_test)
    score = metrics.f1_score(y_test, pred, average="micro")
    print("f1-score for test set: {:f}".format(score))
def cv(feature_dict, feature, polarity, folds):
    kfold = KFold(n_splits=folds)
    count, f1, recall, precision, accuracy = 0, 0, 0, 0, 0
    for train, test in kfold.split(polarity):
        LR = LogisticRegression()
        count += 1
        x = [feature[i] for i in train]
        y = [polarity[i] for i in train]
        LR.fit(scipy.sparse.vstack(x), y)
        test_label = []
        answer_label = [polarity[j] for j in test]
        for j in test:
            query = feature[j]
            # Fall back to -1 when the query's feature width does not match the dictionary.
            if query.shape[1] != len(feature_dict):
                test_label.append(-1)
            else:
                test_label.append(int(predict(LR, query)[0]))
        accuracy += accuracy_score(answer_label, test_label)
        precision += precision_score(answer_label, test_label)
        recall += recall_score(answer_label, test_label)
        f1 += f1_score(answer_label, test_label)
        print('{}_fold finished.'.format(count))
    return accuracy, precision, recall, f1
def __init__(self, corpus, relationtype, modelname="scikit_classifier"):
    super(ScikitRE, self).__init__()
    self.modelname = relationtype + "_" + modelname
    self.relationtype = relationtype
    self.pairtype = relationtype
    self.corpus = corpus
    self.pairs = []
    self.features = []
    self.labels = []
    self.pred = []
    self.clusters = word2vec.load_clusters("corpora/Thaliana/documents-processed-clusters.txt")
    self.posfmeasure = make_scorer(f1_score, average='binary', pos_label=True)
    self.generate_data(corpus, modelname, relationtype)
    self.text_clf = Pipeline([('vect', CountVectorizer(analyzer='char_wb', ngram_range=(3, 20), min_df=0.0, max_df=0.7)),
                              #('vect', CountVectorizer(ngram_range=(1,3), binary=False, max_features=None)),
                              #('tfidf', TfidfTransformer(use_idf=True, norm="l2")),
                              #('clf', SGDClassifier(loss='hinge', penalty='l1', alpha=0.0001, n_iter=5, random_state=42)),
                              #('clf', SGDClassifier())
                              #('clf', svm.NuSVC(nu=0.01))
                              #('clf', RandomForestClassifier(class_weight={False: 1, True: 2}, n_jobs=-1))
                              ('clf', MultinomialNB(alpha=0.01, fit_prior=False))
                              #('clf', DummyClassifier(strategy="constant", constant=True))
                              ])
def printResult(y_true, y_pred):
    acc = accuracy_score(y_true, y_pred)
    print("Accuracy: {:.4%}".format(acc))
    precision = metrics.precision_score(y_true, y_pred)
    recall = metrics.recall_score(y_true, y_pred)
    f1 = metrics.f1_score(y_true, y_pred)
    confusion = metrics.confusion_matrix(y_true, y_pred)
    print("Precision:", precision)
    print("Recall:", recall)
    print("f1_score:", f1)
    print("confusion_matrix:")
    print(confusion)
    resultStr = "Precision: " + str(precision) + "\n" + \
                "Recall: " + str(recall) + "\n" + \
                "f1_score: " + str(f1) + "\n" + \
                "confusion_matrix" + "\n" + \
                str(confusion) + "\n"
    return resultStr
def build_grid_search(X, y):
    parameters = {
        "estimator__criterion": ['gini', 'entropy'],
        "estimator__max_depth": [10, 15, 20, 25, None],
        "estimator__max_features": ['auto', 'sqrt', 'log2', None]
    }
    ovr = OneVsRestClassifier(RandomForestClassifier(n_estimators=1000,
                                                     oob_score=True, n_jobs=-1, verbose=1))
    model_tuning = GridSearchCV(ovr, param_grid=parameters, verbose=1,
                                n_jobs=-1, cv=10,
                                scoring=make_scorer(f1_score))
    model_tuning.fit(X, y)
    cv_score = model_tuning.best_score_
    print('The best CV score: ', cv_score)
    # X_test is expected to be defined at module scope.
    y_score = model_tuning.predict_proba(X_test)
    multiclass_roc(y_score, 'grid_search_02')
    return model_tuning
def backtestHistory(_initial_virtual_shares, _start_date, _stockcode, _interval, _train_batch_size=100):
    ZZZZ = Investor(_name='ZZZZ', _initial_virtual_shares=_initial_virtual_shares, _start_date=_start_date,
                    _stockcode=_stockcode, _interval=_interval, _train_batch_size=_train_batch_size)
    total = ZZZZ.maxcnt - ZZZZ.now
    # pbar = ProgressBar(widgets=[' ', AnimatedMarker(), 'Predicting: ', Percentage()], maxval=total).start()
    while ZZZZ.now < ZZZZ.maxcnt:
        # pbar.update(ZZZZ.now)
        ZZZZ.TradeNext(use_NN=False)
    # pbar.finish()
    print()
    print(classification_report(ZZZZ.TRUEY, ZZZZ.PREDY))
    f1 = f1_score(ZZZZ.TRUEY, ZZZZ.PREDY)
    accuracy = accuracy_score(ZZZZ.TRUEY, ZZZZ.PREDY)
    print("accuracy:", accuracy)
    print("f1: ", f1)
    predROR = ZZZZ.getTotalROR()[0]
    realROR = ZZZZ.getTotalROR()[1]
    assert not (realROR == 0)
    print('pred ROR:', predROR, '%', '\t|\treal ROR:', realROR, '%')
    return predROR, realROR, f1, accuracy, total, ZZZZ.TRAINERROR
def compute_score(self, conf, hy):
    RS = recall_score(self.y, hy, average=None)
    conf['_all_f1'] = M = {str(self.le.inverse_transform([klass])[0]): f1
                           for klass, f1 in enumerate(f1_score(self.y, hy, average=None))}
    conf['_all_recall'] = {str(self.le.inverse_transform([klass])[0]): f1
                           for klass, f1 in enumerate(RS)}
    conf['_all_precision'] = N = {str(self.le.inverse_transform([klass])[0]): f1
                                  for klass, f1 in enumerate(precision_score(self.y, hy, average=None))}
    conf['_macrorecall'] = np.mean(RS)
    if len(self.le.classes_) == 2:
        conf['_macrof1'] = np.mean(np.array([v for v in conf['_all_f1'].values()]))
        conf['_weightedf1'] = conf['_microf1'] = f1_score(self.y, hy, average='binary')
    else:
        conf['_macrof1'] = f1_score(self.y, hy, average='macro')
        conf['_microf1'] = f1_score(self.y, hy, average='micro')
        conf['_weightedf1'] = f1_score(self.y, hy, average='weighted')
    conf['_accuracy'] = accuracy_score(self.y, hy)
    if self.score.startswith('avgf1:'):
        _, k1, k2 = self.score.split(':')
        conf['_' + self.score] = (M[k1] + M[k2]) / 2
    elif self.score.startswith('avgf1f0:'):
        _, k1, k2 = self.score.split(':')
        pos = (M[k1] + N[k1]) / 2.
        neg = (M[k2] + N[k2]) / 2.
        conf['_' + self.score] = (pos + neg) / 2.
    conf['_score'] = conf['_' + self.score]
def test_wrapper_score():
    from b4msa.params import Wrapper
    from sklearn.metrics import f1_score, precision_score
    import numpy as np
    np.random.seed(0)
    y = np.random.randint(3, size=100).astype(str)
    hy = np.random.randint(3, size=100)
    w = Wrapper(None, y, 'avgf1:0:2', 10, None)
    conf = {}
    w.compute_score(conf, hy)
    f1 = f1_score(y.astype(int), hy, average=None)
    assert conf['_accuracy'] == (y.astype(int) == hy).mean()
    print(conf['_avgf1:0:2'], (f1[0] + f1[2]) / 2.)
    assert conf['_avgf1:0:2'] == (f1[0] + f1[2]) / 2.
    precision = precision_score(y.astype(int), hy, average=None)
    pos = (f1[0] + precision[0]) / 2.
    neg = (f1[2] + precision[2]) / 2.
    w = Wrapper(None, y, 'avgf1f0:0:2', 10, None)
    w.compute_score(conf, hy)
    assert conf['_avgf1f0:0:2'] == (pos + neg) / 2.
def evaluateNodeClassification(X, Y, test_ratio):
    X_train, X_test, Y_train, Y_test = sk_ms.train_test_split(
        X,
        Y,
        test_size=test_ratio
    )
    try:
        # Sparse label matrices need an explicit conversion before summing.
        top_k_list = list(Y_test.toarray().sum(axis=1))
    except AttributeError:
        top_k_list = list(Y_test.sum(axis=1))
    classif2 = TopKRanker(lr())
    classif2.fit(X_train, Y_train)
    prediction = classif2.predict(X_test, top_k_list)
    micro = f1_score(Y_test, prediction, average='micro')
    macro = f1_score(Y_test, prediction, average='macro')
    return (micro, macro)
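# TopKRanker comes from the surrounding project. The usual pattern in
# graph-embedding evaluation code, sketched here under that assumption,
# extends OneVsRestClassifier to keep the k highest-probability labels per row:
import numpy as np
from sklearn.multiclass import OneVsRestClassifier

class TopKRanker(OneVsRestClassifier):
    def predict(self, X, top_k_list):
        probs = np.asarray(super().predict_proba(X))
        prediction = np.zeros_like(probs, dtype=int)
        for i, k in enumerate(top_k_list):
            top_k = probs[i].argsort()[-int(k):]  # indices of the k largest probabilities
            prediction[i, top_k] = 1
        return prediction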
def benchmark(clf):
    print('_' * 80)
    print("Training: ")
    print(clf)
    t0 = time()
    clf.fit(X_train, y_train)
    train_time = time() - t0
    print("train time: %0.3fs" % train_time)
    t0 = time()
    pred = clf.predict(X_test)
    test_time = time() - t0
    print("test time: %0.3fs" % test_time)
    # score = metrics.f1_score(y_test, pred)
    # print("f1-score: %0.3f" % score)
    l = []
    print("Predicted classes:-")
    for element in range(9):
        print(listdir("/home/shrinidhi/WTProject/twitter/testing/" + str(y_test[element])), ": ", categories[pred[element]])
        for i in listdir(join("/home/shrinidhi/WTProject/twitter/testing/", str(y_test[element]))):
            l.append((i.split(".")[0], categories[pred[element]]))
    clf_descr = str(clf).split('(')[0]
    return l
def evaluate_precision_recall(y, target, labels):
    import sklearn.metrics as metrics
    target = target[:len(y)]
    num_classes = max(target) + 1
    results = []
    for i in range(num_classes):
        class_target = _extract_single_class(i, target)
        class_y = _extract_single_class(i, y)
        results.append({
            'precision': metrics.precision_score(class_target, class_y),
            'recall': metrics.recall_score(class_target, class_y),
            'f1': metrics.f1_score(class_target, class_y),
            'fraction': sum(class_target) / len(target),
            '#of_class': int(sum(class_target)),
            'label': labels[i],
            'label_id': i
            # 'tp': tp
        })
        print('%d/%d' % (i, num_classes), results[-1])
    accuracy = metrics.accuracy_score(target, y)
    return accuracy, results
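# _extract_single_class is a project helper. A plausible one-vs-rest
# binarization, sketched as an assumption:
def _extract_single_class_sketch(class_id, labels):
    return [1 if label == class_id else 0 for label in labels]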