def test_SelectKBest():
    '''
    test the SelectKBest method
    :return: None
    '''
    X = [[1, 2, 3, 4, 5],
         [5, 4, 3, 2, 1],
         [3, 3, 3, 3, 3],
         [1, 1, 1, 1, 1]]
    y = [0, 1, 0, 1]
    print("before transform:", X)
    selector = SelectKBest(score_func=f_classif, k=3)
    selector.fit(X, y)
    print("scores_:", selector.scores_)
    print("pvalues_:", selector.pvalues_)
    print("selected index:", selector.get_support(True))
    print("after transform:", selector.transform(X))
Example source code for the Python function f_classif()
def select_feat(X, y, percentile=20):
    """Select the best 20% of features using the ANOVA F-value (*f_classif* from scikit-learn)."""
    selector = SelectPercentile(f_classif, percentile=percentile)
    selector.fit(X, y)
    return selector.transform(X)
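A quick illustration of select_feat on made-up data (a sketch; the array shapes and the import line are assumptions, since the snippet's own imports are not shown):

import numpy as np
from sklearn.feature_selection import SelectPercentile, f_classif  # used by select_feat above

rng = np.random.RandomState(0)
X = rng.rand(100, 50)           # 100 samples, 50 candidate features (toy data)
y = rng.randint(0, 2, 100)      # binary labels

X_top = select_feat(X, y, percentile=20)
print(X_top.shape)              # (100, 10): the top 20% of the 50 columns by ANOVA F-score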
feat_select.py (project: Stock-Market-Analysis-and-Prediction, author: samshara)
def select_kbest_clf(data_frame, target, k=4):
    """
    Selecting K-Best features for classification
    :param data_frame: A pandas DataFrame with the training data
    :param target: target variable name in the DataFrame
    :param k: desired number of features from the data
    :returns feature_scores: scores for each feature in the data as a
        pandas DataFrame
    """
    feat_selector = SelectKBest(f_classif, k=k)
    _ = feat_selector.fit(data_frame.drop(target, axis=1), data_frame[target])
    feat_scores = pd.DataFrame()
    feat_scores["F Score"] = feat_selector.scores_
    feat_scores["P Value"] = feat_selector.pvalues_
    feat_scores["Support"] = feat_selector.get_support()
    feat_scores["Attribute"] = data_frame.drop(target, axis=1).columns
    return feat_scores
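A possible way to call select_kbest_clf (a sketch; the column names, label column, and data are invented, and the snippet's own imports of pandas, SelectKBest and f_classif are assumed to be in place):

import numpy as np
import pandas as pd

rng = np.random.RandomState(0)
df = pd.DataFrame(rng.rand(30, 4), columns=["a", "b", "c", "d"])
df["label"] = rng.randint(0, 2, 30)

scores = select_kbest_clf(df, target="label", k=2)
print(scores.sort_values("F Score", ascending=False))   # highest-scoring attributes first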
def get_classification_data(self, division_dummies=True, samples=None, percentile=100):
    raw = PlayerCollection.filter_by_class(self.raw)
    df = PlayerCollection.raw_to_df(raw)
    players, divisions = PlayerCollection.aggregate_df(df)
    players, divisions = PlayerCollection.to_matrix(players, divisions)
    players, divisions = PlayerCollection.subsample(players, divisions, samples)
    X_train, X_test, y_train, y_test = train_test_split(
        players, divisions, random_state=42, stratify=divisions)
    selector = SelectPercentile(f_classif, percentile=percentile)
    selector.fit(X_train, y_train)
    X_train = selector.transform(X_train)
    X_test = selector.transform(X_test)
    if division_dummies:
        y_train = pd.get_dummies(y_train).as_matrix()
        y_test = pd.get_dummies(y_test).as_matrix()
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)
    return X_train, X_test, y_train, y_test
def test_clone():
    # Tests that clone creates a correct deep copy.
    # We create an estimator, make a copy of its original state
    # (which, in this case, is the current state of the estimator),
    # and check that the obtained copy is a correct deep copy.
    from sklearn.feature_selection import SelectFpr, f_classif

    selector = SelectFpr(f_classif, alpha=0.1)
    new_selector = clone(selector)
    assert_true(selector is not new_selector)
    assert_equal(selector.get_params(), new_selector.get_params())

    selector = SelectFpr(f_classif, alpha=np.zeros((10, 2)))
    new_selector = clone(selector)
    assert_true(selector is not new_selector)
def test_randomized_logistic():
    # Check randomized sparse logistic regression
    iris = load_iris()
    X = iris.data[:, [0, 2]]
    y = iris.target
    X = X[y != 2]
    y = y[y != 2]

    F, _ = f_classif(X, y)

    scaling = 0.3
    clf = RandomizedLogisticRegression(verbose=False, C=1., random_state=42,
                                       scaling=scaling, n_resampling=50,
                                       tol=1e-3)
    X_orig = X.copy()
    feature_scores = clf.fit(X, y).scores_
    assert_array_equal(X, X_orig)   # fit does not modify X
    assert_array_equal(np.argsort(F), np.argsort(feature_scores))

    clf = RandomizedLogisticRegression(verbose=False, C=[1., 0.5],
                                       random_state=42, scaling=scaling,
                                       n_resampling=50, tol=1e-3)
    feature_scores = clf.fit(X, y).scores_
    assert_array_equal(np.argsort(F), np.argsort(feature_scores))
def thresholds():
    for name in ['ant', 'ivy', 'jedit', 'lucene', 'poi']:
        print("##", name)
        train, test = explore(dir='../Data/Jureczko/', name=name)
        data_DF = csv2DF(train, toBin=True)
        metrics = [str[1:] for str in data_DF[data_DF.columns[:-1]]]
        ubr = LogisticRegression()
        X = data_DF[data_DF.columns[:-1]].values
        y = data_DF[data_DF.columns[-1]].values
        ubr.fit(X, y)
        inter, coef, pVal = ubr.intercept_[0], ubr.coef_[0], f_classif(X, y)[1]
        table = texttable.Texttable()
        table.set_cols_align(["l", "l", "l"])
        table.set_cols_valign(["m", "m", "m"])
        table.set_cols_dtype(['t', 't', 't'])
        table_rows = [["Metric", "Threshold", "P-Value"]]
        for i in xrange(len(metrics)):
            if VARL(coef[i], inter, p0=0.05) > 0 and pVal[i] < 0.05:
                thresh = "%0.2f" % VARL(coef[i], inter, p0=0.1)
                table_rows.append([metrics[i], thresh, "%0.3f" % pVal[i]])
        table.add_rows(table_rows)
        print(table.draw())
    # === DEBUG ===
    set_trace()
    return None
feature_selection.py (project: MultimodalAutoencoder, author: natashamjaques)
def transform_select_K_best(X_train, Y_train, X_all, K=100):
    """Selects the best K features given the training data.

    Args:
        X_train: A matrix containing training data
        Y_train: Classification labels for the training data
        X_all: A matrix containing all the data
        K: The number of features to select
    """
    skb = SelectKBest(f_classif, k=K)
    skb.fit(X_train, Y_train)
    return skb.transform(X_all)
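For example (a sketch with synthetic arrays; stacking train and test into X_all is an assumption about how the project calls it, and the snippet's SelectKBest/f_classif imports are assumed to be present):

import numpy as np

rng = np.random.RandomState(0)
X_train, Y_train = rng.rand(80, 20), rng.randint(0, 2, 80)
X_test = rng.rand(20, 20)
X_all = np.vstack([X_train, X_test])

# The selector is fitted on the training rows only, then every row is reduced
# to the 5 columns with the highest ANOVA F-score.
X_all_reduced = transform_select_K_best(X_train, Y_train, X_all, K=5)
print(X_all_reduced.shape)   # (100, 5)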
def __init__(self, filename=None):
    super().__init__(filename)
    if not filename:
        self.clf = Pipeline([
            ('tfidf', TfidfVectorizer(stop_words=sw.words('dutch'))),
            ('anova', SelectPercentile(f_classif)),
            ('clf', MultinomialNB())
        ])
def __init__(self, conf):
    SemiSupervisedFeatureSelection.__init__(self, conf)
    self.projection = SelectKBest(f_classif, k=conf.num_components)
xgb_classification.py (project: jingjuSingingPhraseMatching, author: ronggong)
def buildEstimators(mode):
    if mode == 'train' or mode == 'cv':
        # best parameters obtained by GridSearchCV, best score: 1
        estimators = [('anova_filter', SelectKBest(f_classif, k='all')),
                      ('xgb', xgb.XGBClassifier(learning_rate=0.1, n_estimators=300, max_depth=3))]
        clf = Pipeline(estimators)
    elif mode == 'test':
        # NOTE: pickled classifiers generally need to be opened in binary mode ("rb")
        clf = pickle.load(open(join(classifier_path, "xgb_classifier.plk"), "r"))
    return clf
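A usage sketch for the training branch (assumes xgboost is installed and that the module-level imports shown in the project are present; the data here is random and purely illustrative):

import numpy as np

rng = np.random.RandomState(0)
X = rng.rand(40, 12)
y = rng.randint(0, 2, 40)

clf = buildEstimators('train')      # ANOVA filter followed by an XGBoost classifier
clf.fit(X, y)
print(clf.predict(X[:5]))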
def de_f_and_p_value(X, y):
    dim = X.shape[1]
    de = min(2000, dim)
    clf = SelectKBest(f_classif, k=de)
    clf.fit(X, y)
    def _func(X1, X2):
        return clf.transform(X1), clf.transform(X2)
    return _func
def de_f_and_p_value(X, y):
    """ f&p value """
    dim = X.shape[1]
    de = min(2000, dim)
    clf = SelectKBest(f_classif, k=de)
    clf.fit(X, y)
    def _func(X1, X2):
        return clf.transform(X1), clf.transform(X2)
    return _func
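The function returns a closure that applies the fitted selector to two matrices at once, typically a train/test pair (a sketch on toy data; the SelectKBest/f_classif imports in the snippet's module are assumed):

import numpy as np

rng = np.random.RandomState(0)
X_train, y_train = rng.rand(50, 3000), rng.randint(0, 2, 50)
X_test = rng.rand(10, 3000)

reduce_dims = de_f_and_p_value(X_train, y_train)        # fits SelectKBest(k=2000) on the training data
X_train_red, X_test_red = reduce_dims(X_train, X_test)
print(X_train_red.shape, X_test_red.shape)              # (50, 2000) (10, 2000)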
def new(method='centroid', n_features=8):
    # Clustering method
    nc = METHODS[method]
    # Orthogonal feature selector
    if n_features is None:
        n_features = 'all'
    selector = SelectKBest(f_classif, k=n_features)
    # NOTE: The last operation in the list
    # must be a classifier or clustering model
    print(colored('Cluster model created', 'yellow'))
    return [selector, nc]
def feature_importance_classification(features, target, n_neighbors=3, random_state=None):
    cont = features.select_dtypes(include=[np.floating])
    disc = features.select_dtypes(include=[np.integer, np.bool])

    cont_imp = pd.DataFrame(index=cont.columns)
    disc_imp = pd.DataFrame(index=disc.columns)

    # Continuous features
    if cont_imp.index.size > 0:
        # F-test
        f_test = feature_selection.f_classif(cont, target)
        cont_imp['f_statistic'] = f_test[0]
        cont_imp['f_p_value'] = f_test[1]

        # Mutual information
        mut_inf = feature_selection.mutual_info_classif(cont, target, discrete_features=False,
                                                        n_neighbors=n_neighbors,
                                                        random_state=random_state)
        cont_imp['mutual_information'] = mut_inf

    # Discrete features
    if disc_imp.index.size > 0:
        # Chi²-test
        chi2_tests = defaultdict(dict)
        for feature in disc.columns:
            cont = pd.crosstab(disc[feature], target)
            statistic, p_value, _, _ = stats.chi2_contingency(cont)
            chi2_tests[feature]['chi2_statistic'] = statistic
            chi2_tests[feature]['chi2_p_value'] = p_value
        chi2_tests_df = pd.DataFrame.from_dict(chi2_tests, orient='index')
        disc_imp['chi2_statistic'] = chi2_tests_df['chi2_statistic']
        disc_imp['chi2_p_value'] = chi2_tests_df['chi2_p_value']

        # Cramér's V (corrected)
        disc_imp['cramers_v'] = [
            cramers_v_corrected_stat(pd.crosstab(feature, target).values)
            for _, feature in disc.iteritems()
        ]

        # Mutual information
        mut_inf = feature_selection.mutual_info_classif(disc, target, discrete_features=True,
                                                        n_neighbors=n_neighbors,
                                                        random_state=random_state)
        disc_imp['mutual_information'] = mut_inf

    return cont_imp, disc_imp
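Run against the numpy/pandas versions the snippet targets (np.bool and Series.iteritems come from older releases), and with only floating-point columns so the project-specific cramers_v_corrected_stat helper is never reached, a hedged usage sketch looks like:

import numpy as np
import pandas as pd

rng = np.random.RandomState(0)
features = pd.DataFrame({"x1": rng.rand(60), "x2": rng.rand(60)})   # all float, so only the continuous branch runs
target = pd.Series(rng.randint(0, 2, 60))

cont_imp, disc_imp = feature_importance_classification(features, target, random_state=0)
print(cont_imp)   # f_statistic, f_p_value and mutual_information per float column
print(disc_imp)   # empty: no integer/bool columns were passed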
feature_select.py (project: movie-quality-profitability-predictor, author: wbowditch)
def main():
    data_table = pd.read_csv("total_set.csv", index_col=0)
    film_titles = data_table.index  # list of all of our movie titles in the dataset.
    #print film_titles
    lst = ['Sequel',
           'Budget',
           'YouTube Trailer Views',
           'YouTube Like',
           'YouTube Dislike',
           'YouTube Like:Dislike',
           'Reddit UpVotes',
           'Distributor',
           'Reddit Ratio',
           'Reddit Comments',
           'Date',
           'Runtime',
           'MPAA',
           'Comedy',
           'Action/Adventure',
           'Animated',
           'Drama']
    data = data_table[lst]
    target = data_table['Profitable']
    print data.shape

    data_new = SelectKBest(f_classif, k=10).fit_transform(data, target)
    print data_new.shape

    no_select = compute_cross_fold(data, target)
    with_select = compute_cross_fold(data_new, target)
    print no_select
    print with_select
def test_clone_2():
    # Tests that clone doesn't copy everything.
    # We first create an estimator, give it an own attribute, and
    # make a copy of its original state. Then we check that the copy doesn't
    # have the specific attribute we manually added to the initial estimator.
    from sklearn.feature_selection import SelectFpr, f_classif

    selector = SelectFpr(f_classif, alpha=0.1)
    selector.own_attribute = "test"
    new_selector = clone(selector)
    assert_false(hasattr(new_selector, "own_attribute"))
def test_pipeline_methods_anova():
    # Test the various methods of the pipeline (anova).
    iris = load_iris()
    X = iris.data
    y = iris.target
    # Test with Anova + LogisticRegression
    clf = LogisticRegression()
    filter1 = SelectKBest(f_classif, k=2)
    pipe = Pipeline([('anova', filter1), ('logistic', clf)])
    pipe.fit(X, y)
    pipe.predict(X)
    pipe.predict_proba(X)
    pipe.predict_log_proba(X)
    pipe.score(X, y)
def preprocess(X, y):
    ### test_size is the percentage of events assigned to the test set
    ### (remainder go into training)
    features_train, features_test, labels_train, labels_test = model_selection.train_test_split(X, y, test_size=0.2, random_state=42)

    ### text vectorization--go from strings to lists of numbers
    vectorizer = TfidfVectorizer(sublinear_tf=True, max_df=0.5, stop_words='english')
    features_train_transformed = vectorizer.fit_transform(features_train)
    features_test_transformed = vectorizer.transform(features_test)
    joblib.dump(vectorizer, 'vectorizer_intent.pkl')

    ### feature selection, because text is super high dimensional and
    ### can be really computationally chewy as a result
    selector = SelectPercentile(f_classif, percentile=10)
    selector.fit(features_train_transformed, labels_train)
    joblib.dump(selector, 'selector_intent.pkl')
    features_train_transformed = selector.transform(features_train_transformed).toarray()
    features_test_transformed = selector.transform(features_test_transformed).toarray()

    return features_train_transformed, features_test_transformed, labels_train, labels_test
def build(self, dataset, max_feature=10, score_threshold=0.6):
    variation = []
    for f in self.field_manager.features:
        if f.is_categorizable() and not f.category_feature:
            variation.append([(f.field_code, False), (f.field_code, True)])
    judge_scenarios = itertools.product(*variation)
    criteria = f_classif if self.field_manager.target.is_categorizable() else f_regression

    self._best_scenario = []
    self._best_features = {}
    top_score = 0
    for s in judge_scenarios:
        # prepare the feature
        for code, is_category in s:
            self.field_manager.get_feature(code).category_feature = is_category

        # adjust the dataset
        adjusted = self.field_manager.adjust(dataset)

        # evaluate the feature
        selector = SelectKBest(criteria, k=min(max_feature, len(adjusted.feature_names)))
        selector.fit(adjusted.data, adjusted.target)
        threshold = max(selector.scores_) * score_threshold
        candidates = {}
        for i, selected in enumerate(selector.get_support()):
            if selected and selector.scores_[i] > threshold:
                candidates[adjusted.feature_names[i]] = selector.scores_[i]

        if sum(selector.scores_) > top_score:
            self._best_scenario = s
            self._best_features = candidates
            top_score = sum(selector.scores_)

    # reflect the setting to field_manager
    for code, is_category in self._best_scenario:
        self.field_manager.get_feature(code).category_feature = is_category
    self.field_manager.selected = list(self._best_features.keys())
def get_params_for_est(estimator, name):
    '''Choose initialization parameters for an estimator for auto-testing'''
    is_classifier = ClassifierMixin in estimator.__mro__
    is_cluster = ClusterMixin in estimator.__mro__
    is_ensemble = BaseEnsemble in estimator.__mro__
    uses_counts = any(c in name for c in USES_COUNTS)
    as_1d = name in REQUIRES_1D
    args, params, _ = get_args_kwargs_defaults(estimator.__init__)
    est_keys = set(('estimator', 'base_estimator', 'estimators'))
    est_keys = (set(params) | set(args)) & est_keys
    if is_classifier:
        score_func = feat.f_classif
    else:
        score_func = feat.f_regression
    for key in est_keys:
        if name == 'SelectFromModel':
            params[key] = sklearn.linear_model.LassoCV()
        elif is_classifier:
            params[key] = sklearn.tree.DecisionTreeClassifier()
        else:
            params[key] = sklearn.tree.DecisionTreeRegressor()
        if key == 'estimators':
            params[key] = [(str(_), clone(params[key])) for _ in range(10)]
    kw = dict(is_classifier=is_classifier, is_cluster=is_cluster,
              is_ensemble=is_ensemble, uses_counts=uses_counts)
    if 'score_func' in params:
        params['score_func'] = score_func
    X, y = make_X_y(**kw)
    return X, y, params, kw
def preprocess(words_file="../tools/word_data.pkl", authors_file="../tools/email_authors.pkl"):
    """
    this function takes a pre-made list of email texts (by default word_data.pkl)
    and the corresponding authors (by default email_authors.pkl) and performs
    a number of preprocessing steps:
        -- splits into training/testing sets (10% testing)
        -- vectorizes into a tfidf matrix
        -- selects/keeps the most helpful features

    after this, the features and labels are put into numpy arrays, which play nice with sklearn functions

    4 objects are returned:
        -- training/testing features
        -- training/testing labels
    """
    ### the words (features) and authors (labels), already largely preprocessed
    ### this preprocessing will be repeated in the text learning mini-project
    authors_file_handler = open(authors_file, "r")
    authors = pickle.load(authors_file_handler)
    authors_file_handler.close()

    words_file_handler = open(words_file, "r")
    word_data = cPickle.load(words_file_handler)
    words_file_handler.close()

    ### test_size is the percentage of events assigned to the test set
    ### (remainder go into training)
    features_train, features_test, labels_train, labels_test = cross_validation.train_test_split(word_data, authors, test_size=0.1, random_state=42)

    ### text vectorization--go from strings to lists of numbers
    vectorizer = TfidfVectorizer(sublinear_tf=True, max_df=0.5,
                                 stop_words='english')
    features_train_transformed = vectorizer.fit_transform(features_train)
    features_test_transformed = vectorizer.transform(features_test)

    ### feature selection, because text is super high dimensional and
    ### can be really computationally chewy as a result
    selector = SelectPercentile(f_classif, percentile=1)
    selector.fit(features_train_transformed, labels_train)
    features_train_transformed = selector.transform(features_train_transformed).toarray()
    features_test_transformed = selector.transform(features_test_transformed).toarray()

    ### info on the data
    print "no. of Chris training emails:", sum(labels_train)
    print "no. of Sara training emails:", len(labels_train) - sum(labels_train)

    return features_train_transformed, features_test_transformed, labels_train, labels_test
def test_pipeline_init():
    # Test the various init parameters of the pipeline.
    assert_raises(TypeError, Pipeline)
    # Check that we can't instantiate pipelines with objects without fit
    # method
    pipe = assert_raises(TypeError, Pipeline, [('svc', IncorrectT)])

    # Smoke test with only an estimator
    clf = T()
    pipe = Pipeline([('svc', clf)])
    assert_equal(pipe.get_params(deep=True),
                 dict(svc__a=None, svc__b=None, svc=clf,
                      **pipe.get_params(deep=False)))

    # Check that params are set
    pipe.set_params(svc__a=0.1)
    assert_equal(clf.a, 0.1)
    assert_equal(clf.b, None)
    # Smoke test the repr:
    repr(pipe)

    # Test with two objects
    clf = SVC()
    filter1 = SelectKBest(f_classif)
    pipe = Pipeline([('anova', filter1), ('svc', clf)])

    # Check that we can't use the same stage name twice
    assert_raises(ValueError, Pipeline, [('svc', SVC()), ('svc', SVC())])

    # Check that params are set
    pipe.set_params(svc__C=0.1)
    assert_equal(clf.C, 0.1)
    # Smoke test the repr:
    repr(pipe)

    # Check that params are not set when naming them wrong
    assert_raises(ValueError, pipe.set_params, anova__C=0.1)

    # Test clone
    pipe2 = clone(pipe)
    assert_false(pipe.named_steps['svc'] is pipe2.named_steps['svc'])

    # Check that apart from estimators, the parameters are the same
    params = pipe.get_params(deep=True)
    params2 = pipe2.get_params(deep=True)
    for x in pipe.get_params(deep=False):
        params.pop(x)
    for x in pipe2.get_params(deep=False):
        params2.pop(x)

    # Remove estimators that were copied
    params.pop('svc')
    params.pop('anova')
    params2.pop('svc')
    params2.pop('anova')
    assert_equal(params, params2)
def describe_data(data, info=False, describe=False, value_counts=None, unique=None,
                  univariate_feature_selection=None, description=None):
    # Data diagnostics
    if description is not None:
        print("\n" + description)

    # Info
    if info:
        print("\nInfo:")
        print(data.info())

    # Description
    if describe:
        print("\nDescribe:")
        print(data.describe())

    # Value counts
    if value_counts is not None:
        for feature in value_counts:
            print("\nValue Counts [" + feature + "]")
            print(pd.value_counts(data[feature]))

    # Unique values
    if unique is not None:
        for feature in unique:
            print("\nUnique [" + feature + "]")
            print(data[feature].unique())

    # Univariate feature selection
    if univariate_feature_selection is not None:
        # Extract predictors and target
        predictors = univariate_feature_selection[0]
        target = univariate_feature_selection[1]

        # Perform feature selection
        selector = SelectKBest(f_classif, k="all")
        selector.fit(data[predictors], data[target])

        # Get the raw p-values for each feature, and transform from p-values into scores
        scores = -np.log10(selector.pvalues_)
        print("\nUnivariate Feature Selection:")
        for feature, imp in sorted(zip(predictors, scores), key=lambda x: x[1] if pd.notnull(x[1]) else 0):
            print(feature, imp)
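Finally, a usage sketch for describe_data (the DataFrame and column names are invented, and the module is assumed to import pandas, numpy, SelectKBest and f_classif as in the snippet above):

import numpy as np
import pandas as pd

rng = np.random.RandomState(0)
data = pd.DataFrame({
    "age": rng.randint(20, 60, 100),
    "income": rng.rand(100) * 1e5,
    "bought": rng.randint(0, 2, 100),
})

describe_data(data,
              info=True,
              describe=True,
              value_counts=["bought"],
              univariate_feature_selection=(["age", "income"], "bought"),
              description="Toy customer table")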