def feature_selection(self):
# use .csv replace .mat
# vector = mat.loadmat('model\\vector.mat')
# vector = vector['data']
with open('model\\happy_other.csv', 'r') as f:
reader = csv.reader(f)
vector_happy = []
for line in reader:
for i in range(len(line) - 1):
line[i] = float(line[i])
vector_happy.append(line)
vector_happy = np.array(vector_happy)
print(vector_happy)
with open('model\\normal_sad.csv', 'r') as f:
reader = csv.reader(f)
vector_sad = []
for line in reader:
for i in range(len(line) - 1):
line[i] = float(line[i])
vector_sad.append(line)
vector_sad = np.array(vector_sad)
self.train_vector_happy = vector_happy[:, 0:28]
self.target_vector_happy = vector_happy[:, 28:29]
self.train_vector_sad = vector_sad[:, 0:28]
self.target_vector_sad = vector_sad[:, 28:29]
clf = ExtraTreesClassifier()
clf = clf.fit(self.train_vector_happy, self.target_vector_happy.ravel())
model = SelectFromModel(clf, threshold='1.25*mean', prefit=True)
joblib.dump(model, 'model\\vector_select.m')
self.ex_vector_happy = model.transform(self.train_vector_happy) # after extract
print(self.ex_vector_happy)
self.ex_vector_sad = model.transform(self.train_vector_sad) # after extract
python类SelectFromModel()的实例源码
def fit(self, X, y=None):
self.selector = get_feature_selection_model_from_name(self.type_of_estimator, self.feature_selection_model)
if self.selector == 'KeepAll':
if scipy.sparse.issparse(X):
num_cols = X.shape[0]
else:
num_cols = len(X[0])
self.support_mask = [True for col_idx in range(num_cols) ]
else:
if self.feature_selection_model == 'SelectFromModel':
num_cols = X.shape[1]
num_rows = X.shape[0]
if self.type_of_estimator == 'regressor':
self.estimator = RandomForestRegressor(n_jobs=-1, max_depth=10, n_estimators=15)
else:
self.estimator = RandomForestClassifier(n_jobs=-1, max_depth=10, n_estimators=15)
self.estimator.fit(X, y)
feature_importances = self.estimator.feature_importances_
# Two ways of doing feature selection
# 1. Any feature with a feature importance of at least 1/100th of our max feature
max_feature_importance = max(feature_importances)
threshold_by_relative_importance = 0.01 * max_feature_importance
# 2. 1/4 the number of rows (so 100 rows means 25 columns)
sorted_importances = sorted(feature_importances, reverse=True)
max_cols = int(num_rows * 0.25)
try:
threshold_by_max_cols = sorted_importances[max_cols]
except IndexError:
threshold_by_max_cols = sorted_importances[-1]
threshold = max(threshold_by_relative_importance, threshold_by_max_cols)
self.support_mask = [True if x > threshold else False for x in feature_importances]
else:
self.selector.fit(X, y)
self.support_mask = self.selector.get_support()
# Get a mask of which indices it is we want to keep
self.index_mask = [idx for idx, val in enumerate(self.support_mask) if val == True]
return self
def fit(self, X, y=None):
print('Performing feature selection')
self.selector = get_feature_selection_model_from_name(self.type_of_estimator, self.feature_selection_model)
if self.selector == 'KeepAll':
if scipy.sparse.issparse(X):
num_cols = X.shape[0]
else:
num_cols = len(X[0])
self.support_mask = [True for col_idx in range(num_cols) ]
else:
if self.feature_selection_model == 'SelectFromModel':
num_cols = X.shape[1]
num_rows = X.shape[0]
if self.type_of_estimator == 'regressor':
self.estimator = RandomForestRegressor(n_jobs=-1, max_depth=10, n_estimators=15)
else:
self.estimator = RandomForestClassifier(n_jobs=-1, max_depth=10, n_estimators=15)
self.estimator.fit(X, y)
feature_importances = self.estimator.feature_importances_
# Two ways of doing feature selection
# 1. Any feature with a feature importance of at least 1/100th of our max feature
max_feature_importance = max(feature_importances)
threshold_by_relative_importance = 0.01 * max_feature_importance
# 2. 1/4 the number of rows (so 100 rows means 25 columns)
sorted_importances = sorted(feature_importances, reverse=True)
max_cols = int(num_rows * 0.25)
try:
threshold_by_max_cols = sorted_importances[max_cols]
except IndexError:
threshold_by_max_cols = sorted_importances[-1]
threshold = max(threshold_by_relative_importance, threshold_by_max_cols)
self.support_mask = [True if x > threshold else False for x in feature_importances]
else:
self.selector.fit(X, y)
self.support_mask = self.selector.get_support()
# Get a mask of which indices it is we want to keep
self.index_mask = [idx for idx, val in enumerate(self.support_mask) if val == True]
return self
def train_sentence_classifier(self, pairtype):
self.text_clf = Pipeline([('vect', CountVectorizer(analyzer='char_wb', ngram_range=(7,20), min_df=0.2, max_df=0.5)),
#('vect', CountVectorizer(analyzer='word', ngram_range=(1,5), stop_words="english", min_df=0.1)),
# ('tfidf', TfidfTransformer(use_idf=True, norm="l2")),
#('tfidf', TfidfVectorizer(analyzer='char_wb', ngram_range=(6,20))),
#('clf', SGDClassifier(loss='hinge', penalty='l1', alpha=0.01, n_iter=5, random_state=42)),
#('clf', SGDClassifier())
#('clf', svm.SVC(kernel='rbf', C=10, verbose=True, tol=1e-5))
#('clf', RandomForestClassifier(n_estimators=10))
#('feature_selection', feature_selection.SelectFromModel(LinearSVC(penalty="l1"))),
('clf', MultinomialNB(alpha=0.1, fit_prior=False))
#('clf', DummyClassifier(strategy="constant", constant=True))
])
f, labels, sids = self.get_features(pairtype)
half_point = int(len(f)*0.5)
self.train_sentences = sids[:half_point]
"""ch2 = SelectKBest(chi2, k=20)
X_train = text_clf.named_steps["vect"].fit_transform(f[:half_point])
X_test = text_clf.named_steps["vect"].transform(f[half_point:])
X_train = ch2.fit_transform(X_train, labels[:half_point])
X_test = ch2.transform(X_test)
feature_names = text_clf.named_steps["vect"].get_feature_names()
feature_names = [feature_names[i] for i
in ch2.get_support(indices=True)]
# print feature_names"""
# train
text_clf = self.text_clf.fit(f[:half_point], labels[:half_point])
#save model
if not os.path.exists("models/kernel_models/" + pairtype + "_sentence_classifier/"):
os.makedirs("models/kernel_models/" + pairtype + "_sentence_classifier/")
logging.info("Training complete, saving to {}/{}/{}.pkl".format("models/kernel_models/",
pairtype + "_sentence_classifier/", pairtype))
joblib.dump(text_clf, "{}/{}/{}.pkl".format("models/kernel_models/",
pairtype + "_sentence_classifier/", pairtype))
# evaluate
pred = text_clf.predict(f[half_point:])
# print len(pred), sum(pred)
self.type_sentences[pairtype] = []
for ip, p in enumerate(pred):
if p:
self.type_sentences[pairtype].append(sids[half_point + ip])
res = metrics.confusion_matrix(labels[half_point:], pred)
return res[1][1], res[0][1], res[1][0]