def buildVectorizer(classes, examples, parameters):
    """Vectorize the training examples and optionally prune features with chi2.

    Recognised keys in ``parameters`` (all optional):

    * ``featureChoice`` -- handed unchanged to ``Vectorizer`` (``None`` if absent).
    * ``tfidf`` -- enabled only when the value is the string ``"True"``.
    * ``doFeatureSelection`` -- enabled only when the value is the string
      ``"True"``.
    * ``featureSelectPerc`` -- percentage of features to keep, converted with
      ``int``; defaults to 10.

    :param classes: handed to ``Vectorizer`` and used as the target of the chi2
        feature selection.
    :param examples: handed to ``Vectorizer``.
    :param parameters: dict of option names to (string) values, see above.
    :return: ``(vectors, vectorizer, featureSelector)``. When feature selection
        is disabled ``vectors`` is the untouched training matrix and
        ``featureSelector`` is ``None``; otherwise ``vectors`` is the trimmed
        matrix wrapped in ``coo_matrix`` and ``featureSelector`` is the fitted
        ``SelectPercentile``.
    """
    featureChoice = parameters.get("featureChoice")
    # Flags arrive as strings, so only the exact text "True" switches them on.
    doFeatureSelection = parameters.get("doFeatureSelection") == "True"
    tfidf = parameters.get("tfidf") == "True"
    featureSelectPerc = int(parameters.get("featureSelectPerc", 10))

    print("Starting vectorizer...")
    vectorizer = Vectorizer(classes, examples, featureChoice, tfidf)
    vectors = vectorizer.getTrainingVectors()
    # %-formatting keeps the output identical under Python 2 and Python 3.
    print("Vectors of size: %s" % (vectors.shape,))

    if not doFeatureSelection:
        return vectors, vectorizer, None

    print("Trimming training vectors...")
    from sklearn.feature_selection import SelectPercentile, chi2
    # ``percentile`` is keyword-only in current scikit-learn releases; the
    # keyword form is accepted by old releases as well.
    featureSelector = SelectPercentile(chi2, percentile=featureSelectPerc)
    vectorsTrimmed = coo_matrix(featureSelector.fit_transform(vectors, classes))
    print("Trimmed training vectors of size: %s" % (vectorsTrimmed.shape,))
    return vectorsTrimmed, vectorizer, featureSelector
python类chi2()的实例源码
semeval_regression_quantification.py 文件源码
项目:semeval2016-task4
作者: aesuli
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def main():
    """Train a regression quantifier on SemEval data and write three output files.

    Command-line options: ``-i`` training file, ``-t`` test file, ``-o`` output
    filename prefix (all required), ``-c`` the SVM ``C`` value (default 1.0) and
    ``-k`` the number of features kept by the chi2 selection (default 1000).

    For every topic, in sorted order, one tab-separated line is written to each
    of the ``plain``, ``corrected_train`` and ``corrected_test`` output files.
    """
    sys.stdout = codecs.getwriter('utf8')(sys.stdout.buffer)

    parser = argparse.ArgumentParser(description='')
    required_options = (
        ('-i', '--input', 'Input file'),
        ('-t', '--test', 'Test file'),
        ('-o', '--output', 'Output filename prefix'),
    )
    for short_flag, long_flag, description in required_options:
        parser.add_argument(short_flag, long_flag, help=description, required=True)
    parser.add_argument('-c', '--c', help='C value for SVM', type=float, default=1.0)
    parser.add_argument('-k', '--k', help='Number of features to keep', type=int, default=1000)
    args = parser.parse_args()

    # Flatten the per-topic training data into three parallel lists.
    data = read_semeval_quantification_regression(args.input, encoding='windows-1252')
    texts, labels, topics = [], [], []
    for topic in data:
        topic_texts, topic_labels = data[topic]
        texts.extend(topic_texts)
        labels.extend(topic_labels)
        topics.extend(topic for _ in topic_labels)

    steps = [
        ('vect', CountVectorizer(analyzer=get_rich_analyzer(word_ngrams=[2, 3], char_ngrams=[4]))),
        ('tfidf', TfidfTransformer()),
        ('sel', SelectKBest(chi2, k=args.k)),
        ('clf', BinaryTreeRegressor(base_estimator=LinearSVC(C=args.c), verbose=False)),
    ]
    pipeline = Pipeline(steps)

    _, test_topics, test_texts = read_test_data(args.test, encoding='windows-1252')

    quantifier = RegressionQuantifier(pipeline)
    quantifier.fit(texts, labels, topics)
    quantification = quantifier.predict(test_texts, test_topics)

    # All three file names share the same prefix/C/k parts.
    name_parts = (args.output, args.c, args.k)
    plain_path = '%sc%f-k%i-plain-E.output' % name_parts
    corrected_train_path = '%sc%f-k%i-corrected_train-E.output' % name_parts
    corrected_test_path = '%sc%f-k%i-corrected_test-E.output' % name_parts

    with open(plain_path, 'w', encoding='utf8') as plainfile, \
            open(corrected_train_path, 'w', encoding='utf8') as corrected_trainfile, \
            open(corrected_test_path, 'w', encoding='utf8') as corrected_testfile:
        for topic in sorted(quantification):
            plain, corrected_train, corrected_test = quantification[topic]
            rows = (
                (plain, plainfile),
                (corrected_train, corrected_trainfile),
                (corrected_test, corrected_testfile),
            )
            for values, handle in rows:
                print(topic, *values, sep='\t', file=handle)
def main():
    """Train a chi2-filtered linear SVM on SemEval data and label a test file.

    Command-line options: ``-i`` training file, ``-t`` test file, ``-o`` output
    filename prefix (all required), ``-b`` to switch to the binary polarity
    task, ``-c`` the SVM ``C`` value (default 1.0) and ``-k`` the number of
    features kept by the chi2 selection (default 1000).

    The predictions are written tab-separated to
    ``<prefix>c<C>-k<k>-<task>.output`` where ``<task>`` is ``B`` in binary mode
    and ``A`` otherwise. In binary mode each line carries id, topic and label;
    otherwise id and label.
    """
    sys.stdout = codecs.getwriter('utf8')(sys.stdout.buffer)
    parser = argparse.ArgumentParser(description='')
    parser.add_argument('-i', '--input', help='Input file', required=True)
    parser.add_argument('-b', '--binary',
                        help='Polarity classification, i.e., posivitive vs negative (default: posivitive/negative/neutral classification)',
                        action='store_true')
    parser.add_argument('-t', '--test', help='Test file', required=True)
    parser.add_argument('-o', '--output', help='Output filename prefix', required=True)
    parser.add_argument('-c', '--c', help='C value for SVM', type=float, default=1.0)
    parser.add_argument('-k', '--k', help='Number of features to keep', type=int, default=1000)
    args = parser.parse_args()

    data = read_semeval_classification(args.input, encoding='windows-1252')
    if args.binary:
        data = filter_polarity_classification(data)

    analyzer = get_rich_analyzer(word_ngrams=[2, 3], char_ngrams=[4])
    pipeline = Pipeline([
        ('vect', CountVectorizer(analyzer=analyzer)),
        ('tfidf', TfidfTransformer()),
        ('sel', SelectKBest(chi2, k=args.k)),
        ('clf', LinearSVC(C=args.c)),
    ])

    test = read_test_data(args.test, args.binary, encoding='windows-1252', topic=args.binary)

    # Fit exactly once: the pipeline used to be fitted twice on the same data,
    # doubling the training time for no benefit.
    classifier = pipeline.fit(data[0], data[1])
    y = classifier.predict(test[1])

    task = 'B' if args.binary else 'A'
    with open('%sc%f-k%i-%s.output' % (args.output, args.c, args.k, task), 'w', encoding='utf8') as outfile:
        if args.binary:
            for id_, topic, label in zip(test[0], test[2], y):
                print(id_, topic, label, sep='\t', file=outfile)
        else:
            for id_, label in zip(test[0], y):
                print(id_, label, sep='\t', file=outfile)
def predictAndTestEnsemble(X, y, Xtest, ytest, classifiers=None, selectKBest=0):
    """
    Trains a hard-voting ensemble of classifiers on the training dataset and predicts labels
    for both that training dataset and an out-of-sample test dataset
    :param X: The matrix of training feature vectors
    :type X: list
    :param y: The labels corresponding to the training feature vectors
    :type y: list
    :param Xtest: The matrix of test feature vectors
    :type Xtest: list
    :param ytest: The labels corresponding to the test feature vectors (kept for interface
        compatibility; prediction does not use them)
    :type ytest: list
    :param classifiers: Names of the classifiers to use in the ensemble. A name containing "knn"
        or "forest" must end in "-<int>" (number of neighbours / trees); a name containing "svm"
        selects a linear SVC. Any other name is rejected
    :type classifiers: list of str
    :param selectKBest: The number of best features to select (0 disables the selection)
    :type selectKBest: int
    :return: Two sequences holding the predicted labels of the training and of the test dataset,
        or two empty lists if any step failed
    """
    try:
        predicted, predicted_test = [], []
        # Prepare the data
        X, y, Xtest = numpy.array(X), numpy.array(y), numpy.array(Xtest)
        # Define classifiers
        ensembleClassifiers = []
        for c in (classifiers or []):
            name = c.lower()
            if "knn" in name:
                clf = neighbors.KNeighborsClassifier(n_neighbors=int(c.split('-')[-1]))
            elif "svm" in name:
                clf = svm.SVC(kernel='linear', C=1)
            elif "forest" in name:
                clf = ensemble.RandomForestClassifier(n_estimators=int(c.split('-')[-1]))
            else:
                # Without this branch an unknown name would either hit an unbound "clf" or
                # silently register the previous estimator a second time.
                raise ValueError("Unsupported classifier \"%s\"" % c)
            ensembleClassifiers.append((c, clf))
        # Select K Best features if applicable. The selector is fitted on the training data
        # only and then applied to the test data, so both matrices keep the same columns and
        # the test labels never influence the selection.
        if selectKBest > 0:
            selector = SelectKBest(chi2, k=selectKBest).fit(X, y)
            X_new, Xtest_new = selector.transform(X), selector.transform(Xtest)
        else:
            X_new, Xtest_new = X, Xtest
        # Train and fit the voting classifier
        voting = VotingClassifier(estimators=ensembleClassifiers, voting='hard')
        prettyPrint("Fitting ensemble model")
        voting = voting.fit(X_new, y)
        prettyPrint("Validating model")
        predicted = voting.predict(X_new)
        # Same for the test dataset
        prettyPrint("Testing the model")
        predicted_test = voting.predict(Xtest_new)
    except Exception as e:
        prettyPrintError(e)
        return [], []
    return predicted, predicted_test
def predictAndTestRandomForest(X, y, Xtest, ytest, estimators=10, criterion="gini", maxdepth=None, selectKBest=0):
    """
    Trains a random forest on the training data and predicts labels for both the training data
    and the test data
    :param X: The matrix of training feature vectors
    :type X: list
    :param y: The labels corresponding to the training feature vectors
    :type y: list
    :param Xtest: The matrix of test feature vectors
    :type Xtest: list
    :param ytest: The labels corresponding to the test feature vectors (kept for interface
        compatibility; prediction does not use them)
    :type ytest: list
    :param estimators: The number of random trees to use in classification
    :type estimators: int
    :param criterion: The splitting criterion employed by the decision tree
    :type criterion: str
    :param maxdepth: The maximum depth the tree is allowed to grow
    :type maxdepth: int
    :param selectKBest: The number of best features to select (0 disables the selection)
    :type selectKBest: int
    :return: Two sequences holding the predicted labels of the training and of the test data,
        or two empty lists if any step failed
    """
    try:
        predicted, predicted_test = [], []
        # Define classifier
        clf = ensemble.RandomForestClassifier(n_estimators=estimators, criterion=criterion, max_depth=maxdepth)
        X, y, Xtest = numpy.array(X), numpy.array(y), numpy.array(Xtest)
        # Select K Best features if enabled. The selector is fitted on the training data only
        # and then applied to the test data, so both matrices keep the same columns and the
        # test labels never influence the selection.
        if selectKBest > 0:
            prettyPrint("Selecting %s best features from feature vectors" % selectKBest)
            selector = SelectKBest(chi2, k=selectKBest).fit(X, y)
            X_new, Xtest_new = selector.transform(X), selector.transform(Xtest)
        else:
            X_new, Xtest_new = X, Xtest
        # Fit model
        prettyPrint("Fitting model")
        clf.fit(X_new, y)
        # Validate and test model
        prettyPrint("Validating model using training data")
        predicted = clf.predict(X_new)
        prettyPrint("Testing model")
        predicted_test = clf.predict(Xtest_new)
    except Exception as e:
        prettyPrintError(e)
        return [], []
    return predicted, predicted_test
def train(self):
    """Fit ``self.text_clf``, save it with joblib and print its top chi2 features.

    The pipeline is fitted on ``self.features`` / ``self.labels``. If fitting
    raises ``ValueError`` an error line is printed and the method returns
    without saving anything. Otherwise the model directory
    ``self.basedir + self.modelname`` is created when missing and the fitted
    pipeline is dumped to ``<basedir>/<modelname>/<modelname>.pkl``.

    Afterwards a diagnostic step vectorizes the first half of the features,
    ranks the resulting columns with chi2 against the matching labels and
    prints the names of the 20 selected features.
    """
    import copy

    logging.info("Traning with {}/{} true pairs".format(str(sum(self.labels)), str(len(self.labels))))
    try:
        self.text_clf = self.text_clf.fit(self.features, self.labels)
    except ValueError:
        print("error training {}".format(self.modelname))
        return

    if not os.path.exists(self.basedir + self.modelname):
        os.makedirs(self.basedir + self.modelname)
    logging.info("Training complete, saving to {}/{}/{}.pkl".format(self.basedir, self.modelname, self.modelname))
    joblib.dump(self.text_clf, "{}/{}/{}.pkl".format(self.basedir, self.modelname, self.modelname))

    # Feature diagnostics. Work on a copy of the vectorizer: re-fitting the
    # pipeline's own "vect" step on half of the data would replace its
    # vocabulary and leave the in-memory pipeline inconsistent with the
    # classifier that was just trained on the full data.
    vectorizer = copy.deepcopy(self.text_clf.named_steps["vect"])
    half_point = int(len(self.features) * 0.5)
    X_train = vectorizer.fit_transform(self.features[:half_point])
    ch2 = SelectKBest(chi2, k=20)
    ch2.fit(X_train, self.labels[:half_point])

    # Newer scikit-learn releases only offer get_feature_names_out(); fall
    # back to get_feature_names() on releases that predate it.
    get_names = getattr(vectorizer, "get_feature_names_out", None) or vectorizer.get_feature_names
    all_names = get_names()
    feature_names = [all_names[i] for i in ch2.get_support(indices=True)]
    print(feature_names)
def train_sentence_classifier(self, pairtype):
    """Train and evaluate a character n-gram Naive Bayes sentence classifier.

    The instances returned by ``self.get_features(pairtype)`` are split in the
    middle: the first half trains the pipeline, the second half evaluates it.
    Side effects: ``self.text_clf`` holds the pipeline, ``self.train_sentences``
    the sentence ids of the training half, ``self.type_sentences[pairtype]``
    the ids of held-out sentences with a truthy prediction, and the fitted
    model is dumped with joblib below ``models/kernel_models/``.

    :param pairtype: pair type name; used to fetch the features and to build
        the model directory and file name.
    :return: ``(res[1][1], res[0][1], res[1][0])`` of the confusion matrix of
        the held-out half -- presumably true positives, false positives and
        false negatives for binary labels (TODO confirm label order).
    """
    # Other vectorizers/classifiers (word n-grams, tf-idf, SGD, SVC, random
    # forest, L1 feature selection, dummy) were tried here in the past; only
    # this combination is active.
    vectorizer = CountVectorizer(analyzer='char_wb', ngram_range=(7, 20), min_df=0.2, max_df=0.5)
    classifier = MultinomialNB(alpha=0.1, fit_prior=False)
    self.text_clf = Pipeline([('vect', vectorizer), ('clf', classifier)])

    f, labels, sids = self.get_features(pairtype)
    half_point = int(len(f) * 0.5)
    self.train_sentences = sids[:half_point]
    # (A chi2 top-20 feature inspection used to live here but is disabled.)

    # train
    text_clf = self.text_clf.fit(f[:half_point], labels[:half_point])

    # save model
    model_dir = "models/kernel_models/" + pairtype + "_sentence_classifier/"
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)
    logging.info("Training complete, saving to {}/{}/{}.pkl".format("models/kernel_models/",
                                                                    pairtype + "_sentence_classifier/", pairtype))
    joblib.dump(text_clf, "{}/{}/{}.pkl".format("models/kernel_models/",
                                                pairtype + "_sentence_classifier/", pairtype))

    # evaluate on the held-out half
    pred = text_clf.predict(f[half_point:])
    self.type_sentences[pairtype] = [sids[half_point + offset]
                                     for offset, flagged in enumerate(pred) if flagged]
    res = metrics.confusion_matrix(labels[half_point:], pred)
    return res[1][1], res[0][1], res[1][0]