def test_selective_tsvd():
    original = X
    cols = [original.columns[0], original.columns[1]]  # Only perform on first two columns...
    compare_cols = np.array(
        original[['petal length (cm)', 'petal width (cm)']].as_matrix())  # should be the same as the trans cols

    transformer = SelectiveTruncatedSVD(cols=cols, n_components=1).fit(original)
    transformed = transformer.transform(original)

    untouched_cols = np.array(transformed[['petal length (cm)', 'petal width (cm)']].as_matrix())
    assert_array_almost_equal(compare_cols, untouched_cols)
    assert 'Concept1' in transformed.columns
    assert transformed.shape[1] == 3
    assert isinstance(transformer.get_decomposition(), TruncatedSVD)
    assert SelectiveTruncatedSVD().get_decomposition() is None  # default None

    # test the selective mixin
    assert isinstance(transformer.cols, list)
def test_basic(algorithm):
    a = dd.TruncatedSVD(random_state=0, algorithm=algorithm)
    b = sd.TruncatedSVD(random_state=0)
    b.fit(Xdense)
    a.fit(dXdense)

    np.testing.assert_allclose(a.components_, b.components_, atol=1e-3)
    assert_estimator_equal(a, b,
                           exclude=['components_', 'explained_variance_'],
                           atol=1e-3)
    assert a.explained_variance_.shape == b.explained_variance_.shape
    np.testing.assert_allclose(a.explained_variance_,
                               b.explained_variance_,
                               rtol=0.01)
# The rest come straight from scikit-learn, with dask arrays substituted
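# The dask-ml tests above and below rely on module-level fixtures (Xdense, dXdense,
# n_features) that are not shown on this page. A minimal sketch of how they might be
# built (an assumption for illustration, not the actual test module):
import numpy as np
import dask.array as da
import sklearn.decomposition as sd
import dask_ml.decomposition as dd

n_samples, n_features = 100, 80
Xdense = np.random.RandomState(42).uniform(size=(n_samples, n_features))
# chunk the same data into a dask array so both estimators see identical values
dXdense = da.from_array(Xdense, chunks=(n_samples // 4, n_features))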
def __init__(self, language='en', window_width=2, collapse_fes=True, target_size=None):
    """ Initializes the extractor.

    :param language: The language of the sentences that will be used
    :param window_width: how many tokens to look before and after each
        token when building its features.
    :param collapse_fes: Whether to collapse FEs to a single token
        or to keep them split.
    """
    self.language = language
    self.tagger = TTPosTagger(language)
    self.window_width = window_width
    self.collapse_fes = collapse_fes
    self.unk_feature = 'UNK'
    self.vectorizer = DictVectorizer()
    self.target_size = target_size
    self.reducer = TruncatedSVD(target_size) if target_size else None
    self.vocabulary = set()
    self.label_index = {}
    self.lu_index = {}
    self.stopwords = set(w.lower() for w in StopWords().words(language))
    self.start()
def reduce_dimensionality(self, X, n_features):
    """
    Apply PCA or SVD to reduce dimension to n_features.
    :param X:
    :param n_features:
    :return:
    """
    # Initialize reduction method: PCA or SVD
    if self.is_pca == 'PCA':
        reducer = PCA(n_components=n_features)
        #reducer = PCA(n_components=n_features)
    elif self.is_pca == 'SVD':
        reducer = TruncatedSVD(n_components=n_features)
    # Fit and transform data to n_features-dimensional space
    reducer.fit(X)
    self.X = reducer.transform(X)
    logging.debug("Reduced number of features to {0}".format(n_features))
    logging.debug("Percentage explained: %s\n" % reducer.explained_variance_ratio_.sum())
    return self.X  # return the reduced matrix, not the original input
def create_pipeline(estimator, reduction=False):
    steps = [
        ('normalize', TextNormalizer()),
        ('vectorize', TfidfVectorizer(
            tokenizer=identity, preprocessor=None, lowercase=False
        ))
    ]

    if reduction:
        steps.append((
            'reduction', TruncatedSVD(n_components=10000)
        ))

    # Add the estimator
    steps.append(('classifier', estimator))
    return Pipeline(steps)
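# A brief usage sketch for create_pipeline (TextNormalizer and the identity tokenizer
# come from the surrounding project; SGDClassifier is only an illustrative estimator,
# and the documents are assumed to be pre-tokenized since tokenizer=identity):
from sklearn.linear_model import SGDClassifier

model = create_pipeline(SGDClassifier(), reduction=True)
# model now normalizes, vectorizes, reduces with TruncatedSVD, then classifies, e.g.:
# model.fit(train_docs, train_labels); predictions = model.predict(test_docs)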
def decompose(doc_vecs, n_features=100, normalize=False, flip=False):
    svd = TruncatedSVD(n_features)
    if normalize:
        if flip:
            lsa = make_pipeline(svd, Normalizer(copy=False))
            doc_mat = lsa.fit_transform(doc_vecs.transpose())
            doc_mat = doc_mat.transpose()
        else:
            lsa = make_pipeline(svd, Normalizer(copy=False))
            doc_mat = lsa.fit_transform(doc_vecs)
        return doc_mat
    else:
        if flip:
            doc_mat = svd.fit_transform(doc_vecs.transpose())
            doc_mat = doc_mat.transpose()
        else:
            doc_mat = svd.fit_transform(doc_vecs)
        return doc_mat
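# A short usage sketch for decompose(), assuming a TF-IDF document-term matrix as
# input (the corpus and vectorizer below are illustrative, not from the original project):
from sklearn.feature_extraction.text import TfidfVectorizer

docs = ["truncated svd builds a latent space",
        "svd reduces tf-idf features",
        "latent semantic analysis uses svd"]
doc_vecs = TfidfVectorizer().fit_transform(docs)

doc_mat = decompose(doc_vecs, n_features=2, normalize=True)  # rows are documents, unit-normalized
term_mat = decompose(doc_vecs, n_features=2, flip=True)      # SVD on the transposed matrix; shape (2, n_terms)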
def init_model():
    # "question trunk" features
    f_trunk = QuestionTrunkVectorizer(tokenizer=tokenize)

    # Word2Vec features
    f_word2vec = Question2VecVectorizer(tokenizer=tokenize)

    # combined features (400 dimensions)
    union_features = FeatureUnion([
        ('f_trunk_lsa', Pipeline([
            ('trunk', f_trunk),
            # dimensionality reduction: latent semantic analysis (LSA)
            ('lsa', TruncatedSVD(n_components=200, n_iter=10))
        ])),
        ('f_word2vec', f_word2vec),
    ])

    model = Pipeline([('union', union_features), ('clf', LinearSVC(C=0.02))])
    return model
def reduce_dimensionality(X, n_features):
    """
    Apply PCA or SVD to reduce dimension to n_features.
    :param X:
    :param n_features:
    :return:
    """
    # Initialize reduction method: PCA or SVD
    # reducer = PCA(n_components=n_features)
    reducer = TruncatedSVD(n_components=n_features)
    # Fit and transform data to n_features-dimensional space
    reducer.fit(X)
    X = reducer.transform(X)
    logging.debug("Reduced number of features to {0}".format(n_features))
    logging.debug("Percentage explained: %s\n" % reducer.explained_variance_ratio_.sum())
    return X
def transform(self):
    # ngrams
    obs_ngrams = list(map(lambda x: ngram_utils._ngrams(x.split(" "), self.obs_ngram, "_"), self.obs_corpus))
    target_ngrams = list(map(lambda x: ngram_utils._ngrams(x.split(" "), self.target_ngram, "_"), self.target_corpus))
    # cooccurrence ngrams
    cooc_terms = list(map(lambda lst1, lst2: self._get_cooc_terms(lst1, lst2, "X"), obs_ngrams, target_ngrams))
    ## tfidf
    tfidf = self._init_word_ngram_tfidf(ngram=1)
    X = tfidf.fit_transform(cooc_terms)
    ## svd
    svd = TruncatedSVD(n_components=self.svd_dim,
                       n_iter=self.svd_n_iter, random_state=config.RANDOM_SEED)
    return svd.fit_transform(X)
# 2nd in CrowdFlower (preprocessing_mikhail.py)
def transform(self):
    ## get common vocabulary
    tfidf = self._init_word_ngram_tfidf(self.ngram)
    tfidf.fit(list(self.obs_corpus) + list(self.target_corpus))
    vocabulary = tfidf.vocabulary_
    ## obs tfidf
    tfidf = self._init_word_ngram_tfidf(self.ngram, vocabulary)
    X_obs = tfidf.fit_transform(self.obs_corpus)
    ## target tfidf
    tfidf = self._init_word_ngram_tfidf(self.ngram, vocabulary)
    X_target = tfidf.fit_transform(self.target_corpus)
    ## svd: fit on the stacked matrices so both sides share one latent space
    svd = TruncatedSVD(n_components=self.svd_dim,
                       n_iter=self.svd_n_iter, random_state=config.RANDOM_SEED)
    svd.fit(scipy.sparse.vstack((X_obs, X_target)))
    X_obs = svd.transform(X_obs)
    X_target = svd.transform(X_target)
    ## cosine similarity
    sim = list(map(dist_utils._cosine_sim, X_obs, X_target))
    sim = np.asarray(sim).squeeze()
    return sim
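# dist_utils._cosine_sim is a helper from that project; a rough equivalent of the
# row-wise cosine similarity it computes in the shared SVD space (an assumption,
# shown only for clarity):
import numpy as np

def _cosine_sim_rowwise(u, v, eps=1e-8):
    """Cosine similarity between two 1-D vectors; 0.0 if either is (near) zero."""
    denom = np.linalg.norm(u) * np.linalg.norm(v)
    return float(np.dot(u, v) / denom) if denom > eps else 0.0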
def tfidf(corpus, corpusKeys):
    # TODO clean this up
    # discard any stop words - saves on processing
    stopset = list(stopwords.words('english'))
    stopset.append('000')
    stopset.extend([str(x) for x in range(9999)])

    vectorizer = TfidfVectorizer(stop_words=stopset, use_idf=True, ngram_range=(2, 3))

    # matrix of input set
    X = (vectorizer.fit_transform(corpus)).toarray()
    size_matrix = X.shape[0]
    lsa = TruncatedSVD(n_components=size_matrix, n_iter=100)
    terms = vectorizer.get_feature_names()

    records = []
    for i, comp in enumerate(X):
        termsInComp = zip(terms, comp)
        sortedTerms = sorted(termsInComp, key=lambda x: x[1], reverse=True)[:10]
        # list with all the terms gathered from the tfidf vectorizer
        termList = [term[0] + '.' for term in sortedTerms]
        # list with Article ID and list of tfidf terms
        records.append((vader(corpusKeys[i], termList), termList))
    return records
def test_random_hasher():
    # test random forest hashing on circles dataset
    # make sure that it is linearly separable.
    # even after projected to two SVD dimensions
    # Note: Not all random_states produce perfect results.
    hasher = RandomTreesEmbedding(n_estimators=30, random_state=1)
    X, y = datasets.make_circles(factor=0.5)
    X_transformed = hasher.fit_transform(X)

    # test fit and transform:
    hasher = RandomTreesEmbedding(n_estimators=30, random_state=1)
    assert_array_equal(hasher.fit(X).transform(X).toarray(),
                       X_transformed.toarray())

    # one leaf active per data point per forest
    assert_equal(X_transformed.shape[0], X.shape[0])
    assert_array_equal(X_transformed.sum(axis=1), hasher.n_estimators)
    svd = TruncatedSVD(n_components=2)
    X_reduced = svd.fit_transform(X_transformed)
    linear_clf = LinearSVC()
    linear_clf.fit(X_reduced, y)
    assert_equal(linear_clf.score(X_reduced, y), 1.)
def __init__(self, **svd_kwargs):
    super(SVDTransform, self).__init__(TruncatedSVD, **svd_kwargs)
def main():
    features = []
    # `list` is expected to be a module-level list of image file paths
    for i in list:
        im = cv2.imread(i)
        hist, bins = np.histogram(im.ravel(), 256, [0, 256])
        features.append(hist)

    lsa = TruncatedSVD(10)
    features = lsa.fit_transform(features)
    features = Normalizer(copy=False).fit_transform(features)

    km = KMeans(
        init='k-means++',
        n_clusters=n_clusters,
    )
    km.fit(features)

    for i in range(n_clusters):
        if not os.path.exists('./result/' + str(i)):
            os.makedirs('./result/' + str(i))

    cnt = 0
    for i in list:
        filename = i.split('/')[-1]
        print filename,
        print km.labels_[cnt]
        shutil.copyfile(i, './result/' + str(km.labels_[cnt]) + '/' + filename)
        cnt += 1
def fit(self, X, y=None):
    """Fit the transformer.

    Parameters
    ----------
    X : Pandas ``DataFrame``, shape=(n_samples, n_features)
        The Pandas frame to fit. The frame will only
        be fit on the prescribed ``cols`` (see ``__init__``) or
        all of them if ``cols`` is None. Furthermore, ``X`` will
        not be altered in the process of the fit.

    y : None
        Passthrough for ``sklearn.pipeline.Pipeline``. Even
        if explicitly set, will not change behavior of ``fit``.

    Returns
    -------
    self
    """
    # check on state of X and cols
    X, self.cols = validate_is_pd(X, self.cols)
    cols = _cols_if_none(X, self.cols)

    # fails thru if names don't exist:
    self.svd_ = TruncatedSVD(
        n_components=self.n_components,
        algorithm=self.algorithm,
        n_iter=self.n_iter).fit(X[cols].as_matrix())

    return self
def get_decomposition(self):
    """Overridden from the :class:``skutil.decomposition.decompose._BaseSelectiveDecomposer`` class,
    this method returns the internal decomposition class:
    ``sklearn.decomposition.TruncatedSVD``

    Returns
    -------
    self.svd_ : ``sklearn.decomposition.TruncatedSVD``
        The fit internal decomposition class
    """
    return self.svd_ if hasattr(self, 'svd_') else None
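# A minimal usage sketch of SelectiveTruncatedSVD, based on the test at the top of
# this page (the iris frame construction here is illustrative, not part of the library):
from sklearn.datasets import load_iris
import pandas as pd

iris = load_iris()
frame = pd.DataFrame(iris.data, columns=iris.feature_names)

selective = SelectiveTruncatedSVD(cols=['sepal length (cm)', 'sepal width (cm)'],
                                  n_components=1).fit(frame)
reduced = selective.transform(frame)   # columns: 'Concept1' plus the untouched petal columns
svd = selective.get_decomposition()    # the fitted sklearn TruncatedSVD, or None before fit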
def get_pc(data, We, weight4ind, params):
    "Compute the principal component"

    def get_weighted_average(We, x, w):
        "Compute the weighted average vectors"
        n_samples = x.shape[0]
        emb = np.zeros((n_samples, We.shape[1]))
        for i in xrange(n_samples):
            emb[i, :] = w[i, :].dot(We[x[i, :], :]) / np.count_nonzero(w[i, :])
        return emb

    for i in data:
        i[0].populate_embeddings(words)
        if not params.task == "sentiment":
            i[1].populate_embeddings(words)

    if params.task == "ent":
        (scores, g1x, g1mask, g2x, g2mask) = data_io.getDataEntailment(data)
        if params.weightfile:
            g1mask = data_io.seq2weight(g1x, g1mask, weight4ind)
    elif params.task == "sim":
        (scores, g1x, g1mask, g2x, g2mask) = data_io.getDataSim(data, -1)
        if params.weightfile:
            g1mask = data_io.seq2weight(g1x, g1mask, weight4ind)
    elif params.task == "sentiment":
        (scores, g1x, g1mask) = data_io.getDataSentiment(data)
        if params.weightfile:
            g1mask = data_io.seq2weight(g1x, g1mask, weight4ind)

    emb = get_weighted_average(We, g1x, g1mask)
    svd = TruncatedSVD(n_components=params.npc, n_iter=7, random_state=0)
    svd.fit(emb)
    return svd.components_
def compute_pc(X, npc=1):
    """
    Compute the principal components. DO NOT MAKE THE DATA ZERO MEAN!
    :param X: X[i,:] is a data point
    :param npc: number of principal components to remove
    :return: component_[i,:] is the i-th pc
    """
    svd = TruncatedSVD(n_components=npc, n_iter=7, random_state=0)
    svd.fit(X)
    return svd.components_
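# The docstring above says these components are the ones to *remove* from the data.
# A minimal sketch of that removal step (an assumption for illustration, not
# necessarily the accompanying project's exact helper):
def remove_pc(X, npc=1):
    """Subtract each row's projection onto the first npc principal components of X."""
    pc = compute_pc(X, npc)                      # shape (npc, n_features)
    if npc == 1:
        return X - X.dot(pc.transpose()) * pc    # (n, 1) * (1, d) broadcasts to (n, d)
    return X - X.dot(pc.transpose()).dot(pc)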
def test_algorithms():
    svd_a = sd.TruncatedSVD(30, algorithm="arpack")
    svd_r = dd.TruncatedSVD(30, algorithm="tsqr", random_state=42)

    Xa = svd_a.fit_transform(Xdense)[:, :6]
    Xr = svd_r.fit_transform(dXdense)[:, :6]
    assert_array_almost_equal(Xa, Xr, decimal=5)

    comp_a = np.abs(svd_a.components_)
    comp_r = np.abs(svd_r.components_)
    # All elements are equal, but some elements are more equal than others.
    assert_array_almost_equal(comp_a[:9], comp_r[:9])
    assert_array_almost_equal(comp_a[9:], comp_r[9:], decimal=2)
def test_attributes():
    for n_components in (10, 25, 41):
        tsvd = dd.TruncatedSVD(n_components).fit(dXdense)
        assert tsvd.n_components == n_components
        assert tsvd.components_.shape == (n_components, n_features)
def test_too_many_components():
    for n_components in (n_features, n_features + 1):
        tsvd = dd.TruncatedSVD(n_components=n_components)
        with pytest.raises(ValueError):
            tsvd.fit(dXdense)
def test_inverse_transform():
    # We need a lot of components for the reconstruction to be "almost
    # equal" in all positions. XXX Test means or sums instead?
    a = dd.TruncatedSVD(n_components=52, random_state=42, n_iter=5)
    b = sd.TruncatedSVD(n_components=52, random_state=42)
    b.fit(Xdense)
    Xt = a.fit_transform(dXdense)
    Xinv = a.inverse_transform(Xt)
    assert_array_almost_equal(Xinv.compute(), Xdense, decimal=1)
def truncated_svd(self):
    # https://github.com/chrisjmccormick/LSA_Classification/blob/master/inspect_LSA.py
    svd = TruncatedSVD(self.dimensions)
    lsa = make_pipeline(svd, Normalizer(copy=False))
    X_reduced = lsa.fit_transform(self.bag_of_words_matrix)

    print(svd.components_[0])
    print(svd.explained_variance_ratio_)
    print(svd.explained_variance_ratio_.sum())
def compress_the_dimension():
    X = io.loadmat("X_matrix")['PPMI']
    a = PCA(300)
    a.fit(X)
    #decomp = TruncatedSVD(n_components=300, n_iter=7)
    #decomp.fit(X)
    truncated_X = a.transform(X)  # use the fitted PCA (the TruncatedSVD variant is commented out above)
    return truncated_X
def score_models(models, loader):
    for model in models:
        name = model.named_steps['classifier'].__class__.__name__
        if 'reduction' in model.named_steps:
            name += " (TruncatedSVD)"

        scores = {
            'model': str(model),
            'name': name,
            'accuracy': [],
            'precision': [],
            'recall': [],
            'f1': [],
            'time': [],
        }

        for X_train, X_test, y_train, y_test in loader:
            start = time.time()
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)

            scores['time'].append(time.time() - start)
            scores['accuracy'].append(accuracy_score(y_test, y_pred))
            scores['precision'].append(precision_score(y_test, y_pred, average='weighted'))
            scores['recall'].append(recall_score(y_test, y_pred, average='weighted'))
            scores['f1'].append(f1_score(y_test, y_pred, average='weighted'))

        yield scores
def _perform_svd(self):
    if self._svd and self.data_vectors.shape[1] > 50:
        print('dimension reduction using svd')
        print('dimension before: {}'.format(str(self.data_vectors.shape[1])))
        self.data_vectors = TruncatedSVD(n_components=50, random_state=0).fit_transform(self.data_vectors)
        print('dimension after: {}'.format(str(self.data_vectors.shape[1])))
def main():
    svd = TruncatedSVD()
    Z = svd.fit_transform(X)
    plt.scatter(Z[:, 0], Z[:, 1])
    for i in xrange(D):
        plt.annotate(s=index_word_map[i], xy=(Z[i, 0], Z[i, 1]))
    plt.show()
def make_ward_clustering(self, short_filenames, input_texts):
    output_dir = self.output_dir + 'WARD/'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    if self.need_tf_idf:
        self.signals.PrintInfo.emit("Calculating TF-IDF...")
        idf_filename = output_dir + 'tf_idf.csv'
        msg = self.calculate_and_write_tf_idf(idf_filename, input_texts)
        self.signals.PrintInfo.emit(msg)

    vectorizer = CountVectorizer()
    X = vectorizer.fit_transform(input_texts)

    svd = TruncatedSVD(2)
    normalizer = Normalizer(copy=False)
    lsa = make_pipeline(svd, normalizer)
    X = lsa.fit_transform(X)

    ward = AgglomerativeClustering(n_clusters=self.ward_clusters_count, linkage='ward')
    predict_result = ward.fit_predict(X)
    self.signals.PrintInfo.emit('\nCluster assignments:\n')

    clasters_output = ''
    for claster_index in range(max(predict_result) + 1):
        clasters_output += ('Cluster ' + str(claster_index) + ':\n')
        for predict, document in zip(predict_result, short_filenames):
            if predict == claster_index:
                clasters_output += ('  ' + str(document) + '\n')
        clasters_output += '\n'
    self.signals.PrintInfo.emit(clasters_output)
    self.signals.PrintInfo.emit('Saved to: ' + str(output_dir + 'clusters.txt'))
    writeStringToFile(clasters_output, output_dir + 'clusters.txt')

    self.draw_clusters_plot(X, predict_result, short_filenames)
def make_spectral_clustering(self, short_filenames, input_texts):
    output_dir = self.output_dir + 'spectral/'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    if self.need_tf_idf:
        self.signals.PrintInfo.emit("Calculating TF-IDF...")
        idf_filename = output_dir + 'tf_idf.csv'
        msg = self.calculate_and_write_tf_idf(idf_filename, input_texts)
        self.signals.PrintInfo.emit(msg)

    vectorizer = CountVectorizer()
    X = vectorizer.fit_transform(input_texts)

    svd = TruncatedSVD(2)
    normalizer = Normalizer(copy=False)
    lsa = make_pipeline(svd, normalizer)
    X = lsa.fit_transform(X)

    spectral = SpectralClustering(n_clusters=self.spectral_clusters_count)
    predict_result = spectral.fit_predict(X)
    self.signals.PrintInfo.emit('\nCluster assignments:\n')

    clasters_output = ''
    for claster_index in range(max(predict_result) + 1):
        clasters_output += ('Cluster ' + str(claster_index) + ':\n')
        for predict, document in zip(predict_result, short_filenames):
            if predict == claster_index:
                clasters_output += ('  ' + str(document) + '\n')
        clasters_output += '\n'
    self.signals.PrintInfo.emit(clasters_output)
    self.signals.PrintInfo.emit('Saved to: ' + str(output_dir + 'clusters.txt'))
    writeStringToFile(clasters_output, output_dir + 'clusters.txt')

    self.draw_clusters_plot(X, predict_result, short_filenames)
# aa = Affinity Propagation