def test_feature_union_fit_failure():
X, y = make_classification(n_samples=100, n_features=10, random_state=0)
pipe = Pipeline([('union', FeatureUnion([('good', MockClassifier()),
('bad', FailingClassifier())],
transformer_weights={'bad': 0.5})),
('clf', MockClassifier())])
grid = {'union__bad__parameter': [0, 1, 2]}
gs = dcv.GridSearchCV(pipe, grid, refit=False, scoring=None)
# Check that failure raises if error_score is `'raise'`
with pytest.raises(ValueError):
gs.fit(X, y)
# Check that grid scores were set to error_score on failure
gs.error_score = float('nan')
with pytest.warns(FitFailedWarning):
gs.fit(X, y)
check_scores_all_nan(gs, 'union__bad__parameter')
def train(self, train_size=0.8, k_folds=5):
# retrieve data from DB and pre-process
self._get_data()
# perform train/test split
self._get_train_test_split(train_size=train_size)
# define text pre-processing pipeline
text_pipeline = Pipeline([
('extract_text', DFColumnExtractor(TEXT_FEATURES)),
('vect', TfidfVectorizer(tokenizer=twitter_tokenizer))
])
# define pipeline for pre-processing of numeric features
numeric_pipeline = Pipeline([
('extract_nums', DFColumnExtractor(NON_TEXT_FEATURES)),
('scaler', MinMaxScaler())
])
# combine both steps into a single pipeline
pipeline = Pipeline([
('features', FeatureUnion([
('text_processing', text_pipeline),
('num_processing', numeric_pipeline)
])),
('clf', self._estimator)
])
self.logger.info('Fitting model hyperparameters with {0}-fold CV'.format(k_folds))
gs = GridSearchCV(pipeline, self.params, n_jobs=-1, cv=k_folds)
X = self.data.iloc[self.train_inds_, :]
y = self.data[LABEL].values[self.train_inds_]
gs.fit(X, y)
self.logger.info('Best cross-validated accuracy is {0}'.format(gs.best_score_))
self.gs_ = gs
self.model_ = gs.best_estimator_
def test_feature_union_fit_failure_multiple_metrics():
scoring = {"score_1": _passthrough_scorer, "score_2": _passthrough_scorer}
X, y = make_classification(n_samples=100, n_features=10, random_state=0)
pipe = Pipeline([('union', FeatureUnion([('good', MockClassifier()),
('bad', FailingClassifier())],
transformer_weights={'bad': 0.5})),
('clf', MockClassifier())])
grid = {'union__bad__parameter': [0, 1, 2]}
gs = dcv.GridSearchCV(pipe, grid, refit=False, scoring=scoring)
# Check that failure raises if error_score is `'raise'`
with pytest.raises(ValueError):
gs.fit(X, y)
# Check that grid scores were set to error_score on failure
gs.error_score = float('nan')
with pytest.warns(FitFailedWarning):
gs.fit(X, y)
for key in scoring:
check_scores_all_nan(gs, 'union__bad__parameter', score_key=key)
def test_feature_union_raises():
X, y = make_classification(n_samples=100, n_features=10, random_state=0)
union = FeatureUnion([('tr0', MockClassifier()),
('tr1', MockClassifier())])
pipe = Pipeline([('union', union), ('est', MockClassifier())])
grid = {'union__tr2__parameter': [0, 1, 2]}
gs = dcv.GridSearchCV(pipe, grid, refit=False)
with pytest.raises(ValueError):
gs.fit(X, y)
grid = {'union__transformer_list': [[('one', MockClassifier())]]}
gs = dcv.GridSearchCV(pipe, grid, refit=False)
with pytest.raises(NotImplementedError):
gs.fit(X, y)
def __init__(self, lang=None, method=None, features=None):
fs = []
if 'unigram' in features:
fs.append(word_unigrams())
if 'bigram' in features:
fs.append(word_bigrams())
if 'spelling' in features:
fs.append(avg_spelling_error(lang=lang))
if 'punctuation' in features:
fs.append(punctuation_features())
if 'char' in features:
fs.append(char_ngrams())
fu = FeatureUnion(fs, n_jobs=1)
self.pipeline = Pipeline([('features', fu),
('scale', Normalizer()),
('classifier', get_classifier(method=method))])
Source: 04_sent.py, from the Building-Machine-Learning-Systems-With-Python-Second-Edition project by PacktPublishing.
def create_union_model(params=None):
def preprocessor(tweet):
tweet = tweet.lower()
for k in emo_repl_order:
tweet = tweet.replace(k, emo_repl[k])
for r, repl in re_repl.items():
tweet = re.sub(r, repl, tweet)
return tweet.replace("-", " ").replace("_", " ")
tfidf_ngrams = TfidfVectorizer(preprocessor=preprocessor,
analyzer="word")
ling_stats = LinguisticVectorizer()
all_features = FeatureUnion(
[('ling', ling_stats), ('tfidf', tfidf_ngrams)])
#all_features = FeatureUnion([('tfidf', tfidf_ngrams)])
#all_features = FeatureUnion([('ling', ling_stats)])
clf = MultinomialNB()
pipeline = Pipeline([('all', all_features), ('clf', clf)])
if params:
pipeline.set_params(**params)
return pipeline
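# A hedged usage sketch for create_union_model above: parameter names follow
# the Pipeline/FeatureUnion nesting ('all' is the union step, 'tfidf' the
# vectorizer inside it); the specific values below are illustrative
# assumptions, not tuned settings.
example_params = {'all__tfidf__ngram_range': (1, 2), 'clf__alpha': 0.1}
sentiment_clf = create_union_model(example_params)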
def init_model():
# "Question trunk" features
f_trunk = QuestionTrunkVectorizer(tokenizer=tokenize)
# Word2Vec features
f_word2vec = Question2VecVectorizer(tokenizer=tokenize)
# Combined features (400 dimensions)
union_features = FeatureUnion([
('f_trunk_lsa', Pipeline([
('trunk', f_trunk),
# Dimensionality reduction: latent semantic analysis (LSA)
('lsa', TruncatedSVD(n_components=200, n_iter=10))
])),
('f_word2vec', f_word2vec),
])
model = Pipeline([('union', union_features), ('clf', LinearSVC(C=0.02))])
return model
def __add__(self, other):
"""
Returns:
:py:class:`ibex.sklearn.pipeline.FeatureUnion`
"""
if isinstance(self, FeatureUnion):
self_features = [e[1] for e in self.transformer_list]
else:
self_features = [self]
if isinstance(other, FeatureUnion):
other_features = [e[1] for e in other.transformer_list]
else:
other_features = [other]
combined = self_features + other_features
return FeatureUnion(_make_pipeline_steps(combined))
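# For illustration only: the `+` operator above is equivalent to building a
# plain scikit-learn FeatureUnion over the same estimators. A minimal sketch
# with stock transformers (PCA/SelectKBest are assumptions, not part of this
# codebase):
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest
from sklearn.pipeline import FeatureUnion as SkFeatureUnion

manual_union = SkFeatureUnion([('pca', PCA(n_components=2)),
                               ('kbest', SelectKBest(k=1))])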
def make_union(*transformers, **kwargs):
"""Construct a FeatureUnion with alternative estimators to search over
Parameters
----------
transformers
Each transformer is specified as one of:
* an estimator instance
* None (meaning no features)
* a list of the above, indicating that a grid search should alternate
over the estimators (or None) in the list
kwargs
Keyword arguments to the constructor of
:class:`sklearn.pipeline.FeatureUnion`.
Notes
-----
Each step is named according to the set of estimator types in its list:
* if a step has only one type of estimator (disregarding None), it takes
that estimator's class name (lowercased)
* if a step has estimators of mixed type, the step is named 'alt'
* if there are multiple steps of the same name using the above rules,
a suffix '-1', '-2', etc. is added.
"""
steps, grid = _name_steps(transformers)
return set_grid(_FeatureUnion(steps, **kwargs), **grid)
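# A hedged usage sketch for the make_union defined above: a single list
# argument becomes one union step whose estimators the grid search alternates
# over, with None meaning "contribute no features". PCA/SelectKBest are
# illustrative choices, not requirements.
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest

searchable_union = make_union([PCA(n_components=2), SelectKBest(k=1), None])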
def get_pipeline(sample_col, parallel_jobs=None):
feat_ext_objs = [feat_ext_class(sample_col)
for feat_ext_class in get_objs(FEAT_EXTS_DIR, 'Worker')]
feat_ext_tuples = [(feat_ext_obj.feature_name, feat_ext_obj)
for feat_ext_obj in feat_ext_objs]
pipeline = Pipeline([
('features', FeatureUnion(feat_ext_tuples, n_jobs=parallel_jobs)),
('describe_data', describe_data.Transformer()),
('classifier', MultinomialNB()),
])
return pipeline
def test_feature_union(weights):
X = np.ones((10, 5))
y = np.zeros(10)
union = FeatureUnion([('tr0', ScalingTransformer()),
('tr1', ScalingTransformer()),
('tr2', ScalingTransformer())])
factors = [(2, 3, 5), (2, 4, 5), (2, 4, 6),
(2, 4, None), (None, None, None)]
params, sols, grid = [], [], []
for constants, w in product(factors, weights or [None]):
p = {}
for n, c in enumerate(constants):
if c is None:
p['tr%d' % n] = None
elif n == 3: # 3rd is always an estimator
p['tr%d' % n] = ScalingTransformer(c)
else:
p['tr%d__factor' % n] = c
sol = union.set_params(transformer_weights=w, **p).transform(X)
sols.append(sol)
if w is not None:
p['transformer_weights'] = w
params.append(p)
p2 = {'union__' + k: [v] for k, v in p.items()}
p2['est'] = [CheckXClassifier(sol[0])]
grid.append(p2)
# Need to recreate the union after setting estimators to `None` above
union = FeatureUnion([('tr0', ScalingTransformer()),
('tr1', ScalingTransformer()),
('tr2', ScalingTransformer())])
pipe = Pipeline([('union', union), ('est', CheckXClassifier())])
gs = dcv.GridSearchCV(pipe, grid, refit=False, cv=2)
with warnings.catch_warnings(record=True):
gs.fit(X, y)
def feature_union_concat(Xs, nsamples, weights):
"""Apply weights and concatenate outputs from a FeatureUnion"""
if any(x is FIT_FAILURE for x in Xs):
return FIT_FAILURE
Xs = [X if w is None else X * w for X, w in zip(Xs, weights)
if X is not None]
if not Xs:
return np.zeros((nsamples, 0))
if any(sparse.issparse(f) for f in Xs):
return sparse.hstack(Xs).tocsr()
return np.hstack(Xs)
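# Minimal illustration of feature_union_concat above (shapes chosen only for
# the example): two dense (3, 2) blocks, the second scaled by its weight of
# 0.5, are stacked horizontally into a (3, 4) array.
_blocks = [np.ones((3, 2)), np.ones((3, 2))]
_concat = feature_union_concat(_blocks, nsamples=3, weights=[None, 0.5])
assert _concat.shape == (3, 4)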
# Current set_params isn't threadsafe
def construct_pipeline(classifier):
"""
This function creates a feature extraction pipeline that accepts data
from a CorpusLoader and appends the classification model to the end of
the pipeline, returning a newly constructed Pipeline object that is
ready to be fit and trained!
"""
return Pipeline([
# Create a Feature Union of Text Stats and Bag of Words
('union', FeatureUnion(
transformer_list = [
# Pipeline for pulling document structure features
('stats', Pipeline([
('stats', TextStats()),
('vect', DictVectorizer()),
])),
# Pipeline for creating a bag of words TF-IDF vector
('bow', Pipeline([
('tokens', TextNormalizer()),
('tfidf', TfidfVectorizer(
tokenizer=identity, preprocessor=None, lowercase=False
)),
('best', TruncatedSVD(n_components=1000)),
])),
],
# weight components in feature union
transformer_weights = {
'stats': 0.15,
'bow': 0.85,
},
)),
# Append the estimator to the end of the pipeline
('classifier', classifier),
])
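# A hedged usage sketch: any scikit-learn classifier can be handed to the
# factory above; `docs`/`labels` stand in for output of a CorpusLoader and,
# like the custom TextStats/TextNormalizer transformers, are assumptions here.
from sklearn.linear_model import LogisticRegression

model = construct_pipeline(LogisticRegression())
# model.fit(docs, labels)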
def get_feature_transformer(parser, run_grammar=True, run_tfidf=True):
'''
Creates a transformer that takes a text series and generates TF-IDF features
and frequencies of syntactic structures. Suitable for use as a step in a
sklearn Pipeline.
inputs:
parser: a spaCy pipeline object
run_grammar: whether to include the grammar (PCFG) features
run_tfidf: whether to include the TF-IDF features
returns:
feature transformer: FeatureUnion
'''
tfidf = Pipeline([
('cln', CleanTextTransformer()),
('pre', PreTokenizer(parser=parser)),
('vect', TfidfVectorizer(
max_features=3000, decode_error='replace')),
('clf', None)
])
grammar_counter = Pipeline([
('cln', CleanTextTransformer()),
('grm', GrammarTransformer(parser=parser)),
('to_dict', DictVectorizer()),
('clf', None)
])
if run_grammar and run_tfidf:
print('Running both feature sets.')
feature_transformer = FeatureUnion([("tfidf", tfidf), ('grammar_counter', grammar_counter)])
elif not run_grammar:
print('Running only TFIDF.')
feature_transformer = FeatureUnion([("tfidf", tfidf)])
elif not run_tfidf:
print('Running only PCFGs.')
feature_transformer = FeatureUnion([('grammar_counter', grammar_counter)])
return feature_transformer
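# A hedged usage sketch: the returned FeatureUnion is intended to sit ahead of
# an estimator in a Pipeline. Loading spaCy is left commented out because the
# model name is an assumption.
# import spacy
# parser = spacy.load('en_core_web_sm')
# features = get_feature_transformer(parser, run_grammar=True, run_tfidf=True)
# model = Pipeline([('features', features), ('clf', LogisticRegression())])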
def __init__(self, transformer_list, n_jobs=1, transformer_weights=None, as_index=True):
pipeline.FeatureUnion.__init__(
self,
transformer_list,
n_jobs,
transformer_weights)
FrameMixin.__init__(self)
self._as_index = as_index
# Tmp Ami - get docstrings from sklearn.
def test_feature_union():
# basic sanity check for feature union
iris = load_iris()
X = iris.data
X -= X.mean(axis=0)
y = iris.target
svd = TruncatedSVD(n_components=2, random_state=0)
select = SelectKBest(k=1)
fs = FeatureUnion([("svd", svd), ("select", select)])
fs.fit(X, y)
X_transformed = fs.transform(X)
assert_equal(X_transformed.shape, (X.shape[0], 3))
# check if it does the expected thing
assert_array_almost_equal(X_transformed[:, :-1], svd.fit_transform(X))
assert_array_equal(X_transformed[:, -1],
select.fit_transform(X, y).ravel())
# test if it also works for sparse input
# We use a different svd object to control the random_state stream
fs = FeatureUnion([("svd", svd), ("select", select)])
X_sp = sparse.csr_matrix(X)
X_sp_transformed = fs.fit_transform(X_sp, y)
assert_array_almost_equal(X_transformed, X_sp_transformed.toarray())
# test setting parameters
fs.set_params(select__k=2)
assert_equal(fs.fit_transform(X, y).shape, (X.shape[0], 4))
# test it works with transformers missing fit_transform
fs = FeatureUnion([("mock", TransfT()), ("svd", svd), ("select", select)])
X_transformed = fs.fit_transform(X, y)
assert_equal(X_transformed.shape, (X.shape[0], 8))
def test_feature_union_weights():
# test feature union with transformer weights
iris = load_iris()
X = iris.data
y = iris.target
pca = PCA(n_components=2, svd_solver='randomized', random_state=0)
select = SelectKBest(k=1)
# test using fit followed by transform
fs = FeatureUnion([("pca", pca), ("select", select)],
transformer_weights={"pca": 10})
fs.fit(X, y)
X_transformed = fs.transform(X)
# test using fit_transform
fs = FeatureUnion([("pca", pca), ("select", select)],
transformer_weights={"pca": 10})
X_fit_transformed = fs.fit_transform(X, y)
# test it works with transformers missing fit_transform
fs = FeatureUnion([("mock", TransfT()), ("pca", pca), ("select", select)],
transformer_weights={"mock": 10})
X_fit_transformed_wo_method = fs.fit_transform(X, y)
# check against expected result
# We use a different pca object to control the random_state stream
assert_array_almost_equal(X_transformed[:, :-1], 10 * pca.fit_transform(X))
assert_array_equal(X_transformed[:, -1],
select.fit_transform(X, y).ravel())
assert_array_almost_equal(X_fit_transformed[:, :-1],
10 * pca.fit_transform(X))
assert_array_equal(X_fit_transformed[:, -1],
select.fit_transform(X, y).ravel())
assert_equal(X_fit_transformed_wo_method.shape, (X.shape[0], 7))
def test_feature_union_feature_names():
word_vect = CountVectorizer(analyzer="word")
char_vect = CountVectorizer(analyzer="char_wb", ngram_range=(3, 3))
ft = FeatureUnion([("chars", char_vect), ("words", word_vect)])
ft.fit(JUNK_FOOD_DOCS)
feature_names = ft.get_feature_names()
for feat in feature_names:
assert_true("chars__" in feat or "words__" in feat)
assert_equal(len(feature_names), 35)
def prop_vectorizer(train_docs, which, stats=None, n_most_common_tok=1000,
n_most_common_dep=1000, return_transf=False):
# One pass to compute training corpus statistics.
train_docs = list(train_docs)
if stats is None:
stats = stats_train(train_docs)
lemma_freqs, _, dep_freqs, _, _ = stats
# vectorize BOW-style features
lemma_vocab = [w for w, _ in lemma_freqs[:n_most_common_tok]]
dep_vocab = [p for p, _ in dep_freqs[:n_most_common_dep]]
vects = dict(lemmas=dict(vocabulary=lemma_vocab, lowercase=True),
dependency_tuples=dict(vocabulary=dep_vocab), pos={},
discourse={}, indicators={}, indicator_preceding_in_para={},
indicator_following_in_para={})
raw_keys = ['is_first_in_para', 'is_last_in_para', 'toks_to_sent_ratio',
'relative_in_para', 'first_person_any', 'root_vb_modal',
'root_vb_tense']
nrm_keys = ['n_tokens', 'n_toks_in_sent', 'n_toks_in_para',
'n_toks_preceding_in_sent', 'n_toks_following_in_sent',
'preceding_props_in_para', 'following_props_in_para',
'parse_tree_height', 'n_subordinate_clauses']
if which == 'ukp':
raw_keys += ['is_in_intro', 'is_in_conclusion',
'has_shared_np_intro', 'has_shared_vp_intro',
'has_shared_np_conclusion', 'has_shared_vp_conclusion']
nrm_keys += ['n_shared_np_intro', 'n_shared_vp_intro',
'n_shared_np_conclusion', 'n_shared_vp_conclusion']
# load embeds
embed_vocab, embeds = load_embeds(which)
vect_list = list(make_union_prop(vects)) + [
('raw', FilteredDictVectorizer(raw_keys)),
('nrm', make_pipeline(FilteredDictVectorizer(nrm_keys, sparse=False),
MinMaxScaler((0, 1)))),
('embeds', EmbeddingVectorizer(embeds, embed_vocab))]
if which == 'ukp':
vect_list.append(('proba', PrecedingStats()))
vect = FeatureUnion(vect_list)
train_feats = [f for doc in train_docs for f in doc.prop_features]
if return_transf:
X_tr = vect.fit_transform(train_feats)
return vect, X_tr
else:
return vect.fit(train_feats)
def link_vectorizer(train_docs, stats=None, n_most_common=1000,
return_transf=False):
# One pass to compute training corpus statistics.
train_docs = list(train_docs)
if stats is None:
stats = stats_train(train_docs)
lemma_freqs, prod_freqs, _, pmi_incoming, pmi_outgoing = stats
# vectorize BOW-style features
lemma_vocab = [w for w, _ in lemma_freqs[:n_most_common]]
prod_vocab = [p for p, _ in prod_freqs[:n_most_common]]
vects = dict(lemmas=dict(vocabulary=lemma_vocab, lowercase=True),
productions=dict(vocabulary=prod_vocab), pos={}, discourse={},
indicators={}, indicator_preceding_in_para={},
indicator_following_in_para={})
raw_keys = ['src__is_first_in_para', 'src__is_last_in_para',
'trg__is_first_in_para', 'trg__is_last_in_para',
'same_sentence', 'src_precedes_trg', 'trg_precedes_src',
'any_shared_nouns', 'src__pmi_pos_ratio', 'src__pmi_neg_ratio',
'trg__pmi_pos_ratio', 'trg__pmi_neg_ratio', 'src__pmi_pos_any',
'src__pmi_neg_any', 'trg__pmi_pos_any', 'trg__pmi_neg_any', ]
nrm_keys = ['src__n_tokens', 'trg__n_tokens', 'props_between', 'n_props',
'n_shared_nouns']
vect_list = list(make_union_link(vects)) + [
('raw', FilteredDictVectorizer(raw_keys)), ('nrm', make_pipeline(
FilteredDictVectorizer(nrm_keys, sparse=False),
MinMaxScaler((0, 1))))]
vect = FeatureUnion(vect_list)
train_feats = [f for doc in train_docs for f in doc.features]
[add_pmi_features(f, pmi_incoming, pmi_outgoing) for f in train_feats]
if return_transf:
X_tr = vect.fit_transform(train_feats)
return vect, X_tr
else:
return vect.fit(train_feats)
def fit_logreg(self):
tokenize_sense = CachedFitTransform(Pipeline([
('tokenize', Map(compose(tokenize, normalize_special, unescape))),
('normalize', MapTokens(normalize_elongations)),
]), self.memory)
features = FeatureUnion([
# ('w2v_doc', ToCorporas(Pipeline([
# ('tokenize', MapCorporas(tokenize_sense)),
# ('feature', MergeSliceCorporas(Doc2VecTransform(CachedFitTransform(Doc2Vec(
# dm=0, dbow_words=1, size=100, window=10, hs=0, negative=5, sample=1e-3, min_count=1, iter=20,
# workers=16
# ), self.memory)))),
# ]).fit([self.train_docs, self.unsup_docs[:10**6], self.val_docs, self.test_docs]))),
# ('w2v_word_avg', Pipeline([
# ('tokenize', tokenize_sense),
# ('feature', Word2VecAverage(CachedFitTransform(Word2Vec(
# sg=1, size=100, window=10, hs=0, negative=5, sample=1e-3, min_count=1, iter=20, workers=16
# ), self.memory))),
# ]).fit(self.unsup_docs[:10**6])),
# ('w2v_word_avg_google', Pipeline([
# ('tokenize', tokenize_sense),
# ('feature', Word2VecAverage(joblib.load('data/google/GoogleNews-vectors-negative300.pickle'))),
# ])),
# ('w2v_word_norm_avg', Pipeline([
# ('tokenize', tokenize_sense),
# ('feature', Word2VecNormAverage(CachedFitTransform(Word2Vec(
# sg=1, size=100, window=10, hs=0, negative=5, sample=1e-3, min_count=1, iter=20, workers=16
# ), self.memory))),
# ]).fit(self.unsup_docs[:10**6])),
('w2v_word_norm_avg_google', Pipeline([
('tokenize', tokenize_sense),
('feature', Word2VecNormAverage(joblib.load('data/google/GoogleNews-vectors-negative300.pickle'))),
])),
# ('w2v_word_max', Pipeline([
# ('tokenize', tokenize_sense),
# ('feature', Word2VecMax(CachedFitTransform(Word2Vec(
# sg=1, size=100, window=10, hs=0, negative=5, sample=1e-3, min_count=1, iter=20, workers=16
# ), self.memory))),
# ]).fit(self.unsup_docs[:10**6])),
# ('w2v_word_max_google', Pipeline([
# ('tokenize', tokenize_sense),
# ('feature', Word2VecMax(joblib.load('data/google/GoogleNews-vectors-negative300.pickle'))),
# ])),
# ('w2v_word_inv', ToCorporas(Pipeline([
# ('tokenize', MapCorporas(tokenize_sense)),
# ('feature', MergeSliceCorporas(Word2VecInverse(CachedFitTransform(Word2Vec(
# sg=1, size=100, window=10, hs=0, negative=5, sample=0, min_count=1, iter=20, workers=16
# ), self.memory)))),
# ]).fit([self.train_docs, self.unsup_docs[:10**5], self.val_docs, self.test_docs]))),
])
classifier = LogisticRegression()
with temp_log_level({'gensim.models.word2vec': logging.INFO}):
classifier.fit(features.transform(self.train_docs), self.train_labels())
estimator = Pipeline([('features', features), ('classifier', classifier)])
return 'logreg({})'.format(','.join(name for name, _ in features.transformer_list)), estimator
def test_pipeline_feature_union():
iris = load_iris()
X, y = iris.data, iris.target
pca = PCA(random_state=0)
kbest = SelectKBest()
empty_union = FeatureUnion([('first', None), ('second', None)])
empty_pipeline = Pipeline([('first', None), ('second', None)])
scaling = Pipeline([('transform', ScalingTransformer())])
svc = SVC(kernel='linear', random_state=0)
pipe = Pipeline([('empty_pipeline', empty_pipeline),
('scaling', scaling),
('missing', None),
('union', FeatureUnion([('pca', pca),
('missing', None),
('kbest', kbest),
('empty_union', empty_union)],
transformer_weights={'pca': 0.5})),
('svc', svc)])
param_grid = dict(scaling__transform__factor=[1, 2],
union__pca__n_components=[1, 2, 3],
union__kbest__k=[1, 2],
svc__C=[0.1, 1, 10])
gs = GridSearchCV(pipe, param_grid=param_grid)
gs.fit(X, y)
dgs = dcv.GridSearchCV(pipe, param_grid=param_grid, scheduler='sync')
dgs.fit(X, y)
# Check best params match
assert gs.best_params_ == dgs.best_params_
# Check PCA components match
sk_pca = gs.best_estimator_.named_steps['union'].transformer_list[0][1]
dk_pca = dgs.best_estimator_.named_steps['union'].transformer_list[0][1]
np.testing.assert_allclose(sk_pca.components_, dk_pca.components_)
# Check SelectKBest scores match
sk_kbest = gs.best_estimator_.named_steps['union'].transformer_list[2][1]
dk_kbest = dgs.best_estimator_.named_steps['union'].transformer_list[2][1]
np.testing.assert_allclose(sk_kbest.scores_, dk_kbest.scores_)
# Check SVC coefs match
np.testing.assert_allclose(gs.best_estimator_.named_steps['svc'].coef_,
dgs.best_estimator_.named_steps['svc'].coef_)
def test_feature_union_parallel():
# test that n_jobs work for FeatureUnion
X = JUNK_FOOD_DOCS
fs = FeatureUnion([
("words", CountVectorizer(analyzer='word')),
("chars", CountVectorizer(analyzer='char')),
])
fs_parallel = FeatureUnion([
("words", CountVectorizer(analyzer='word')),
("chars", CountVectorizer(analyzer='char')),
], n_jobs=2)
fs_parallel2 = FeatureUnion([
("words", CountVectorizer(analyzer='word')),
("chars", CountVectorizer(analyzer='char')),
], n_jobs=2)
fs.fit(X)
X_transformed = fs.transform(X)
assert_equal(X_transformed.shape[0], len(X))
fs_parallel.fit(X)
X_transformed_parallel = fs_parallel.transform(X)
assert_equal(X_transformed.shape, X_transformed_parallel.shape)
assert_array_equal(
X_transformed.toarray(),
X_transformed_parallel.toarray()
)
# fit_transform should behave the same
X_transformed_parallel2 = fs_parallel2.fit_transform(X)
assert_array_equal(
X_transformed.toarray(),
X_transformed_parallel2.toarray()
)
# transformers should stay fit after fit_transform
X_transformed_parallel2 = fs_parallel2.transform(X)
assert_array_equal(
X_transformed.toarray(),
X_transformed_parallel2.toarray()
)