def __call__(self, X, y, net):
    """Split ``X`` and ``y`` into a training part and a validation part.

    When ``self.eval_size`` is ``None`` nothing is held out: the inputs are
    returned unchanged as the training part and the validation part is an
    empty tail slice of each.  Otherwise one split is drawn:

    * ``ShuffleSplit`` when ``net.regression`` is true or ``self.stratify``
      is false;
    * the first fold of a ``StratifiedKFold`` with ``round(1 / eval_size)``
      folds otherwise.

    Both splitters are seeded with ``self.random_state``.

    :param X: Samples; indexed with index arrays and sliced.
    :param y: Targets; ``y.shape[0]`` is used as the number of samples.
    :param net: Object whose ``regression`` attribute selects the splitter.
    :return: ``(X_train, X_valid, y_train, y_valid)``.
    """
    if self.eval_size is None:
        # No validation data requested: empty slices stand in for it.
        return X, X[len(X):], y, y[len(y):]

    if net.regression or not self.stratify:
        # The splitter is asked for a test share of ``1 - eval_size`` and
        # the pair it yields is unpacked in swapped order, so the first
        # element becomes the validation indices and the second element
        # the training indices.
        splitter = ShuffleSplit(
            y.shape[0],
            test_size=1 - self.eval_size,
            random_state=self.random_state,
        )
        valid_indices, train_indices = next(iter(splitter))
    else:
        fold_count = int(round(1 / self.eval_size))
        splitter = StratifiedKFold(
            y, n_folds=fold_count, random_state=self.random_state
        )
        train_indices, valid_indices = next(iter(splitter))

    return (
        X[train_indices],
        X[valid_indices],
        y[train_indices],
        y[valid_indices],
    )
python类StratifiedKFold()的实例源码
def get_cv_generator(training_data, do_segment_split=True, random_state=None):
    """
    Build a 10-fold stratified cross validation generator.

    :param training_data: The training data to create the folds from. Its 'Preictal' column is
                          used directly when no segment split is requested.
    :param do_segment_split: If True, the folds are produced by dataset.SegmentCrossValidator
                             wrapping a StratifiedKFold; otherwise a plain StratifiedKFold over
                             the 'Preictal' column is returned.
    :param random_state: A constant to use as a random seed.
    :return: A generator which can be used by the grid search to generate cross validation folds.
    """
    fold_options = {'n_folds': 10, 'random_state': random_state}
    if not do_segment_split:
        return sklearn.cross_validation.StratifiedKFold(training_data['Preictal'], **fold_options)
    return dataset.SegmentCrossValidator(training_data, cross_validation.StratifiedKFold, **fold_options)
def __init__(self, dataframe, base_cv=None, **cv_kwargs):
    """
    Prepare a cross validator that works on whole segments.

    :param dataframe: Data frame with a 'Preictal' column and an index level named 'segment'.
    :param base_cv: Callable used to build the wrapped cross validator; it is called as
                    base_cv(segments, **cv_kwargs). Defaults to cross_validation.StratifiedKFold.
    :param cv_kwargs: Keyword arguments forwarded to the wrapped cross validator.
    """
    # Copy of the 'Preictal' column with an extra, innermost index level 'i' that
    # enumerates the rows (like proper positional indices).
    row_numbers = np.arange(len(dataframe))
    enumerated = pd.DataFrame({'Preictal': dataframe['Preictal'], 'i': row_numbers})
    enumerated.set_index('i', append=True, inplace=True)
    self.all_segments = enumerated

    # One row per segment (the first 'Preictal' value of each), sorted in place.
    # This series is what the wrapped cross validation generator receives.
    per_segment = enumerated['Preictal'].groupby(level='segment').first()
    per_segment.sort(inplace=True)
    self.segments = per_segment

    make_cv = cross_validation.StratifiedKFold if base_cv is None else base_cv
    self.cv = make_cv(self.segments, **cv_kwargs)
def split_dataset(dataframe, training_ratio=.8, do_segment_split=True, shuffle=False, random_state=None):
    """
    Splits the dataset into a training and test partition.

    :param dataframe: A data frame to split. Should have a 'Preictal' column.
    :param training_ratio: The ratio of the data to use for the first part. Must lie strictly
                           between 0 and 1 and be at least 0.5 (two folds).
    :param do_segment_split: If True, the split will be done on whole segments.
    :param shuffle: If true, the split will shuffle the data before splitting.
    :param random_state: Seed
    :return: A pair of disjoint data frames, where the first frame contains *training_ratio* of all the data.
    :raises ValueError: If *training_ratio* is outside (0, 1) or corresponds to fewer than two folds.
    """
    if not 0 < training_ratio < 1:
        raise ValueError(
            "training_ratio must be strictly between 0 and 1, got %r" % (training_ratio,))

    # We'll make the splits based on the sklearn cross validators.
    # We calculate the number of folds which correspond to the
    # desired training ratio. If *r* is the training ratio and *k*
    # the number of folds, we'd like *r* = (*k* - 1)/*k*, that is,
    # the ratio should be the same as all the included folds divided
    # by the total number of folds. This gives us *k* = 1/(1-*r*).
    # The small tolerance keeps floating point error from losing a fold:
    # for r = 0.95 the quotient evaluates to slightly less than 20.
    k = int(np.floor(1.0 / (1.0 - training_ratio) + 1e-9))
    if k < 2:
        raise ValueError(
            "training_ratio %r corresponds to fewer than two folds; use at least 0.5"
            % (training_ratio,))

    if do_segment_split:
        # We use the segment based cross validator to get a stratified split.
        cv = SegmentCrossValidator(dataframe,
                                   n_folds=k,
                                   shuffle=shuffle,
                                   random_state=random_state)
    else:
        # Don't split by segment, but still do a stratified split
        cv = cross_validation.StratifiedKFold(dataframe['Preictal'],
                                              n_folds=k,
                                              shuffle=shuffle,
                                              random_state=random_state)

    # Only the first fold of the cross validator is used for the split.
    training_indices, test_indices = first(cv)
    return dataframe.iloc[training_indices], dataframe.iloc[test_indices]
def cv_score(classifier, dataset, metric=accuracy_score, n_folds=10):
    """
    Calculate K-fold cross validation score.

    For every stratified fold the classifier is cleared, trained on the
    training part and asked to classify the test part.  The true and the
    predicted labels of all folds are pooled and passed to ``metric`` once.
    """
    expected = []
    predicted = []
    folds = StratifiedKFold(list(dataset.get_labels()), n_folds=n_folds)

    for train_idx, test_idx in folds:
        # Start every fold from a clean model (calls the `clear` RPC).
        classifier.clear()

        train_ds = dataset[train_idx]
        test_ds = dataset[test_idx]

        # Training yields (index, label) pairs; they are only drained here.
        for _idx, _label in classifier.train(train_ds):
            pass

        for _idx, label, result in classifier.classify(test_ds):
            # Results are already sorted by descending score, so the first
            # entry carries the label with the highest prediction score.
            expected.append(label)
            predicted.append(result[0][0])

    return metric(expected, predicted)
def _make_kfold(self, Y):
    """Return ``self.MyKfold`` when it is set, else a ``StratifiedKFold`` over ``Y`` with ``self.n_folds`` folds."""
    if self.MyKfold is None:
        return StratifiedKFold(Y, self.n_folds)
    return self.MyKfold
def validation(self, X, Y, wv_X, kind):
    """
    2-fold validation

    Each fold fits a fresh TF-IDF vectorizer on the training texts only,
    transforms both parts with it and scores ``self.stacking`` by accuracy
    on the held-out part.

    :param X: train text
    :param Y: train label
    :param wv_X: train wv_vec; must support indexing with index arrays
    :param kind: age/gender/education
    :return: mean score of 2-fold validation
    """
    print('????...')
    X = np.array(X)
    # Labels are indexed with index arrays below, which a plain list does
    # not support; convert them the same way as X.
    Y = np.asarray(Y)
    fold_n = 2
    folds = list(StratifiedKFold(Y, n_folds=fold_n, shuffle=False, random_state=0))
    score = np.zeros(fold_n)
    for j, (train_idx, test_idx) in enumerate(folds):
        # The %-formatted prints give the same output on Python 2 and 3.
        print('%s %s' % (j + 1, '-fold'))
        X_train = X[train_idx]
        y_train = Y[train_idx]
        X_test = X[test_idx]
        y_test = Y[test_idx]
        wv_X_train = wv_X[train_idx]
        wv_X_test = wv_X[test_idx]

        vec = TfidfVectorizer(use_idf=True, sublinear_tf=False, max_features=50000, binary=True)
        vec.fit(X_train, y_train)
        X_train = vec.transform(X_train)
        X_test = vec.transform(X_test)
        print('%s %s' % ('shape', X_train.shape))

        ypre = self.stacking(X_train, y_train, X_test, wv_X_train, wv_X_test, kind)
        # Fraction of held-out samples predicted correctly.
        score[j] = sum(y_test == ypre) * 1.0 / len(ypre)
    print(score)
    print('%s %s' % (score.mean(), kind))
    return score.mean()
def validation(self, X, Y, wv_X, kind):
    """
    2-fold validation

    Each fold fits a fresh TF-IDF vectorizer on the training texts only,
    transforms both parts with it and scores ``self.stacking`` by accuracy
    on the held-out part.

    :param X: train text
    :param Y: train label
    :param wv_X: train wv_vec; must support indexing with index arrays
    :param kind: age/gender/education
    :return: mean score of 2-fold validation
    """
    print('????...')
    X = np.array(X)
    # Labels are indexed with index arrays below, which a plain list does
    # not support; convert them the same way as X.
    Y = np.asarray(Y)
    fold_n = 2
    folds = list(StratifiedKFold(Y, n_folds=fold_n, shuffle=False, random_state=0))
    score = np.zeros(fold_n)
    for j, (train_idx, test_idx) in enumerate(folds):
        # The %-formatted prints give the same output on Python 2 and 3.
        print('%s %s' % (j + 1, '-fold'))
        X_train = X[train_idx]
        y_train = Y[train_idx]
        X_test = X[test_idx]
        y_test = Y[test_idx]
        wv_X_train = wv_X[train_idx]
        wv_X_test = wv_X[test_idx]

        vec = TfidfVectorizer(use_idf=True, sublinear_tf=False, max_features=50000, binary=True)
        vec.fit(X_train, y_train)
        X_train = vec.transform(X_train)
        X_test = vec.transform(X_test)
        print('%s %s' % ('shape', X_train.shape))

        ypre = self.stacking(X_train, y_train, X_test, wv_X_train, wv_X_test, kind)
        # Fraction of held-out samples predicted correctly.
        score[j] = sum(y_test == ypre) * 1.0 / len(ypre)
    print(score)
    print('%s %s' % (score.mean(), kind))
    return score.mean()
def __call__(self, X, y):
    """
    Build the cross validation folds for the dataset X, y and run
    ``self.cross_validate`` on it.

    The folds are chosen as follows:

    * if ``n_folds`` is not None, stratified folds are used;
    * otherwise, if ``n_class_samples`` is not None, ``n_tests`` splits are
      drawn with ``split_dataset`` using ``n_class_samples`` training
      samples per class and, when ``n_test_samples`` is not None,
      ``n_test_samples`` cross validation samples per class.

    Scalar ``n_class_samples`` / ``n_test_samples`` are expanded in place
    to one integer entry per class.

    Assumes that each datapoint is in a column of X.
    """
    n_classes = len(set(y))

    if self.n_folds is not None:
        # generate the folds
        self.folds = StratifiedKFold(
            y, n_folds=self.n_folds, shuffle=False, random_state=None
        )
    elif self.n_class_samples is not None:
        self.folds = []
        per_class_ones = np.ones(n_classes)
        for _ in range(self.n_tests):
            # Anything that is not a list is broadcast to one count per class.
            if type(self.n_class_samples) is not list:
                self.n_class_samples = (per_class_ones * self.n_class_samples).astype(int)
            if self.n_test_samples is not None:
                self.n_test_samples = (per_class_ones * self.n_test_samples).astype(int)

            data_idx = split_dataset(self.n_class_samples, self.n_test_samples, y)
            # First entry: training indices, second entry: test indices.
            self.folds.append((data_idx[0], data_idx[1]))

    self.cross_validate(X, y)
xgb_classification.py 文件源码
项目:jingjuSingingPhraseMatching
作者: ronggong
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def train_evaluate_stratified(clf, X, y, labels):
    """Fit and evaluate ``clf`` on each of 10 stratified folds of ``X``/``y``.

    For every fold the classifier is refitted on the training part, the
    held-out part is predicted and ``save_results`` is called with the
    true labels, the predictions, ``labels`` and the fold number.
    """
    for fold_number, (train_index, test_index) in enumerate(StratifiedKFold(y, n_folds=10)):
        held_out_X = X[test_index]
        held_out_y = y[test_index]
        clf.fit(X[train_index], y[train_index])
        predictions = clf.predict(held_out_X)
        save_results(held_out_y, predictions, labels, fold_number)
def threshold_estimate_cv(x, y, k_fold):
    """Estimate a score threshold by stratified k-fold cross validation.

    For every fold an XGBoost classifier is trained with the samples of
    class 1 weighted by the ratio (#class 0 / #class 1) of the training
    part.  The precision-recall curve of the held-out part is computed and
    the threshold with the highest F1 is recorded for that fold.

    :param x: Feature array; indexed with index arrays.
    :param y: Label array; labels equal to 1 and 0 are counted as the two classes.
    :param k_fold: Number of folds.
    :return: ``(mean of the per-fold thresholds, array of per-fold thresholds)``.
    """
    print("%d %d %d" % (y.shape[0], sum(y == 1), sum(y == 0)))
    folds = StratifiedKFold(y, n_folds=k_fold, shuffle=True, random_state=0)
    threshold = np.zeros(k_fold, dtype="float32")

    for cnt, (train_index, test_index) in enumerate(folds):
        x_train, x_test = x[train_index], x[test_index]
        y_train, y_test = y[train_index], y[test_index]

        weight = float(len(y_train[y_train == 0])) / float(len(y_train[y_train == 1]))
        # The weights must be a float array: assigning the ratio into an
        # integer array would truncate it (a ratio below 1 would become 0).
        w1 = np.ones(y_train.shape[0], dtype=float)
        w1[y_train == 1] = weight

        estimator = xgb.XGBClassifier(max_depth=10, learning_rate=0.1, n_estimators=1000, nthread=50)
        estimator.fit(x_train, y_train, sample_weight=w1)
        y_scores = estimator.predict_proba(x_test)[:, 1]

        precision, recall, thresholds = precision_recall_curve(y_test, y_scores)
        # NOTE(review): the first two curve points are skipped; the reason
        # is not visible here. The same offset of 2 is re-applied when
        # indexing ``thresholds`` below.
        f1 = 2 * precision[2:] * recall[2:] / (precision[2:] + recall[2:])
        m_idx = np.argmax(f1)
        threshold[cnt] = thresholds[2 + m_idx]
        print("%d %f %f" % (precision.shape[0], f1[m_idx], thresholds[2 + m_idx]))

    return np.mean(threshold), threshold
# Cross validation using gradient tree boosting
def print_metrics(clf):
    """Plot ROC curves of ``clf`` over 5 stratified folds and save the figure.

    Reads the module-level ``features`` and ``labels``.  For every fold the
    classifier is refitted, the ROC curve of the held-out part is plotted
    and interpolated onto a common grid of 100 false positive rates.  The
    averaged curve and a diagonal reference line are added and the figure
    is written to 'auc_sent.png'.

    :param clf: Classifier providing ``fit`` (returning the fitted
        estimator) and ``predict_proba``.
    """
    cv = cross_validation.StratifiedKFold(labels, n_folds=5)
    mean_tpr = 0.0
    mean_fpr = np.linspace(0, 1, 100)

    for i, (train, test) in enumerate(cv):
        probas_ = clf.fit(features[train], labels[train]).predict_proba(features[test])
        fpr, tpr, _ = metrics.roc_curve(labels[test], probas_[:, 1])
        # Accumulate the fold's curve on the shared grid; the running sum
        # is pinned to 0 at a false positive rate of 0.
        mean_tpr += interp(mean_fpr, fpr, tpr)
        mean_tpr[0] = 0.0
        roc_auc = metrics.auc(fpr, tpr)
        plt.plot(fpr, tpr, lw=1, label='ROC fold %d (area = %0.2f)' % (i, roc_auc))

    plt.plot([0, 1], [0, 1], '--', color=(0.6, 0.6, 0.6), label='Luck')

    mean_tpr /= len(cv)
    mean_tpr[-1] = 1.0
    mean_auc = metrics.auc(mean_fpr, mean_tpr)
    plt.plot(mean_fpr, mean_tpr, 'k--',
             label='Mean ROC (area = %0.2f)' % mean_auc, lw=2)

    plt.xlim([-0.05, 1.05])
    plt.ylim([-0.05, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver operating characteristic')
    plt.legend(loc="lower right")
    plt.savefig('auc_sent.png')
imdb_success_predictor.py 文件源码
项目:Movie-Success-Predictor
作者: Blueteak
项目源码
文件源码
阅读 15
收藏 0
点赞 0
评论 0
def test_classifier(clf, X, Y, loc):
    """Evaluate ``clf`` with 5 stratified folds and plot its ROC curves.

    For every fold the classifier is refitted, the AUC of the held-out
    part is collected and its ROC curve is plotted.  The averaged curve and
    a diagonal reference line are added.  The figure is saved to
    'plots/<loc>/<classifier class name>.png', shown, and cleared.  Finally
    the class name, the per-fold AUCs and their mean are printed.

    :param clf: Classifier providing ``fit`` and ``predict_proba``.
    :param X: Feature array; indexed with index arrays.
    :param Y: Label array; indexed with index arrays.
    :param loc: Name of the sub-directory of 'plots/' to save into.
    """
    folds = StratifiedKFold(Y, 5)
    mean_tpr = 0.0
    mean_fpr = numpy.linspace(0, 1, 100)
    aucs = []

    for i, (train, test) in enumerate(folds):
        clf.fit(X[train], Y[train])
        prediction = clf.predict_proba(X[test])
        aucs.append(roc_auc_score(Y[test], prediction[:, 1]))
        false_positive_rate, true_positive_rate, _ = roc_curve(Y[test], prediction[:, 1])
        # Accumulate the fold's curve on the shared grid; the running sum
        # is pinned to 0 at a false positive rate of 0.
        mean_tpr += interp(mean_fpr, false_positive_rate, true_positive_rate)
        mean_tpr[0] = 0.0
        roc_auc = auc(false_positive_rate, true_positive_rate)
        plt.plot(false_positive_rate, true_positive_rate, lw=1,
                 label='ROC fold %d (area = %0.2f)' % (i, roc_auc))

    plt.plot([0, 1], [0, 1], '--', color=(0.6, 0.6, 0.6), label='Luck')

    mean_tpr /= len(folds)
    mean_tpr[-1] = 1.0
    mean_auc = auc(mean_fpr, mean_tpr)
    plt.plot(mean_fpr, mean_tpr, 'k--',
             label='Mean ROC (area = %0.2f)' % mean_auc, lw=2)

    plt.title('Receiver Operating Characteristic')
    plt.xlim([0, 1])
    plt.ylim([0, 1])
    plt.ylabel('True Positive Rate')
    plt.xlabel('False Positive Rate')
    plt.legend(loc='lower right')

    classifier_name = clf.__class__.__name__
    # Save before showing: once an interactive window opened by show() is
    # closed, the current figure may be gone and an empty image is saved.
    plt.savefig('plots/' + loc + '/' + classifier_name + '.png')
    plt.show()
    plt.clf()
    print('%s %s %s' % (classifier_name, aucs, numpy.mean(aucs)))
def classify_with_cross_validation(X, y, clf, n_folds=5):
    """Cross validate ``clf`` and summarise the measures of all folds.

    For every stratified fold the classifier is refitted, the flattened
    confusion matrix of the held-out part is passed to
    ``calculate_measures`` and each returned measure is collected.  Every
    measure is then reduced to ``(mean, delta)`` where ``delta`` is
    ``1.96 * std / sqrt(n_folds)``.

    :param X: Feature array; indexed with index arrays.
    :param y: Label array; indexed with index arrays.
    :param clf: Classifier providing ``fit`` and ``predict``.
    :param n_folds: Number of stratified folds.
    :return: ``collections.defaultdict`` mapping each measure name to its
        ``(mean, delta)`` tuple.
    """
    cv_measures = collections.defaultdict(list)  # FIXME: use collections.OrderedDict too

    logging.info("classifying and predicting with cross validation")
    skf = cross_validation.StratifiedKFold(y, n_folds=n_folds)
    for train_indices, test_indices in skf:
        X_train = X[train_indices]
        X_test = X[test_indices]
        y_train = y[train_indices]
        y_test = y[test_indices]

        clf.fit(X_train, y_train)
        y_predicted = clf.predict(X_test)

        # The flattened matrix cells become the positional arguments of
        # calculate_measures.
        confusion_cells = metrics.confusion_matrix(y_test, y_predicted).flatten()
        for measure_name, measure_value in calculate_measures(*confusion_cells).items():
            cv_measures[measure_name].append(measure_value)

    # Replace each list of per-fold values by its summary. Only existing
    # keys are reassigned, so the mapping keeps its size while iterating.
    for measure_name, measure_values in cv_measures.items():
        mean = np.mean(measure_values)
        delta = np.std(measure_values) * 1.96 / math.sqrt(n_folds)  # 95% of confidence
        cv_measures[measure_name] = (mean, delta)
    return cv_measures
# noinspection PyPep8Naming
def __init__(self, X, y, Xstatic=None, ystatic=None, nfolds=5, score='r2', classifier=RegressorWrapper, random_state=None):
    """Store the data, encode the targets and prepare shuffled stratified folds.

    :param X: Samples; stored as given.
    :param y: Targets; a ``LabelEncoder`` is fitted on them and the encoded
        values are stored as ``self.y`` and ``self.test_y``.
    :param Xstatic: Optional additional samples; stored as given.
        ``None`` (the default) is stored as an empty list.
    :param ystatic: Optional targets for ``Xstatic``; encoded with the
        encoder fitted on ``y`` when non-empty, otherwise stored as an
        empty list.
    :param nfolds: Number of folds.
    :param score: Name of the score; stored as ``self.score``.
    :param classifier: Factory stored as ``self.create_classifier``.
    :param random_state: Seed for the shuffled ``StratifiedKFold``.
    """
    # None replaces the former mutable ``[]`` defaults so that instances
    # never share (and possibly mutate) the same default list object.
    if Xstatic is None:
        Xstatic = []
    if ystatic is None:
        ystatic = []

    self.nfolds = nfolds
    self.score = score
    self.X = X
    self.Xstatic = Xstatic

    self.le = preprocessing.LabelEncoder().fit(y)
    self.y = self.le.transform(y)
    if len(ystatic) > 0:
        self.ystatic = self.le.transform(ystatic)
    else:
        self.ystatic = []
    self.test_y = self.y

    self.create_classifier = classifier
    # The folds are stratified on the raw (unencoded) targets.
    self.kfolds = cross_validation.StratifiedKFold(y, n_folds=nfolds, shuffle=True, random_state=random_state)
def __init__(self, X, y, Xstatic=None, ystatic=None, nfolds=5, score='macrof1', classifier=ClassifierWrapper, random_state=None):
    """Store the data, encode the labels and prepare shuffled stratified folds.

    :param X: Samples; stored as given.
    :param y: Labels; a ``LabelEncoder`` is fitted on them and the encoded
        values are stored as ``self.y`` and ``self.test_y``.
    :param Xstatic: Optional additional samples; stored as given.
        ``None`` (the default) is stored as an empty list.
    :param ystatic: Optional labels for ``Xstatic``; encoded with the
        encoder fitted on ``y`` when non-empty, otherwise stored as an
        empty list.
    :param nfolds: Number of folds.
    :param score: Name of the score; stored as ``self.score``.
    :param classifier: Factory stored as ``self.create_classifier``.
    :param random_state: Seed for the shuffled ``StratifiedKFold``.
    """
    # None replaces the former mutable ``[]`` defaults so that instances
    # never share (and possibly mutate) the same default list object.
    if Xstatic is None:
        Xstatic = []
    if ystatic is None:
        ystatic = []

    self.nfolds = nfolds
    self.score = score
    self.X = X
    self.Xstatic = Xstatic

    self.le = preprocessing.LabelEncoder().fit(y)
    self.y = self.le.transform(y)
    if len(ystatic) > 0:
        self.ystatic = self.le.transform(ystatic)
    else:
        self.ystatic = []
    self.test_y = self.y

    self.create_classifier = classifier
    # The folds are stratified on the raw (unencoded) labels.
    self.kfolds = cross_validation.StratifiedKFold(y, n_folds=nfolds, shuffle=True, random_state=random_state)
def cross_predict(feat, f_name, X=X, y=y):
    """Cross-validate a ``feat`` + LinearSVC pipeline and print a report.

    Out-of-fold predictions are obtained with 5 stratified folds.  The
    elapsed time, confusion matrix, accuracy and classification report are
    printed under a banner containing ``f_name``.

    :param feat: Feature extraction step, used as the 'feat' stage of the pipeline.
    :param f_name: Name shown in the banner line.
    :param X: Samples; defaults to the module-level ``X`` bound at definition time.
    :param y: Labels; defaults to the module-level ``y`` bound at definition time.
    """
    # One worker on Windows, all available cores everywhere else.
    n_jobs = 1 if os.name == 'nt' else -1

    classifier = LinearSVC(C=0.02)

    # StratifiedKFold is a variation of KFold that returns stratified folds,
    # made by preserving the percentage of samples for each class.
    #
    # StratifiedShuffleSplit is not an option here: its randomized folds are
    # not guaranteed to all be different, and passing it to
    # cross_val_predict raises
    # "ValueError: cross_val_predict only works for partitions".
    cv = cross_validation.StratifiedKFold(y, n_folds=5, random_state=42)

    model = Pipeline([('feat', feat), ('clf', classifier)])

    started = time()
    y_pred = cross_validation.cross_val_predict(model, X=X, y=y, n_jobs=n_jobs, cv=cv)
    elapsed = time() - started

    banner = "=" * 20
    print(banner, f_name, banner)
    print("time cost: {}".format(elapsed))
    print()
    print('confusion matrix:\n', confusion_matrix(y, y_pred))
    print()
    print('\t\taccuracy: {}'.format(accuracy_score(y, y_pred)))
    print()
    print("\t\tclassification report")
    print("-" * 52)
    print(classification_report(y, y_pred))
# ??
# ???? (tfidf: baseline feature)
def make_mf_lr(X, y, clf, X_test, n_round=3):
    '''
    Fit metafeature by @clf and get prediction for test.

    @clf must provide ``fit`` and ``predict_proba``; the second column of
    ``predict_proba`` is used as the prediction.

    In each of the @n_round rounds the data is split into 2 shuffled
    stratified folds (seeded with ``42 + round * 1000``).  The classifier
    is fitted on one fold and predicts the other fold and @X_test.  The
    out-of-fold predictions and the fold-averaged test predictions are
    summed over the rounds and divided by @n_round.  The ROC AUC of every
    fold is printed.

    Returns ``(train metafeature, test metafeature)``.
    '''
    print(clf)
    n_folds = 2
    # Each fold contributes an equal share to the test prediction of a round.
    fold_share = 1.0 / n_folds
    mf_tr = np.zeros(X.shape[0])
    mf_te = np.zeros(X_test.shape[0])
    for i in range(n_round):
        skf = StratifiedKFold(y, n_folds=n_folds, shuffle=True, random_state=42 + i * 1000)
        for ind_tr, ind_te in skf:
            X_tr = X[ind_tr]
            X_te = X[ind_te]
            y_tr = y[ind_tr]
            y_te = y[ind_te]

            clf.fit(X_tr, y_tr)
            # Predict the held-out fold once and reuse it for both the
            # metafeature and the score.
            y_pred = clf.predict_proba(X_te)[:, 1]
            mf_tr[ind_te] += y_pred
            mf_te += clf.predict_proba(X_test)[:, 1] * fold_share
            score = roc_auc_score(y_te, y_pred)
            print('pred[{}] score:{}'.format(i, score))
    return (mf_tr / n_round, mf_te / n_round)
def make_mf_lsvc(X, y, clf, X_test, n_round=3):
    '''
    Fit metafeature by @clf and get prediction for test.

    @clf must provide ``fit`` and ``decision_function``; the decision
    values are used as the prediction.

    In each of the @n_round rounds the data is split into 2 shuffled
    stratified folds (seeded with ``42 + round * 1000``).  The classifier
    is fitted on one fold and scores the other fold and @X_test.  The
    out-of-fold values and the fold-averaged test values are summed over
    the rounds and divided by @n_round.  The ROC AUC of every fold is
    printed.

    Returns ``(train metafeature, test metafeature)``.
    '''
    print(clf)
    n_folds = 2
    # Each fold contributes an equal share to the test prediction of a round.
    fold_share = 1.0 / n_folds
    mf_tr = np.zeros(X.shape[0])
    mf_te = np.zeros(X_test.shape[0])
    for i in range(n_round):
        skf = StratifiedKFold(y, n_folds=n_folds, shuffle=True, random_state=42 + i * 1000)
        for ind_tr, ind_te in skf:
            X_tr = X[ind_tr]
            X_te = X[ind_te]
            y_tr = y[ind_tr]
            y_te = y[ind_te]

            clf.fit(X_tr, y_tr)
            # Score the held-out fold once and reuse it for both the
            # metafeature and the AUC.
            y_pred = clf.decision_function(X_te)
            mf_tr[ind_te] += y_pred
            mf_te += clf.decision_function(X_test) * fold_share
            score = roc_auc_score(y_te, y_pred)
            print('pred[{}] score:{}'.format(i, score))
    return (mf_tr / n_round, mf_te / n_round)
def make_mf_nn(X, y, X_test, n_round=3):
    '''
    Fit metafeature with a freshly built model per fold and get prediction for test.

    In each of the @n_round rounds the data is split into 2 shuffled
    stratified folds (seeded with ``42 + round * 1000``).  For every fold a
    new model is created with ``build_model(X)``, trained on one fold (with
    the other fold as validation data) and used to predict the other fold
    and @X_test.  The first two columns of @X and @X_test are fed to the
    model as two separate inputs.  The out-of-fold predictions and the
    fold-averaged test predictions are summed over the rounds and divided
    by @n_round.  The ROC AUC of every fold is printed.

    Returns ``(train metafeature, test metafeature)``.
    '''
    n_folds = 2
    # Each fold contributes an equal share to the test prediction of a round.
    fold_share = 1.0 / n_folds
    mf_tr = np.zeros(X.shape[0])
    mf_te = np.zeros(X_test.shape[0])
    for i in range(n_round):
        skf = StratifiedKFold(y, n_folds=n_folds, shuffle=True, random_state=42 + i * 1000)
        for ind_tr, ind_te in skf:
            clf = build_model(X)
            X_tr = [X[:, 0][ind_tr], X[:, 1][ind_tr]]
            X_te = [X[:, 0][ind_te], X[:, 1][ind_te]]
            y_tr = y[ind_tr]
            y_te = y[ind_te]

            clf.fit(X_tr, y_tr, nb_epoch=2, batch_size=128, validation_data=[X_te, y_te])
            # Predict the held-out fold once and reuse it for both the
            # metafeature and the score.
            y_pred = clf.predict(X_te).ravel()
            mf_tr[ind_te] += y_pred
            mf_te += clf.predict([X_test[:, 0], X_test[:, 1]]).ravel() * fold_share
            score = roc_auc_score(y_te, y_pred)
            print('pred[{}] score:{}'.format(i, score))
    return (mf_tr / n_round, mf_te / n_round)