def get_feature_selection_model_from_name(type_of_estimator, model_name):
    """Look up a feature-selection object by estimator type and name.

    Every call builds a fresh set of selector instances, so the returned
    object is never shared between callers.

    :param type_of_estimator: ``'classifier'`` or ``'regressor'``.
    :param model_name: one of ``'SelectFromModel'``, ``'RFECV'``,
        ``'GenericUnivariateSelect'`` or ``'KeepAll'``.
    :return: the selector instance registered under ``model_name``; for
        ``'KeepAll'`` the literal string ``'KeepAll'`` is returned instead
        of an estimator.
    :raises KeyError: if either key is not registered.
    """
    classifier_selectors = {
        'SelectFromModel': SelectFromModel(
            RandomForestClassifier(n_jobs=-1, max_depth=10, n_estimators=15),
            threshold='20*mean',
        ),
        'RFECV': RFECV(estimator=RandomForestClassifier(n_jobs=-1), step=0.1),
        'GenericUnivariateSelect': GenericUnivariateSelect(),
        'KeepAll': 'KeepAll',
    }
    regressor_selectors = {
        'SelectFromModel': SelectFromModel(
            RandomForestRegressor(n_jobs=-1, max_depth=10, n_estimators=15),
            threshold='0.7*mean',
        ),
        'RFECV': RFECV(estimator=RandomForestRegressor(n_jobs=-1), step=0.1),
        'GenericUnivariateSelect': GenericUnivariateSelect(),
        'KeepAll': 'KeepAll',
    }
    registry = {
        'classifier': classifier_selectors,
        'regressor': regressor_selectors,
    }
    selectors_for_type = registry[type_of_estimator]
    return selectors_for_type[model_name]
# Example source code for the Python class RFECV (python类RFECV的实例源码)
def get_feature_selection_model_from_name(type_of_estimator, model_name):
    """Look up a feature-selection object by estimator type and name.

    A new set of selector instances is created on each call, so callers
    never share a returned object.

    :param type_of_estimator: ``'classifier'`` or ``'regressor'``.
    :param model_name: one of ``'SelectFromModel'``, ``'RFECV'``,
        ``'GenericUnivariateSelect'``, ``'RandomizedSparse'`` or
        ``'KeepAll'``.
    :return: the selector instance registered under ``model_name``; for
        ``'KeepAll'`` the literal string ``'KeepAll'`` is returned instead
        of an estimator.
    :raises KeyError: if either key is not registered.
    """
    classifier_selectors = {
        'SelectFromModel': SelectFromModel(
            RandomForestClassifier(n_jobs=-1, max_depth=10, n_estimators=15),
            threshold='20*mean',
        ),
        'RFECV': RFECV(estimator=RandomForestClassifier(n_jobs=-1), step=0.1),
        'GenericUnivariateSelect': GenericUnivariateSelect(),
        'RandomizedSparse': RandomizedLogisticRegression(),
        'KeepAll': 'KeepAll',
    }
    regressor_selectors = {
        'SelectFromModel': SelectFromModel(
            RandomForestRegressor(n_jobs=-1, max_depth=10, n_estimators=15),
            threshold='0.7*mean',
        ),
        'RFECV': RFECV(estimator=RandomForestRegressor(n_jobs=-1), step=0.1),
        'GenericUnivariateSelect': GenericUnivariateSelect(),
        'RandomizedSparse': RandomizedLasso(),
        'KeepAll': 'KeepAll',
    }
    registry = {
        'classifier': classifier_selectors,
        'regressor': regressor_selectors,
    }
    selectors_for_type = registry[type_of_estimator]
    return selectors_for_type[model_name]
def test_RFECV():
    '''
    Fit an RFECV selector around a LinearSVC on the iris data set
    (3-fold cross-validation) and print the fitted attributes.

    :return: None
    '''
    dataset = load_iris()
    features, labels = dataset.data, dataset.target
    selector = RFECV(estimator=LinearSVC(), cv=3)
    selector.fit(features, labels)
    # Each attribute is read only when its line is printed, so a missing
    # attribute fails after the earlier lines have already been shown.
    report_lines = (
        ("N_features %s", "n_features_"),
        ("Support is %s", "support_"),
        ("Ranking %s", "ranking_"),
        ("Grid Scores %s", "grid_scores_"),
    )
    for template, attribute in report_lines:
        print(template % getattr(selector, attribute))
def recursive_feature_elimination_cv(self, step=1, inplace=False):
    """A method to implement recursive feature elimination on the model
    with cross-validation(CV). At each step, features are ranked as per
    the algorithm used and lowest ranked features are removed,
    as specified by the step argument. At each step, the CV score is
    determined using the scoring metric specified in the model. The set
    of features with highest cross validation scores is then chosen.

    When ``step`` is greater than one, the cross-validation score is also
    plotted against the number of features (non-blocking window).

    Parameters
    __________
    step : int or float, default=1
        If int, then step corresponds to the number of features to remove
        at each iteration.
        If float and within (0.0, 1.0), then step corresponds to the
        percentage (rounded down) of features to remove at each
        iteration.
        If float and greater than one, then integral part will be
        considered as an integer input

    inplace : bool, default=False
        If True, the predictors of the class are modified to those
        selected by the RFECV procedure.

    Returns
    _______
    ranks : pandas series
        A series object indexed by every predictor that was passed to
        RFECV, with its rank in the selection as value. The features
        RFECV finally kept are those flagged by its ``support_`` mask.
    """
    rfecv = RFECV(
        self.alg, step=step, cv=self.cv_folds,
        scoring=self.scoring_metric, n_jobs=-1
    )
    rfecv.fit(
        self.datablock.train[self.predictors],
        self.datablock.train[self.datablock.target]
    )

    # NOTE(review): the score curve is only drawn for step > 1; confirm
    # whether step == 1 (and fractional steps) should be plotted as well.
    if step > 1:
        # A float step > 1 is documented as valid, but range() accepts
        # integers only, so work with the integral part here.
        int_step = int(step)
        n_predictors = len(self.predictors)
        min_nfeat = n_predictors - int_step * (len(rfecv.grid_scores_) - 1)

        plt.xlabel("Number of features selected")
        plt.ylabel("Cross validation score")
        plt.plot(
            range(min_nfeat, n_predictors + 1, int_step),
            rfecv.grid_scores_
        )
        plt.show(block=False)

    ranks = pd.Series(rfecv.ranking_, index=self.predictors)

    if inplace:
        # Keep only the predictors flagged by the RFECV support mask.
        selected = ranks.loc[rfecv.support_]
        self.set_predictors(selected.index.tolist())
    return ranks
def selectFeatures(clf, X, Y):
    """Select columns of ``X`` with cross-validated recursive elimination.

    RFECV is run around ``clf`` with 5 stratified folds built from ``Y``,
    removing one feature per step and scoring by accuracy.

    :param clf: estimator handed to RFECV.
    :param X: feature matrix, indexed as ``X[:, columns]``.
    :param Y: target labels.
    :return: tuple ``(X restricted to the kept columns, kept columns)``.
    """
    folds = StratifiedKFold(Y, 5)
    selector = RFECV(estimator=clf, step=1, cv=folds, scoring='accuracy')
    selector.fit(X, Y)
    # `find` is defined outside this block; presumably it returns the
    # positions where the support mask equals True -- TODO confirm.
    kept_columns = find(selector.get_support(), True)
    return X[:, kept_columns], kept_columns
def selectFeatures(clf, X, Y):
    """Return ``X`` reduced to the columns chosen by cross-validated RFE.

    RFECV is run around ``clf`` with 5 stratified folds built from ``Y``,
    removing one feature per step and scoring by accuracy.

    :param clf: estimator handed to RFECV.
    :param X: feature matrix, indexed as ``X[:, columns]``.
    :param Y: target labels.
    :return: ``X`` restricted to the kept columns.
    """
    folds = StratifiedKFold(Y, 5)
    selector = RFECV(estimator=clf, step=1, cv=folds, scoring='accuracy')
    selector.fit(X, Y)
    # `find` is defined outside this block; presumably it returns the
    # positions where the support mask equals True -- TODO confirm.
    kept_columns = find(selector.get_support(), True)
    return X[:, kept_columns]
def _load_feature_stack(useFeature, dataset_names):
    """Load one pickled matrix per dataset name and stack them row-wise."""
    matrices = []
    for dn in dataset_names:
        # The context manager closes the file even if unpickling fails.
        # NOTE: pickle can execute code while loading; only read files
        # produced by this project.
        with open("./learn/data/"+useFeature+"_"+dn+".pkl", "rb") as fin:
            matrices.append(pickle.load(fin))
    return np.vstack(matrices)


def featureSelect(useFeature, trueSet, falseSet):
    """Run RFECV with a LinearSVC on pickled feature sets and report results.

    Positive samples come from the data sets named in ``trueSet`` and
    negative samples from those in ``falseSet``; each is read from
    ``./learn/data/<useFeature>_<name>.pkl``. The data is split in half
    into train and test parts, with as many negatives as positives in each
    part. The cross-validation score curve is saved to
    ``./learn/feature/<useFeature>_fselect.png`` and shown, and
    classification reports are printed for the classifier trained on the
    selected features and then on all features.

    :param useFeature: feature-set name used to build the file paths.
    :param trueSet: iterable of data-set names labelled 1.
    :param falseSet: iterable of data-set names labelled 0.
    :return: None
    """
    # load data and split
    X_true = _load_feature_stack(useFeature, trueSet)
    print(X_true.shape)
    X_false = _load_feature_stack(useFeature, falseSet)
    print(X_false.shape)

    test_size = 0.5
    X_true_train, X_true_test = train_test_split(X_true, test_size=test_size)
    # Draw exactly as many negative samples as positive ones for each part.
    X_false_train, X_false_test = train_test_split(
        X_false,
        train_size=len(X_true_train),
        test_size=len(X_true_test),
    )
    print(X_true_train.shape, X_true_test.shape)
    print(X_false_train.shape, X_false_test.shape)

    X = np.vstack([X_true_train, X_false_train])
    X_ = np.vstack([X_true_test, X_false_test])
    Y = [1]*len(X_true_train) + [0]*len(X_false_train)
    Y_ = [1]*len(X_true_test) + [0]*len(X_false_test)
    X, Y = shuffle(X, Y)
    X_, Y_ = shuffle(X_, Y_)

    # The names are not used below; the call is kept because its side
    # effects are not visible from this block.
    featNames = ml_feature_name.getFeatureName(useFeature)

    clf = LinearSVC(C=0.1)
    rfe = RFECV(estimator=clf, step=1, cv=3, verbose=1)
    rfe.fit(X, Y)
    print("best is {0} features".format(rfe.n_features_))

    ss = rfe.grid_scores_
    plt.plot(range(len(ss)), ss)
    # Save before show() so the written image contains the curve.
    plt.savefig("./learn/feature/"+useFeature+"_fselect.png")
    plt.show()

    # Train/evaluate on the selected features only.
    Xs = rfe.transform(X)
    Xs_ = rfe.transform(X_)
    clf.fit(Xs, Y)
    Yp = clf.predict(Xs)
    Yp_ = clf.predict(Xs_)
    print(classification_report(Y, Yp))
    print(classification_report(Y_, Yp_))

    # Same classifier on the full feature set, for comparison.
    clf.fit(X, Y)
    Yp = clf.predict(X)
    Yp_ = clf.predict(X_)
    print(classification_report(Y, Yp))
    print(classification_report(Y_, Yp_))
    print(X.shape, Xs.shape)
def adopt(self, dfe, interpreted=None):
    """Drop the columns of ``dfe`` that RFECV does not select.

    Text and datetime columns are removed first. One RFECV run is then
    made per candidate model (chosen from the target's feature type), and
    only the features selected by every model are kept in ``dfe.df``.
    ``self.description`` is set to a message listing the kept columns.

    :param dfe: data-frame wrapper; this method uses its ``df``,
        ``get_columns``, ``get_target_ftype``, ``get_features``,
        ``get_target`` and ``sync`` members.
    :param interpreted: not used by this method.
    :return: True
    :raises Exception: if the target is neither categorical nor numerical.
    """
    # about scoring, please see following document
    # http://scikit-learn.org/stable/modules/model_evaluation.html#common-cases-predefined-values
    scoring = "accuracy"

    # todo: now, text and datetime columns are ignored
    for t in (FType.text, FType.datetime):
        columns = dfe.get_columns(t, include_target=False)
        dfe.df.drop(columns, inplace=True, axis=1)
        dfe.sync()

    if dfe.get_target_ftype() == FType.categorical:
        models = [RandomForestClassifier()]
        if self.is_binary_classification(dfe):
            scoring = "f1"
        else:
            # see reference about f1 score
            # http://scikit-learn.org/stable/modules/generated/sklearn.metrics.f1_score.html#sklearn.metrics.f1_score
            # if some label is never predicted, macro averaging scores too
            # poorly to be a useful evaluation
            scoring = "f1_micro"
    elif dfe.get_target_ftype() == FType.numerical:
        # About the model to select the feature, please refer
        # http://scikit-learn.org/stable/modules/feature_selection.html
        models = [Lasso(alpha=.1), RandomForestRegressor()]
        scoring = "r2"
    else:
        raise Exception("Target type is None or un-predictable type.")

    features = dfe.get_features()
    target = dfe.get_target()

    feature_masks = []
    for m in models:
        rfecv = RFECV(estimator=m, step=1, cv=self.cv_count, scoring=scoring, n_jobs=self.n_jobs)
        rfecv.fit(features, target)
        feature_masks.append(rfecv.support_)

    # Keep only the features that every model selected. reduce() handles
    # any number of masks (one, two or more); passing them positionally to
    # np.logical_and is only correct for exactly two.
    selected_mask = np.logical_and.reduce(feature_masks)

    eliminates = features.columns[np.logical_not(selected_mask)]
    dfe.df.drop(eliminates, inplace=True, axis=1)
    dfe.sync()

    selected = features.columns[selected_mask].tolist()
    ss = self.a2t(selected)
    # NOTE(review): the "ja" text looks like it lost its original
    # characters to an encoding problem -- confirm against the source repo.
    self.description = {
        "ja": "??{}??????????????????????????????".format(ss),
        "en": "Columns {} are useful to predict. I'll use these to make model.".format(ss)
    }
    return True