def run_exp_train_cv(crf, feat_dirs, target_label, n_folds=5, n_jobs=-1):
"""
Run cross-validated experiment on training data
"""
# Collect data for running CRF classifier
train_dir = join(LOCAL_DIR, 'train')
true_iob_dir = join(train_dir, 'iob')
X = collect_features(true_iob_dir, *feat_dirs)
labels_fname = join(train_dir, 'train_labels.pkl')
labels = read_labels(labels_fname)
y_true = labels[target_label]
folds_fname = join(train_dir, 'folds.pkl')
folds = read_folds(folds_fname, n_folds)
# Predict
y_pred = cross_val_predict(crf, X, y_true, cv=folds, verbose=2, n_jobs=n_jobs)
print(flat_classification_report(y_true, y_pred, digits=3, labels=('B', 'I')))
return y_pred
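The folds passed as cv above come from read_folds, i.e. a precomputed sequence of (train, test) index pairs; cross_val_predict accepts any such iterable directly. A minimal self-contained sketch of that pattern with a generic scikit-learn estimator (the dataset and classifier here are illustrative assumptions, not part of the experiment code above):

# Sketch: reusing precomputed folds with cross_val_predict (illustrative data/estimator).
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from sklearn.model_selection import KFold, cross_val_predict

X_demo, y_demo = make_classification(n_samples=200, n_features=10, random_state=0)
# Materialize the folds once so every experiment reuses exactly the same splits.
precomputed_folds = list(KFold(n_splits=5, shuffle=True, random_state=0).split(X_demo))
y_hat = cross_val_predict(LogisticRegression(max_iter=1000), X_demo, y_demo,
                          cv=precomputed_folds, n_jobs=-1)
print(classification_report(y_demo, y_hat, digits=3))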
def predict(self, X, y):
"""
Returns a generator containing the predictions for each of the
internal models (using cross_val_predict with cv=12).
Parameters
----------
X : ndarray or DataFrame of shape n x m
A matrix of n instances with m features
y : ndarray or Series of length n
An array or series of target or class values
"""
for model in self.models:
yield cvp(model, X, y, cv=12)
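A hedged sketch of how such a generator of per-model out-of-fold predictions could be built and consumed; the models list, dataset, and scoring step below are assumptions for illustration, not part of the class above:

# Sketch: one out-of-fold prediction array per model, produced lazily.
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import cross_val_predict
from sklearn.tree import DecisionTreeClassifier

X_demo, y_demo = load_iris(return_X_y=True)
models = [LogisticRegression(max_iter=1000), DecisionTreeClassifier(random_state=0)]
oof_predictions = (cross_val_predict(m, X_demo, y_demo, cv=12) for m in models)
for model, y_hat in zip(models, oof_predictions):
    print(type(model).__name__, accuracy_score(y_demo, y_hat))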
def test_cross_val_predict():
# Make sure it works in cross_val_predict for multiclass.
X, y = load_iris(return_X_y=True)
y = LabelBinarizer().fit_transform(y)
X = StandardScaler().fit_transform(X)
mlp = MLPClassifier(n_epochs=10,
solver_kwargs={'learning_rate': 0.05},
random_state=4567).fit(X, y)
cv = KFold(n_splits=4, random_state=457, shuffle=True)
y_oos = cross_val_predict(mlp, X, y, cv=cv, method='predict_proba')
auc = roc_auc_score(y, y_oos, average=None)
assert np.all(auc >= 0.96)
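The same method='predict_proba' pattern works with any probabilistic classifier; below is a minimal sketch using plain scikit-learn, with LogisticRegression standing in for the library-specific MLPClassifier used in the test above:

# Sketch: out-of-fold class probabilities and per-class ROC AUC (illustrative estimator).
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import KFold, cross_val_predict
from sklearn.preprocessing import LabelBinarizer, StandardScaler

X_demo, y_demo = load_iris(return_X_y=True)
X_demo = StandardScaler().fit_transform(X_demo)
cv_demo = KFold(n_splits=4, shuffle=True, random_state=457)
proba_oof = cross_val_predict(LogisticRegression(max_iter=1000), X_demo, y_demo,
                              cv=cv_demo, method='predict_proba')
y_bin = LabelBinarizer().fit_transform(y_demo)
print(roc_auc_score(y_bin, proba_oof, average=None))  # one AUC value per class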
def cv_SVR( xM, yV, svr_params, n_splits = 5, n_jobs = -1, grid_std = None, graph = True, shuffle = True):
"""
Support vector regression (SVR) is used as the estimator.
Cross-validation is performed so as to generate prediction output for all input molecules.
"""
print(xM.shape, yV.shape)
clf = svm.SVR( **svr_params)
kf_n_c = model_selection.KFold( n_splits=n_splits, shuffle=shuffle)
kf_n = kf_n_c.split( xM)
yV_pred = model_selection.cross_val_predict( clf, xM, yV, cv = kf_n, n_jobs = n_jobs)
if graph:
print('The prediction output using cross-validation is given by:')
jutil.cv_show( yV, yV_pred, grid_std = grid_std)
return yV_pred
def _cv_r0( method, xM, yV, alpha, n_splits = 5, n_jobs = -1, grid_std = None, graph = True):
"""
method can be 'Ridge', 'Lasso'
cross validation is performed so as to generate prediction output for all input molecules
"""
print(xM.shape, yV.shape)
clf = getattr( linear_model, method)( alpha = alpha)
kf_n_c = model_selection.KFold( n_splits = n_splits, shuffle=True)
kf_n = kf_n_c.split( xM)
yV_pred = model_selection.cross_val_predict( clf, xM, yV, cv = kf_n, n_jobs = n_jobs)
if graph:
print('The prediction output using cross-validation is given by:')
jutil.cv_show( yV, yV_pred, grid_std = grid_std)
return yV_pred
def cv( method, xM, yV, alpha, n_splits = 5, n_jobs = -1, grid_std = None, graph = True, shuffle = True):
"""
method can be 'Ridge', 'Lasso'
cross validation is performed so as to generate prediction output for all input molecules
"""
print(xM.shape, yV.shape)
clf = getattr( linear_model, method)( alpha = alpha)
kf_n_c = model_selection.KFold( n_splits=n_splits, shuffle=shuffle)
kf_n = kf_n_c.split( xM)
yV_pred = model_selection.cross_val_predict( clf, xM, yV, cv = kf_n, n_jobs = n_jobs)
if graph:
print('The prediction output using cross-validation is given by:')
jutil.cv_show( yV, yV_pred, grid_std = grid_std)
return yV_pred
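For reference, getattr(linear_model, method) simply resolves to an ordinary scikit-learn estimator class; a stand-alone sketch of the same cross-validated prediction, without the repository-specific plotting helper (jutil.cv_show), on synthetic data:

# Sketch: cv('Ridge', ...) boiled down to plain scikit-learn calls.
from sklearn import linear_model, model_selection
from sklearn.datasets import make_regression
from sklearn.metrics import r2_score

xM_demo, yV_demo = make_regression(n_samples=100, n_features=5, noise=0.1, random_state=0)
clf_demo = getattr(linear_model, 'Ridge')(alpha=0.5)
kf_demo = model_selection.KFold(n_splits=5, shuffle=True, random_state=0)
yV_pred_demo = model_selection.cross_val_predict(clf_demo, xM_demo, yV_demo, cv=kf_demo, n_jobs=-1)
print('cross-validated r2:', r2_score(yV_demo, yV_pred_demo))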
def cvLOO( method, xM, yV, alpha, n_jobs = -1, grid_std = None, graph = True):
"""
method can be 'Ridge', 'Lasso'
cross validation is performed so as to generate prediction output for all input molecules
"""
n_splits = xM.shape[0]
# print(xM.shape, yV.shape)
clf = getattr( linear_model, method)( alpha = alpha)
kf_n = model_selection.KFold( n_splits=n_splits)  # LOO: n_splits equals the number of samples
yV_pred = model_selection.cross_val_predict( clf, xM, yV, cv = kf_n, n_jobs = n_jobs)
if graph:
print('The prediction output using cross-validation is given by:')
jutil.cv_show( yV, yV_pred, grid_std = grid_std)
return yV_pred
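With n_splits equal to the number of samples, the fold object above amounts to leave-one-out cross-validation; scikit-learn also exposes this directly through LeaveOneOut. A short sketch (synthetic data, Ridge chosen only for illustration):

# Sketch: LeaveOneOut gives the same splits as KFold with n_splits == n_samples (no shuffling).
from sklearn.datasets import make_regression
from sklearn.linear_model import Ridge
from sklearn.model_selection import LeaveOneOut, cross_val_predict

xM_demo, yV_demo = make_regression(n_samples=30, n_features=4, noise=0.1, random_state=0)
yV_pred_demo = cross_val_predict(Ridge(alpha=1.0), xM_demo, yV_demo, cv=LeaveOneOut())
print(yV_pred_demo.shape)  # one out-of-fold prediction per sample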
def cv_SVR( xM, yV, svr_params, n_splits = 5, n_jobs = -1, grid_std = None, graph = True, shuffle = True):
"""
Support vector regression (SVR) is used as the estimator.
Cross-validation is performed so as to generate prediction output for all input molecules.
"""
print(xM.shape, yV.shape)
clf = svm.SVR( **svr_params)
kf_n_c = model_selection.KFold( n_splits=n_splits, shuffle=shuffle)
kf_n = kf_n_c.split( xM)
yV_pred = model_selection.cross_val_predict( clf, xM, yV, cv = kf_n, n_jobs = n_jobs)
if graph:
print('The prediction output using cross-validation is given by:')
kutil.cv_show( yV, yV_pred, grid_std = grid_std)
return yV_pred
def _cv_r0( method, xM, yV, alpha, n_splits = 5, n_jobs = -1, grid_std = None, graph = True):
"""
method can be 'Ridge', 'Lasso'
cross validation is performed so as to generate prediction output for all input molecules
"""
print(xM.shape, yV.shape)
clf = getattr( linear_model, method)( alpha = alpha)
kf_n_c = model_selection.KFold( n_splits = n_splits, shuffle=True)
kf_n = kf_n_c.split( xM)
yV_pred = model_selection.cross_val_predict( clf, xM, yV, cv = kf_n, n_jobs = n_jobs)
if graph:
print('The prediction output using cross-validation is given by:')
kutil.cv_show( yV, yV_pred, grid_std = grid_std)
return yV_pred
def cv( method, xM, yV, alpha, n_splits = 5, n_jobs = -1, grid_std = None, graph = True, shuffle = True):
"""
method can be 'Ridge', 'Lasso'
cross validation is performed so as to generate prediction output for all input molecules
"""
print(xM.shape, yV.shape)
clf = getattr( linear_model, method)( alpha = alpha)
kf_n_c = model_selection.KFold( n_splits=n_splits, shuffle=shuffle)
kf_n = kf_n_c.split( xM)
yV_pred = model_selection.cross_val_predict( clf, xM, yV, cv = kf_n, n_jobs = n_jobs)
if graph:
print('The prediction output using cross-validation is given by:')
kutil.cv_show( yV, yV_pred, grid_std = grid_std)
return yV_pred
def cvLOO( method, xM, yV, alpha, n_jobs = -1, grid_std = None, graph = True):
"""
method can be 'Ridge', 'Lasso'
cross validation is performed so as to generate prediction output for all input molecules
"""
n_splits = xM.shape[0]
# print(xM.shape, yV.shape)
clf = getattr( linear_model, method)( alpha = alpha)
kf_n = model_selection.KFold( n_splits=n_splits)  # LOO: n_splits equals the number of samples
yV_pred = model_selection.cross_val_predict( clf, xM, yV, cv = kf_n, n_jobs = n_jobs)
if graph:
print('The prediction output using cross-validation is given by:')
kutil.cv_show( yV, yV_pred, grid_std = grid_std)
return yV_pred
def cv_SVR(xM, yV, svr_params, n_folds=5, n_jobs=-1, grid_std=None, graph=True, shuffle=True):
"""
Support vector regression (SVR) is used as the estimator.
Cross-validation is performed so as to generate prediction output for all input molecules.
"""
print(xM.shape, yV.shape)
clf = svm.SVR(**svr_params)
kf_n_c = model_selection.KFold(n_splits=n_folds, shuffle=shuffle)
kf_n = kf_n_c.split(xM)
yV_pred = model_selection.cross_val_predict(
clf, xM, yV.A1, cv=kf_n, n_jobs=n_jobs)
if graph:
print('The prediction output using cross-validation is given by:')
jutil.cv_show(yV, yV_pred, grid_std=grid_std)
return yV_pred
def _cv_r0(method, xM, yV, alpha, n_folds=5, n_jobs=-1, grid_std=None, graph=True):
"""
method can be 'Ridge', 'Lasso'
cross validation is performed so as to generate prediction output for all input molecules
"""
print(xM.shape, yV.shape)
clf = getattr(linear_model, method)(alpha=alpha)
kf_n_c = model_selection.KFold(n_splits=n_folds, shuffle=True)
kf_n = kf_n_c.split(xM)
yV_pred = model_selection.cross_val_predict(
clf, xM, yV, cv=kf_n, n_jobs=n_jobs)
if graph:
print('The prediction output using cross-validation is given by:')
jutil.cv_show(yV, yV_pred, grid_std=grid_std)
return yV_pred
def cv(method, xM, yV, alpha, n_folds=5, n_jobs=-1, grid_std=None, graph=True, shuffle=True):
"""
method can be 'Ridge', 'Lasso'
cross validation is performed so as to generate prediction output for all input molecules
Return
--------
yV_pred
"""
print(xM.shape, yV.shape)
clf = getattr(linear_model, method)(alpha=alpha)
kf_n_c = model_selection.KFold(n_splits=n_folds, shuffle=shuffle)
kf_n = kf_n_c.split(xM)
yV_pred = model_selection.cross_val_predict(
clf, xM, yV, cv=kf_n, n_jobs=n_jobs)
if graph:
print('The prediction output using cross-validation is given by:')
jutil.cv_show(yV, yV_pred, grid_std=grid_std)
return yV_pred
def _cv_LOO_r0(method, xM, yV, alpha, n_jobs=-1, grid_std=None, graph=True):
"""
method can be 'Ridge', 'Lasso'
cross validation is performed so as to generate prediction output for all input molecules
"""
n_folds = xM.shape[0]
print(xM.shape, yV.shape)
clf = getattr(linear_model, method)(alpha=alpha)
# print("Note - shuffling is not applied because of LOO.")
kf_n_c = model_selection.KFold(n_splits=n_folds)
kf_n = kf_n_c.split(xM)
yV_pred = model_selection.cross_val_predict(
clf, xM, yV, cv=kf_n, n_jobs=n_jobs)
if graph:
print('The prediction output using cross-validation is given by:')
jutil.cv_show(yV, yV_pred, grid_std=grid_std)
return yV_pred
def cv_Ridge_BIKE(A_list, yV, XX=None, alpha=0.5, n_folds=5, n_jobs=-1, grid_std=None):
"""
Cross-validated prediction with the BIKE Ridge model.
"""
clf = binary_model.BIKE_Ridge(A_list, XX, alpha=alpha)
ln = A_list[0].shape[0]  # ln is the number of molecules.
kf_n_c = model_selection.KFold(n_splits=n_folds, shuffle=True)
kf_n = kf_n_c.split(A_list[0])
AX_idx = np.array([list(range(ln))]).T
yV_pred = model_selection.cross_val_predict(
clf, AX_idx, yV, cv=kf_n, n_jobs=n_jobs)
print('The prediction output using cross-validation is given by:')
jutil.cv_show(yV, yV_pred, grid_std=grid_std)
return yV_pred
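AX_idx above passes row indices as the "features", so the estimator can look up the corresponding rows and columns of precomputed kernel matrices inside fit and predict. BIKE_Ridge is repository-specific, but the index-as-X trick itself can be reproduced with standard scikit-learn pieces; the helper class below (KernelRidgeOnIndices) is a hypothetical illustration, not part of the original code:

# Sketch: cross-validating on a precomputed kernel by passing sample indices as X.
import numpy as np
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.kernel_ridge import KernelRidge
from sklearn.metrics.pairwise import rbf_kernel
from sklearn.model_selection import cross_val_predict

class KernelRidgeOnIndices(BaseEstimator, RegressorMixin):
    def __init__(self, K=None, alpha=0.5):
        self.K = K          # full precomputed kernel over all samples
        self.alpha = alpha
    def fit(self, X_idx, y):
        self.train_idx_ = X_idx.ravel()
        self.model_ = KernelRidge(alpha=self.alpha, kernel='precomputed')
        self.model_.fit(self.K[np.ix_(self.train_idx_, self.train_idx_)], y)
        return self
    def predict(self, X_idx):
        idx = X_idx.ravel()
        return self.model_.predict(self.K[np.ix_(idx, self.train_idx_)])

rng = np.random.RandomState(0)
X_raw = rng.randn(80, 6)
y_demo = X_raw @ rng.randn(6) + 0.1 * rng.randn(80)
K_full = rbf_kernel(X_raw)                    # kernel matrix for all samples
AX_idx_demo = np.arange(80).reshape(-1, 1)    # indices play the role of X
y_pred_demo = cross_val_predict(KernelRidgeOnIndices(K=K_full, alpha=0.5),
                                AX_idx_demo, y_demo, cv=5)
print(y_pred_demo.shape)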
def test_cross_val_score_predict_labels():
# Check if ValueError (when labels is None) propagates to cross_val_score
# and cross_val_predict
# And also check if labels is correctly passed to the cv object
X, y = make_classification(n_samples=20, n_classes=2, random_state=0)
clf = SVC(kernel="linear")
label_cvs = [LeaveOneLabelOut(), LeavePLabelOut(2), LabelKFold(),
LabelShuffleSplit()]
for cv in label_cvs:
assert_raise_message(ValueError,
"The labels parameter should not be None",
cross_val_score, estimator=clf, X=X, y=y, cv=cv)
assert_raise_message(ValueError,
"The labels parameter should not be None",
cross_val_predict, estimator=clf, X=X, y=y, cv=cv)
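LeaveOneLabelOut, LeavePLabelOut, LabelKFold and LabelShuffleSplit come from an older scikit-learn API; in current releases the equivalents are the Group* splitters together with the groups argument of cross_val_score / cross_val_predict. A short sketch of the modern form (my reading of the migration, not part of the original test):

# Sketch: group-aware cross-validated prediction with the current API.
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import GroupKFold, cross_val_predict
from sklearn.svm import SVC

X_demo, y_demo = make_classification(n_samples=20, n_classes=2, random_state=0)
groups_demo = np.repeat(np.arange(5), 4)  # five groups of four samples each
preds_demo = cross_val_predict(SVC(kernel="linear"), X_demo, y_demo,
                               groups=groups_demo, cv=GroupKFold(n_splits=5))
print(preds_demo.shape)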
def predictKFoldKNN(X, y, K=10, kfold=10, selectKBest=0):
"""
Classifies the data using K-nearest neighbors and k-fold CV
:param X: The list of feature vectors
:type X: list
:param y: The list of labels corresponding to the feature vectors
:type y: list
:param K: The number of nearest neighbors to consider in classification
:type K: int
:param kfold: The number of folds in the CV
:type kfold: int
:param selectKBest: The number of best features to select
:type selectKBest: int
:return: An array of predicted classes
"""
try:
# Prepare data
X, y = numpy.array(X), numpy.array(y)
# Define classifier
clf = neighbors.KNeighborsClassifier(n_neighbors=K)
# Select K Best features if enabled
X_new = SelectKBest(chi2, k=selectKBest).fit_transform(X, y) if selectKBest > 0 else X
predicted = cross_val_predict(clf, X_new, y, cv=kfold).tolist()
except Exception as e:
prettyPrintError(e)
return []
return predicted
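chi2-based SelectKBest requires non-negative feature values, and note that (as in the function above) fitting the selector on the full dataset before cross-validation leaks some target information into the folds. A minimal usage sketch of the same KNN + feature-selection + cross_val_predict pattern on synthetic non-negative data (the helpers such as prettyPrintError from the original module are omitted):

# Sketch: KNN with SelectKBest(chi2) and 10-fold out-of-fold predictions.
import numpy
from sklearn import neighbors
from sklearn.datasets import make_classification
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.metrics import accuracy_score
from sklearn.model_selection import cross_val_predict

X_demo, y_demo = make_classification(n_samples=200, n_features=20, random_state=0)
X_demo = numpy.abs(X_demo)  # chi2 needs non-negative features
X_best = SelectKBest(chi2, k=10).fit_transform(X_demo, y_demo)
y_hat = cross_val_predict(neighbors.KNeighborsClassifier(n_neighbors=5),
                          X_best, y_demo, cv=10)
print(accuracy_score(y_demo, y_hat))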
def predictKFoldSVMSSK(X, y, kfold=10, subseqLength=3, selectKBest=0):
"""Classifies the data using Support vector machines with the SSK kernel and k-fold CV
:param X: The list of text documents containing traces
:type X: list
:param y: The labels of documents in 'X'
:type y: list
:param kfold: The number of folds
:type kfold: int (default: 10)
:param subseqLength: Length of subsequence used by the SSK
:type subseqLength: int (default: 3)
:param selectKBest: The number of best features to select
:type selectKBest: int
:return: An array of predicted classes
"""
try:
predicted = []
# Retrieve Gram Matrix from string kernel
if verboseON():
prettyPrint("Generating Gram Matrix from documents", "debug")
X_gram = string_kernel(X, X)
y = numpy.array(y)
# Define classifier
clf = svm.SVC(kernel="precomputed")
X_gram_new = SelectKBest(chi2, k=selectKBest).fit_transform(X_gram, y) if selectKBest > 0 else X_gram
prettyPrint("Performing %s-fold CV on the %s best features" % (kfold, selectKBest))
predicted = cross_val_predict(clf, X_gram_new, y, cv=kfold).tolist()
except Exception as e:
prettyPrintError(e)
return []
return predicted
def predictKFoldSVM(X, y, kernel="linear", C=1, selectKBest=0, kfold=10):
"""
Classifies the data using Support vector machines and k-fold CV
:param X: The matrix of feature vectors
:type X: list
:param y: The vector containing the labels corresponding to feature vectors
:type y: list
:param kernel: The kernel used to elevate data into higher dimensionalities
:type kernel: str
:param C: The penalty parameter of the error term
:type C: int
:param selectKBest: The number of best features to select
:type selectKBest: int
:param kfold: The number of folds to use in K-fold CV
:type kfold: int
:return: A list of predicted labels across the k-folds
"""
try:
# Prepare data
X, y = numpy.array(X), numpy.array(y)
# Define classifier
clf = svm.SVC(kernel=kernel, C=C)
# Select K Best features if enabled
X_new = SelectKBest(chi2, k=selectKBest).fit_transform(X, y) if selectKBest > 0 else X
predicted = cross_val_predict(clf, X_new, y, cv=kfold).tolist()
except Exception as e:
prettyPrintError(e)
return []
return predicted
def predictKFoldRandomForest(X, y, estimators=10, criterion="gini", maxdepth=None, selectKBest=0, kfold=10):
"""
Classifies the data using random forests and k-fold CV
:param X: The matrix of feature vectors
:type X: list
:param y: The vector containing labels corresponding to the feature vectors
:type y: list
:param estimators: The number of random trees to use in classification
:type estimators: int
:param criterion: The splitting criterion employed by the decision tree
:type criterion: str
:param maxdepth: The maximum depth each tree is allowed to grow
:type maxdepth: int
:param selectKBest: The number of best features to select
:type selectKBest: int
:param kfold: The number of folds to use in K-fold CV
:type kfold: int
:return: A list of predicted labels across the k-folds
"""
try:
# Prepare data
X, y = numpy.array(X), numpy.array(y)
# Define classifier
clf = ensemble.RandomForestClassifier(n_estimators=estimators, criterion=criterion, max_depth=maxdepth)
X_new = SelectKBest(chi2, k=selectKBest).fit_transform(X, y) if selectKBest > 0 else X
predicted = cross_val_predict(clf, X_new, y, cv=kfold).tolist()
except Exception as e:
prettyPrintError(e)
return []
return predicted
def test_cross_val_predict():
"""Make sure it works in cross_val_predict."""
X, y = load_iris(return_X_y=True)
X = StandardScaler().fit_transform(X)
clf = FMClassifier(rank=2, solver='L-BFGS-B', random_state=4567).fit(X, y)
cv = KFold(n_splits=4, random_state=457, shuffle=True)
y_oos = cross_val_predict(clf, X, y, cv=cv, method='predict')
acc = accuracy_score(y, y_oos)
assert acc >= 0.90, "accuracy is too low for iris in cross_val_predict!"
def cv_Ridge_BIKE( A_list, yV, XX = None, alpha = 0.5, n_splits = 5, n_jobs = -1, grid_std = None):
clf = binary_model.BIKE_Ridge( A_list, XX, alpha = alpha)
ln = A_list[0].shape[0] # ln is the number of molecules.
kf_n_c = model_selection.KFold( n_splits = n_splits, shuffle=True)
kf_n = kf_n_c.split( A_list[0])
AX_idx = np.array([list(range( ln))]).T
yV_pred = model_selection.cross_val_predict( clf, AX_idx, yV, cv = kf_n, n_jobs = n_jobs)
print('The prediction output using cross-validation is given by:')
jutil.cv_show( yV, yV_pred, grid_std = grid_std)
return yV_pred
def cv_Ridge_BIKE( A_list, yV, XX = None, alpha = 0.5, n_splits = 5, n_jobs = -1, grid_std = None):
clf = binary_model.BIKE_Ridge( A_list, XX, alpha = alpha)
ln = A_list[0].shape[0] # ln is the number of molecules.
kf_n_c = model_selection.KFold( n_splits = n_splits, shuffle=True)
kf_n = kf_n_c.split( A_list[0])
AX_idx = np.array([list(range( ln))]).T
yV_pred = model_selection.cross_val_predict( clf, AX_idx, yV, cv = kf_n, n_jobs = n_jobs)
print('The prediction output using cross-validation is given by:')
kutil.cv_show( yV, yV_pred, grid_std = grid_std)
return yV_pred
def _generate_cross_val_predict_test(X, y, est, pd_est, must_match):
def test(self):
self.assertEqual(
hasattr(est, 'predict'),
hasattr(pd_est, 'predict'))
if not hasattr(est, 'predict'):
return
pd_y_hat = pd_cross_val_predict(pd_est, X, y)
self.assertTrue(isinstance(pd_y_hat, pd.Series))
self.assertTrue(pd_y_hat.index.equals(X.index))
if must_match:
y_hat = cross_val_predict(est, X.as_matrix(), y.values)
np.testing.assert_allclose(pd_y_hat, y_hat)
return test
def test_cross_val_predict():
boston = load_boston()
X, y = boston.data, boston.target
cv = KFold()
est = Ridge()
# Naive loop (should be same as cross_val_predict):
preds2 = np.zeros_like(y)
for train, test in cv.split(X, y):
est.fit(X[train], y[train])
preds2[test] = est.predict(X[test])
preds = cross_val_predict(est, X, y, cv=cv)
assert_array_almost_equal(preds, preds2)
preds = cross_val_predict(est, X, y)
assert_equal(len(preds), len(y))
cv = LeaveOneOut()
preds = cross_val_predict(est, X, y, cv=cv)
assert_equal(len(preds), len(y))
Xsp = X.copy()
Xsp *= (Xsp > np.median(Xsp))
Xsp = coo_matrix(Xsp)
preds = cross_val_predict(est, Xsp, y)
assert_array_almost_equal(len(preds), len(y))
preds = cross_val_predict(KMeans(), X)
assert_equal(len(preds), len(y))
class BadCV():
def split(self, X, y=None, labels=None):
for i in range(4):
yield np.array([0, 1, 2, 3]), np.array([4, 5, 6, 7, 8])
assert_raises(ValueError, cross_val_predict, est, X, y, cv=BadCV())
def test_cross_val_predict_input_types():
iris = load_iris()
X, y = iris.data, iris.target
X_sparse = coo_matrix(X)
multioutput_y = np.column_stack([y, y[::-1]])
clf = Ridge(fit_intercept=False, random_state=0)
# 3 fold cv is used --> at least 3 samples per class
# Smoke test
predictions = cross_val_predict(clf, X, y)
assert_equal(predictions.shape, (150,))
# test with multioutput y
predictions = cross_val_predict(clf, X_sparse, multioutput_y)
assert_equal(predictions.shape, (150, 2))
predictions = cross_val_predict(clf, X_sparse, y)
assert_array_equal(predictions.shape, (150,))
# test with multioutput y
predictions = cross_val_predict(clf, X_sparse, multioutput_y)
assert_array_equal(predictions.shape, (150, 2))
# test with X and y as list
list_check = lambda x: isinstance(x, list)
clf = CheckingClassifier(check_X=list_check)
predictions = cross_val_predict(clf, X.tolist(), y.tolist())
clf = CheckingClassifier(check_y=list_check)
predictions = cross_val_predict(clf, X, y.tolist())
# test with 3d X
X_3d = X[:, :, np.newaxis]
check_3d = lambda x: x.ndim == 3
clf = CheckingClassifier(check_X=check_3d)
predictions = cross_val_predict(clf, X_3d, y)
assert_array_equal(predictions.shape, (150,))
def test_cross_val_predict_pandas():
# check cross_val_score doesn't destroy pandas dataframe
types = [(MockDataFrame, MockDataFrame)]
try:
from pandas import Series, DataFrame
types.append((Series, DataFrame))
except ImportError:
pass
for TargetType, InputFeatureType in types:
# X dataframe, y series
X_df, y_ser = InputFeatureType(X), TargetType(y2)
check_df = lambda x: isinstance(x, InputFeatureType)
check_series = lambda x: isinstance(x, TargetType)
clf = CheckingClassifier(check_X=check_df, check_y=check_series)
cross_val_predict(clf, X_df, y_ser)
def test_cross_val_predict_sparse_prediction():
# check that cross_val_predict gives same result for sparse and dense input
X, y = make_multilabel_classification(n_classes=2, n_labels=1,
allow_unlabeled=False,
return_indicator=True,
random_state=1)
X_sparse = csr_matrix(X)
y_sparse = csr_matrix(y)
classif = OneVsRestClassifier(SVC(kernel='linear'))
preds = cross_val_predict(classif, X, y, cv=10)
preds_sparse = cross_val_predict(classif, X_sparse, y_sparse, cv=10)
preds_sparse = preds_sparse.toarray()
assert_array_almost_equal(preds_sparse, preds)
def fit(self, df_train, df_test):
"""
Computes the drift between the two datasets
Parameters
----------
df_train : pandas dataframe of shape = (n_train, p)
The train set
df_test : pandas dataframe of shape = (n_test, p)
The test set
Returns
-------
self : object
Returns self.
"""
df_train["target"] = 0
df_test["target"] = 1
self.__target = pd.concat((df_train.target, df_test.target),
ignore_index=True)
if self.stratify:
self.__cv = StratifiedKFold(n_splits=self.n_folds,
shuffle=True,
random_state=self.random_state)
else:
self.__cv = KFold(n_splits=self.n_folds,
shuffle=True,
random_state=self.random_state)
X_tmp = pd.concat((df_train, df_test),
ignore_index=True).drop(['target'], axis=1)
self.__pred = cross_val_predict(estimator=self.estimator,
X=X_tmp,
y=self.__target,
cv=self.__cv,
method="predict_proba")[:,1]
del df_train["target"]
del df_test["target"]
self.__fitOK = True
return self
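The fit method above is the standard adversarial-validation recipe: label the training rows 0 and the test rows 1, then take out-of-fold probabilities of belonging to the test set; the closer the resulting ROC AUC is to 0.5, the less drift there is. A self-contained sketch of the same idea with plain scikit-learn (the estimator choice and the AUC-based drift score are assumptions, not taken from the class above):

# Sketch: adversarial validation / covariate-drift check via cross_val_predict.
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import StratifiedKFold, cross_val_predict

rng = np.random.RandomState(0)
df_train_demo = pd.DataFrame(rng.normal(0.0, 1.0, size=(300, 4)))
df_test_demo = pd.DataFrame(rng.normal(0.3, 1.0, size=(200, 4)))  # shifted mean -> drift
X_all = pd.concat((df_train_demo, df_test_demo), ignore_index=True)
y_origin = np.r_[np.zeros(len(df_train_demo)), np.ones(len(df_test_demo))]
cv_demo = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
proba_test = cross_val_predict(RandomForestClassifier(n_estimators=100, random_state=0),
                               X_all, y_origin, cv=cv_demo, method="predict_proba")[:, 1]
print("drift score (AUC):", roc_auc_score(y_origin, proba_test))  # ~0.5 means no drift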