def test_feature_union_fit_failure():
    """A failing transformer inside a FeatureUnion must be reported correctly.

    With the search's default ``error_score`` (the test relies on it being
    ``'raise'``) the ``ValueError`` propagates; once ``error_score`` is NaN the
    search emits ``FitFailedWarning`` and records NaN for every candidate.
    """
    X, y = make_classification(n_samples=100, n_features=10, random_state=0)
    union = FeatureUnion(
        [('good', MockClassifier()), ('bad', FailingClassifier())],
        transformer_weights={'bad': 0.5},
    )
    pipe = Pipeline([('union', union), ('clf', MockClassifier())])
    param = 'union__bad__parameter'
    gs = dcv.GridSearchCV(pipe, {param: [0, 1, 2]}, refit=False, scoring=None)

    # Default error_score: the underlying failure must propagate.
    with pytest.raises(ValueError):
        gs.fit(X, y)

    # NaN error_score: failures become warnings and NaN scores.
    gs.error_score = float('nan')
    with pytest.warns(FitFailedWarning):
        gs.fit(X, y)
    check_scores_all_nan(gs, param)
# Example source code for the Python class Pipeline()
def computeNeighboursScores(self):
    """Score every instance labelled 0 by the labels of its nearest neighbours.

    Features are standardised and a nearest-neighbour index with
    ``k = self.num_neighbours`` is fitted on all instances. For each instance
    whose label is 0 the score is ``sum(neighbour_label + 1) / (2 * k)``;
    instances with any other label are skipped.

    Returns:
        numpy.ndarray: one float score per instance labelled 0, in dataset
        order (empty when there is none).
    """
    all_instances = self.iteration.datasets.instances
    num_neighbours = self.num_neighbours
    # Connectivity matrix. ``n_neighbors`` must be passed by keyword: it is
    # keyword-only in recent scikit-learn releases (positional use raises).
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('model', NearestNeighbors(n_neighbors=num_neighbours, n_jobs=-1))])
    pipeline.fit(all_instances.getFeatures())
    # Labels
    labels = np.array([generateLabel(x) for x in all_instances.getLabels()])
    # With no query X, kneighbors() returns the neighbours of the fitted
    # (scaled) training points; one row of k indices per instance.
    all_neighbours = pipeline.named_steps['model'].kneighbors(return_distance=False)
    # Vectorised equivalent of looping over rows whose label is 0.
    unlabelled = labels == 0
    neighbour_labels = labels[all_neighbours[unlabelled]]
    return (neighbour_labels + 1).sum(axis=1) / (2.0 * num_neighbours)
def __init__(self, a_clf=None, a_grid_search=False):
    """Class constructor.

    Args:
      a_clf (classifier or None):
        classifier to use; when None, an ``XGBClassifier`` built from
        ``MAX_DEPTH``, ``NTREES`` and ``ALPHA`` with the
        ``"multi:softprob"`` objective is used
      a_grid_search (bool): use grid search for estimating
        hyper-parameters
    """
    self._gs = a_grid_search
    if a_clf is not None:
        classifier = a_clf
    else:
        classifier = XGBClassifier(max_depth=MAX_DEPTH,
                                   n_estimators=NTREES,
                                   learning_rate=ALPHA,
                                   objective="multi:softprob")
    self._clf = classifier
    # Dict features are vectorised by DictVectorizer (sparse output by
    # default); per the original note, the XGBoost version in use could not
    # deal with non-sparse feature vectors.
    self._model = Pipeline([("vect", DictVectorizer()),
                            ("clf", classifier)])
def nbsvm(base_clf, fit_scaler=None, transform_scaler='bin', multi_class='ovr'):
    """
    NB-SVM classifier: pipeline of MNBScaler+base_clf wrapped in OneVsRestClassifier / OneVsOneClassifier to support
    multiclass (MNBScaler supports only binary problems itself!).
    :param base_clf: classifier to use after MNBScaler, LogisticRegression or LinearSVC are usually used
    :param fit_scaler: look at MNBScaler class
    :param transform_scaler: look at MNBScaler class
    :param multi_class: ovr for OneVsRestClassifier, ovo for OneVsOneClassifier
    :return: OneVsRestClassifier / OneVsOneClassifier
    :raises ValueError: if multi_class is neither 'ovr' nor 'ovo'
    """
    supported = ('ovr', 'ovo')
    # Validate before building any estimator so bad input fails fast.
    if multi_class not in supported:
        raise ValueError('Unsupported multi_class=%s, should be one of %r.' % (multi_class, list(supported)))
    mnb_scaler = MNBScaler(fit_scaler=fit_scaler, transform_scaler=transform_scaler)
    pipe = Pipeline([('mnbscaler', mnb_scaler), ('clf', base_clf)])
    if multi_class == 'ovr':
        return OneVsRestClassifier(pipe)
    return OneVsOneClassifier(pipe)
def test_model_detection(self):
    """Optimizer.model_module must name the library each model comes from."""
    sklearn_model = LogisticRegression()
    cases = [
        (sklearn_model, 'sklearn'),
        (Pipeline([('log', sklearn_model)]), 'pipeline'),
        (XGBClassifier(), 'xgboost'),
        (NNModel(100, 10), 'keras'),
    ]
    # Build every optimizer first, then check them in the same order.
    optimizers = [(Optimizer(model, [], lambda x: x), expected)
                  for model, expected in cases]
    for optimizer, expected in optimizers:
        self.assertEqual(optimizer.model_module, expected)
def train(self, train_size=0.8, k_folds=5):
    """Build the text + numeric feature pipeline and tune it with grid search.

    Loads and pre-processes the data, splits it, then runs ``GridSearchCV``
    over ``self.params`` on the training rows. The pipeline unions a TF-IDF
    branch over ``TEXT_FEATURES`` with a min-max-scaled branch over
    ``NON_TEXT_FEATURES`` and feeds the result to ``self._estimator``.

    Args:
        train_size: passed through to ``self._get_train_test_split``.
        k_folds: number of cross-validation folds.

    Sets ``self.gs_`` (the fitted search) and ``self.model_`` (its best
    estimator).
    """
    # retrieve data from DB and pre-process
    self._get_data()
    # perform train/test split
    self._get_train_test_split(train_size=train_size)

    # Text branch: extract the text columns and TF-IDF them.
    text_pipeline = Pipeline([
        ('extract_text', DFColumnExtractor(TEXT_FEATURES)),
        ('vect', TfidfVectorizer(tokenizer=twitter_tokenizer)),
    ])
    # Numeric branch: extract the non-text columns and min-max scale them.
    numeric_pipeline = Pipeline([
        ('extract_nums', DFColumnExtractor(NON_TEXT_FEATURES)),
        ('scaler', MinMaxScaler()),
    ])
    # Concatenate both feature blocks and feed them to the estimator.
    pipeline = Pipeline([
        ('features', FeatureUnion([
            ('text_processing', text_pipeline),
            ('num_processing', numeric_pipeline),
        ])),
        ('clf', self._estimator),
    ])

    self.logger.info('Fitting model hyperparameters with {0}-fold CV'.format(k_folds))
    gs = GridSearchCV(pipeline, self.params, n_jobs=-1, cv=k_folds)
    X = self.data.iloc[self.train_inds_, :]
    y = self.data[LABEL].values[self.train_inds_]
    gs.fit(X, y)
    # best_score_ is the best candidate's mean cross-validated score on the
    # training rows, not an accuracy on a held-out validation set.
    self.logger.info('Best mean {0}-fold CV score is {1}'.format(k_folds, gs.best_score_))
    self.gs_ = gs
    self.model_ = gs.best_estimator_
def test_cutoff_inside_a_pipeline(data):
    """MinMaxScaler -> CutOff must output exactly 1 where, and only where, the
    scaled value exceeds 1."""
    pipeline = Pipeline([
        ('minmax_scaler', preprocessing.MinMaxScaler()),
        ('dsapp_cutoff', CutOff()),
    ])
    pipeline.fit(data['X_train'], data['y_train'])

    # Shift the last test row upward so features may leave the training range.
    shifted_row = data['X_test'][-1, :].reshape(1, -1) + 0.5
    reference_scaler = preprocessing.MinMaxScaler().fit(data['X_train'])
    above_one = reference_scaler.transform(shifted_row) > 1
    clipped = pipeline.transform(shifted_row) == 1
    assert np.all(above_one == clipped)
def test_dsapp_lr(data):
    """ScaledLogisticRegression must predict exactly like an explicit
    MinMaxScaler -> CutOff -> LogisticRegression pipeline."""
    dsapp_lr = ScaledLogisticRegression()
    dsapp_lr.fit(data['X_train'], data['y_train'])

    reference = Pipeline([
        ('minmax_scaler', preprocessing.MinMaxScaler()),
        ('dsapp_cutoff', CutOff()),
        ('lr', linear_model.LogisticRegression()),
    ])
    reference.fit(data['X_train'], data['y_train'])

    expected = reference.predict(data['X_test'])
    assert np.all(dsapp_lr.predict(data['X_test']) == expected)
def test_pipeline(get_models, get_transform, get_kernel):
    """Grid-search a one-step pipeline for each model/transform/kernel combo.

    ``n_estimators`` and ``kernel`` are only searched when the model exposes
    them. Every parameter list has a single value, so there is exactly one
    candidate; its mean train score must stay above -15.
    """
    alg, model = get_models
    trans = get_transform()
    kernel = get_kernel() + WhiteKernel()
    pipe = Pipeline(steps=[(alg, model())])
    probe = model()  # instance used only to inspect available parameters
    param_dict = {}
    if hasattr(probe, 'n_estimators'):
        param_dict[alg + '__n_estimators'] = [5]
    if hasattr(probe, 'kernel'):
        param_dict[alg + '__kernel'] = [kernel]
    param_dict[alg + '__target_transform'] = [trans]
    # ``iid`` was removed from GridSearchCV (scikit-learn 0.24) and only ever
    # affected test scores; train scores are only reported when explicitly
    # requested (scikit-learn >= 0.21).
    estimator = GridSearchCV(pipe,
                             param_dict,
                             n_jobs=1,
                             pre_dispatch=2,
                             verbose=True,
                             return_train_score=True,
                             )
    np.random.seed(10)
    estimator.fit(X=1 + np.random.rand(10, 3), y=1. + np.random.rand(10))
    assert estimator.cv_results_['mean_train_score'][0] > -15.0
def test_svr_pipeline(get_transform, get_svr_kernel):
    """Grid-search an ``svr`` pipeline over one kernel and one target
    transform; the single candidate's mean train score must stay above -10."""
    trans = get_transform()
    pipe = Pipeline(steps=[('svr', svr())])
    param_dict = {'svr__kernel': [get_svr_kernel]}
    param_dict['svr__target_transform'] = [trans]
    # ``iid`` was removed from GridSearchCV (scikit-learn 0.24) and only ever
    # affected test scores; train scores are only reported when explicitly
    # requested (scikit-learn >= 0.21).
    estimator = GridSearchCV(pipe,
                             param_dict,
                             n_jobs=1,
                             pre_dispatch=2,
                             verbose=True,
                             return_train_score=True,
                             )
    np.random.seed(1)
    estimator.fit(X=1 + np.random.rand(10, 5), y=1. + np.random.rand(10))
    assert estimator.cv_results_['mean_train_score'][0] > -10.0
def test_krige_pipeline(get_krige_method, get_variogram_model):
    """Grid-search a Krige pipeline over one variogram model on random 2-D
    points; the single candidate's mean train score must stay above -1."""
    pipe = Pipeline(steps=[('krige', Krige(method=get_krige_method))])
    param_dict = {'krige__variogram_model': [get_variogram_model]}
    # ``iid`` was removed from GridSearchCV (scikit-learn 0.24) and only ever
    # affected test scores; train scores are only reported when explicitly
    # requested (scikit-learn >= 0.21).
    estimator = GridSearchCV(pipe,
                             param_dict,
                             n_jobs=1,
                             pre_dispatch=2,
                             verbose=True,
                             return_train_score=True,
                             )
    np.random.seed(1)
    X = np.random.randint(0, 400, size=(20, 2)).astype(float)
    y = 5*np.random.rand(20)
    estimator.fit(X=X, y=y)
    assert estimator.cv_results_['mean_train_score'][0] > -1.0
def test_build_param_grid_set_estimator():
    """build_param_grid must expand per-estimator grids set on a ``clf`` slot.

    Estimators carrying their own grid get a dedicated sub-grid; those without
    one are grouped together. The selector's ``k`` grid is shared by all.
    """
    svc_linear = SVC()
    logreg = LogisticRegression()
    svc_poly = SVC()
    sgd = SGDClassifier()

    selector = set_grid(SelectKBest(), k=[2, 3])
    pipeline = Pipeline([('sel', selector), ('clf', None)])
    candidates = [set_grid(svc_linear, kernel=['linear']),
                  logreg,
                  set_grid(svc_poly, kernel=['poly'], degree=[2, 3]),
                  sgd]
    estimator = set_grid(pipeline, clf=candidates)

    expected = [
        {'clf': [svc_linear], 'clf__kernel': ['linear'], 'sel__k': [2, 3]},
        {'clf': [svc_poly], 'clf__kernel': ['poly'],
         'clf__degree': [2, 3], 'sel__k': [2, 3]},
        {'clf': [logreg, sgd], 'sel__k': [2, 3]},
    ]
    assert build_param_grid(estimator) == expected
def get_binary(self):
    """Return a TF-IDF -> SelectPercentile(10) -> one-vs-rest SGD pipeline.

    Dutch stop words are removed and the SGD classifier uses ``loss='log'``.
    """
    # NOTE(review): ``n_iter`` was replaced by ``max_iter`` (removed in
    # scikit-learn 0.21) and ``loss='log'`` was renamed ``'log_loss'`` (1.1);
    # these settings only work with older scikit-learn releases.
    sgd_params = dict(
        alpha=0.0001,
        average=False,
        class_weight=None,
        epsilon=0.1,
        eta0=0.0,
        fit_intercept=True,
        l1_ratio=0.15,
        learning_rate='optimal',
        loss='log',
        n_iter=10,
        n_jobs=1,
        penalty='l2',
        power_t=0.5,
        random_state=None,
        shuffle=True,
        verbose=0,
        warm_start=False,
    )
    tfidf = TfidfVectorizer(stop_words=sw.words('dutch'), norm='l2', use_idf=True)
    selector = SelectPercentile(percentile=10)
    classifier = OneVsRestClassifier(SGDClassifier(**sgd_params))
    return Pipeline([
        ('tfidf', tfidf),
        ('feat_select', selector),
        ('clf', classifier),
    ])
def get_sgdc(self):
    """Return a TF-IDF -> SelectPercentile(10) -> SGDClassifier pipeline.

    Dutch stop words are removed and the SGD classifier uses ``loss='log'``.
    """
    # NOTE(review): ``n_iter`` was replaced by ``max_iter`` (removed in
    # scikit-learn 0.21) and ``loss='log'`` was renamed ``'log_loss'`` (1.1);
    # these settings only work with older scikit-learn releases.
    sgd_params = dict(
        alpha=0.0001,
        average=False,
        class_weight=None,
        epsilon=0.1,
        eta0=0.0,
        fit_intercept=True,
        l1_ratio=0.15,
        learning_rate='optimal',
        loss='log',
        n_iter=10,
        n_jobs=1,
        penalty='l2',
        power_t=0.5,
        random_state=None,
        shuffle=True,
        verbose=0,
        warm_start=False,
    )
    tfidf = TfidfVectorizer(stop_words=sw.words('dutch'), norm='l2', use_idf=True)
    selector = SelectPercentile(percentile=10)
    classifier = SGDClassifier(**sgd_params)
    return Pipeline([
        ('tfidf', tfidf),
        ('feat_select', selector),
        ('clf', classifier),
    ])
def custom_fnames(union):
    """Collect ``'<transformer>__<feature>'`` names for every part of a union.

    Each transformer yielded by ``union._iter()`` must expose feature names.
    It can do so directly via ``get_feature_names`` or, on newer
    scikit-learn, ``get_feature_names_out``. It can also be a ``Pipeline``
    whose first step is the vectorizer.

    Args:
        union: a FeatureUnion-like object whose ``_iter()`` yields
            ``(name, transformer, weight)`` triples.

    Returns:
        list: prefixed feature names, in union order.

    Raises:
        AttributeError: if a transformer provides no feature names.
    """
    feature_names = []
    for name, trans, _weight in union._iter():
        if hasattr(trans, 'get_feature_names'):
            this_fn = trans.get_feature_names()
        elif isinstance(trans, Pipeline):
            # We use pipelines to scale only specific attributes.
            # In this case, the vectorizer is first in the pipe.
            first_step = trans.steps[0][-1]
            if hasattr(first_step, 'get_feature_names'):
                this_fn = first_step.get_feature_names()
            else:
                this_fn = first_step.get_feature_names_out()
        elif hasattr(trans, 'get_feature_names_out'):
            # scikit-learn >= 1.0 renamed get_feature_names (removed in 1.2).
            this_fn = trans.get_feature_names_out()
        else:
            raise AttributeError("Transformer %s (type %s) does not "
                                 "provide get_feature_names." % (
                                     str(name), type(trans).__name__))
        feature_names.extend(name + "__" + f for f in this_fn)
    return feature_names
def createPipeline(self):
    """Set ``self.pipeline`` to a single-step pipeline whose ``'model'`` step
    is a ``GradientBoostingClassifier`` configured from ``self.conf``."""
    conf = self.conf
    self.pipeline = Pipeline([
        ('model', GradientBoostingClassifier(
            loss=conf.loss,
            learning_rate=conf.learning_rate,
            n_estimators=conf.n_estimators,
            criterion=conf.criterion,
            max_depth=conf.max_depth,
            min_samples_split=conf.min_samples_split,
            min_samples_leaf=conf.min_samples_leaf,
            min_weight_fraction_leaf=conf.min_weight_fraction_leaf,
            subsample=conf.subsample,
            max_features=conf.max_features,
            max_leaf_nodes=conf.max_leaf_nodes,
            # The configured value is ``min_impurity_decrease``; it used to be
            # passed to the unrelated ``min_impurity_split`` threshold.
            min_impurity_decrease=conf.min_impurity_decrease,
            presort=conf.presort))])
def gridSearch(data, params, true_k):
    """Grid-search a TF-IDF + k-means document clustering pipeline.

    Prints the best score and the best value of every searched parameter.

    Args:
        data: documents passed to ``GridSearchCV.fit`` (no targets).
        params: parameter grid; keys must be parameter names of the pipeline
            (``vect__...`` for the vectorizer, ``clf__...`` for KMeans).
        true_k: number of clusters for ``KMeans``. A ``clf__n_clusters`` entry
            in ``params`` still overrides it during the search.

    Returns:
        The fitted ``GridSearchCV`` object.
    """
    tfidf = TfidfVectorizer(strip_accents=None,
                            lowercase=True,
                            sublinear_tf=True,
                            analyzer='word')
    # ``true_k`` used to be ignored, leaving KMeans at its default cluster
    # count. NOTE(review): KMeans ``n_jobs`` was deprecated in scikit-learn
    # 0.23 and removed in 1.0.
    kmeans = KMeans(n_clusters=true_k,
                    init='k-means++',
                    n_jobs=-1,
                    random_state=0,
                    verbose=0)
    lr_tfidf = Pipeline([('vect', tfidf),
                         ('clf', kmeans)])
    gsTfIdf = GridSearchCV(
        lr_tfidf, params, n_jobs=1, verbose=1)
    gsTfIdf.fit(data)
    print()
    print("Best score: %0.3f" % gsTfIdf.best_score_)
    print("Best parameters set:")
    best_parameters = gsTfIdf.best_estimator_.get_params()
    for param_name in sorted(params.keys()):
        print("\t%s: %r" % (param_name, best_parameters[param_name]))
    return gsTfIdf
def __init__(self, a_clf=None, a_grid_search=False):
    """Class constructor.

    Initialize classifier.

    Args:
      a_clf (classifier or None):
        classifier to use or None for default
        (``LinearSVC(C=DFLT_C, **DFLT_PARAMS)``)
      a_grid_search (bool): use grid search for estimating hyper-parameters
    """
    # Compare with None explicitly: ``a_clf or ...`` evaluates the estimator's
    # truthiness, which calls ``__len__`` on estimators that define it (e.g.
    # scikit-learn ensembles, where that fails before fitting).
    if a_clf is None:
        classifier = LinearSVC(C=DFLT_C, **DFLT_PARAMS)
    else:
        classifier = a_clf
    self._gs = a_grid_search
    # Dict features are vectorised before reaching the classifier.
    self._model = Pipeline([("vect", DictVectorizer()),
                            ("clf", classifier)])
def model_from_pipeline(pipe):
    '''
    Extract the model from the last stage of a pipeline.

    Parameters
    ----------
    pipe : Pipeline or Estimator

    Returns
    -------
    model: Estimator
        The estimator of the final pipeline step when ``pipe`` is a
        ``Pipeline``, otherwise ``pipe`` itself.
    '''
    if isinstance(pipe, Pipeline):
        # ``steps`` holds (name, estimator) pairs. Indexing the pipeline
        # directly is version-dependent: scikit-learn >= 0.21 returns the
        # estimator itself, so ``pipe[-1][1]`` would subscript the estimator.
        return pipe.steps[-1][1]
    return pipe
def _execute(self, sources, alignment_stream, interval):
    """Yield a ``{location: probability}`` instance for each input document.

    The last model document in ``sources[0]`` up to ``interval.end`` is
    deserialised into a DictVectorizer -> FillZeros ->
    LinearDiscriminantAnalysis pipeline plus a LabelEncoder. The encoder's
    classes name the locations. Each ``(time, document)`` of ``sources[1]``
    within ``interval`` is then scored with ``predict_proba``. Nothing is
    yielded when no model document exists. ``alignment_stream`` is unused here.
    """
    model_window = TimeInterval(MIN_DATE, interval.end)
    param_doc = sources[0].window(model_window, force_calculation=True).last()
    if param_doc is None:
        logging.debug("No model found in {} for time interval {}".format(sources[0].stream_id, model_window))
        return

    templates = {
        'vectorisation': DictVectorizer(sparse=False),
        'fill_missing': FillZeros(),
        'classifier': LinearDiscriminantAnalysis(),
        'label_encoder': LabelEncoder()
    }
    steps = deserialise_json_pipeline(templates, param_doc.value)
    step_names = ('vectorisation', 'fill_missing', 'classifier')
    clf = Pipeline([(step, steps[step]) for step in step_names])
    locations = steps['label_encoder'].classes_

    for timestamp, document in sources[1].window(interval, force_calculation=True):
        probabilities = clf.predict_proba(document)[0]
        by_location = {locations[idx]: prob for idx, prob in enumerate(probabilities)}
        yield StreamInstance(timestamp, by_location)
def test_ea_search_sklearn_elm_steps(label, do_predict):
    '''Test that EaSearchCV can work with numpy, dask.array,
    pandas.DataFrame, xarray.Dataset, xarray_filters.MLDataset
    '''
    from scipy.stats import lognorm
    est, make_data, sel, kw = args[label]
    base_parameters = {'kernel': ['linear', 'rbf'],
                       'C': lognorm(4)}
    if isinstance(est, (sk_Pipeline, Pipeline)):
        # Pipelines route the search parameters to their 'est' step.
        parameters = {'est__' + key: value
                      for key, value in base_parameters.items()}
    else:
        parameters = base_parameters
    ea = EaSearchCV(est, parameters,
                    n_iter=4,
                    ngen=2,
                    model_selection=sel,
                    model_selection_kwargs=kw)
    X, y = make_data()
    ea.fit(X, y)
    if not do_predict:
        return
    pred = ea.predict(X)
    # Predictions must come back in the same container type as the targets.
    assert isinstance(pred, type(y))
def create_pipeline(estimator, reduction=False, n_components=10000):
    """Build a normalize -> TF-IDF [-> TruncatedSVD] -> estimator pipeline.

    Args:
        estimator: final step, registered under the name ``'classifier'``.
        reduction: when true, insert a ``TruncatedSVD`` step after vectorising.
        n_components: number of SVD components, used only when ``reduction``
            is true (defaults to the previously hard-coded 10000).

    Returns:
        Pipeline: the assembled pipeline.
    """
    steps = [
        ('normalize', TextNormalizer()),
        # Identity tokenizer, no preprocessing or lowercasing: presumably
        # TextNormalizer already emits normalized tokens -- confirm.
        ('vectorize', TfidfVectorizer(
            tokenizer=identity, preprocessor=None, lowercase=False
        ))
    ]
    if reduction:
        steps.append((
            'reduction', TruncatedSVD(n_components=n_components)
        ))
    # Add the estimator
    steps.append(('classifier', estimator))
    return Pipeline(steps)
def _fit_embedding_word(self, embedding_type, construct_docs, tokenize_, d=None):
    """Fit a tokenize -> embeddings pipeline on ``construct_docs``.

    Returns the fitted embeddings estimator. ``embedding_type`` selects the
    initial word vectors:
      * ``'google'``: pre-trained GoogleNews vectors loaded with joblib;
      * ``'twitter'``: a Word2Vec model of size ``d`` trained on the train
        docs, the first 10**6 unsupervised docs, and the val and test docs;
      * anything else: no initial vectors (an empty ``(0, d)`` matrix).
    The ``rand`` callback handed to ``Embeddings`` draws float32 values from
    ``uniform(-0.25, 0.25)``; presumably it is used for words without a vector.
    """
    def to_namespace(word_vectors):
        # Expose the vector matrix and a word -> row-index mapping.
        return SimpleNamespace(X=word_vectors.syn0,
                               vocab={w: v.index for w, v in word_vectors.vocab.items()})

    if embedding_type == 'google':
        embeddings_ = to_namespace(joblib.load('data/google/GoogleNews-vectors-negative300.pickle'))
    elif embedding_type == 'twitter':
        word2vec_pipeline = Pipeline([
            ('tokenize', MapCorporas(tokenize_)),
            ('word2vec', MergeSliceCorporas(CachedFitTransform(Word2Vec(
                sg=1, size=d, window=10, hs=0, negative=5, sample=1e-3, min_count=1, iter=20, workers=16
            ), self.memory))),
        ]).fit([self.train_docs, self.unsup_docs[:10**6], self.val_docs, self.test_docs])
        embeddings_ = to_namespace(word2vec_pipeline.named_steps['word2vec'].estimator)
    else:
        embeddings_ = SimpleNamespace(X=np.empty((0, d)), vocab={})

    def random_vectors(shape):
        # 0.25 is chosen so the unknown vectors have approximately the same
        # variance as google pre-trained ones
        return get_rng().uniform(-0.25, 0.25, shape).astype('float32')

    estimator = Pipeline([
        ('tokenize', MapCorporas(tokenize_)),
        ('embeddings', MapCorporas(Embeddings(
            embeddings_, rand=random_vectors, include_zero=True
        ))),
    ])
    estimator.fit(construct_docs)
    return estimator.named_steps['embeddings'].estimator
def test_feature_union_fit_failure_multiple_metrics():
    """Multi-metric variant of the FeatureUnion fit-failure test.

    With the default ``error_score`` the ``ValueError`` must propagate. Once
    ``error_score`` is NaN, a ``FitFailedWarning`` is expected and every
    scorer must report NaN for every candidate.
    """
    scoring = {"score_1": _passthrough_scorer, "score_2": _passthrough_scorer}
    X, y = make_classification(n_samples=100, n_features=10, random_state=0)
    union = FeatureUnion(
        [('good', MockClassifier()), ('bad', FailingClassifier())],
        transformer_weights={'bad': 0.5},
    )
    pipe = Pipeline([('union', union), ('clf', MockClassifier())])
    param = 'union__bad__parameter'
    gs = dcv.GridSearchCV(pipe, {param: [0, 1, 2]}, refit=False, scoring=scoring)

    # Default error_score: the underlying failure must propagate.
    with pytest.raises(ValueError):
        gs.fit(X, y)

    # NaN error_score: failures become warnings and NaN scores per metric.
    gs.error_score = float('nan')
    with pytest.warns(FitFailedWarning):
        gs.fit(X, y)
    for score_key in scoring:
        check_scores_all_nan(gs, param, score_key=score_key)
def test_feature_union_raises():
    """Unsupported FeatureUnion grid parameters must be rejected at fit time.

    A parameter for a transformer name not in the union raises ``ValueError``.
    Searching over ``transformer_list`` itself raises ``NotImplementedError``.
    """
    X, y = make_classification(n_samples=100, n_features=10, random_state=0)
    union = FeatureUnion([('tr0', MockClassifier()),
                          ('tr1', MockClassifier())])
    pipe = Pipeline([('union', union), ('est', MockClassifier())])

    def assert_fit_raises(grid, error):
        search = dcv.GridSearchCV(pipe, grid, refit=False)
        with pytest.raises(error):
            search.fit(X, y)

    assert_fit_raises({'union__tr2__parameter': [0, 1, 2]}, ValueError)
    assert_fit_raises({'union__transformer_list': [[('one', MockClassifier())]]},
                      NotImplementedError)
def test_hyperparameter_searcher_with_fit_params(cls, kwargs):
    """Step-prefixed fit params must reach the pipeline step they target.

    The CheckingClassifier raises an AssertionError if a parameter is missing
    or has length != len(X). The search must fail when ``eggs`` is absent and
    succeed once both params are given, also when they are dask objects.
    """
    X = np.arange(100).reshape(10, 10)
    y = np.array([0] * 5 + [1] * 5)
    checker = CheckingClassifier(expected_fit_params=['spam', 'eggs'])
    pipe = Pipeline([('clf', checker)])
    searcher = cls(pipe, {'clf__foo_param': [1, 2, 3]}, cv=2, **kwargs)

    with pytest.raises(AssertionError) as exc_info:
        searcher.fit(X, y, clf__spam=np.ones(10))
    assert "Expected fit parameter(s) ['eggs'] not seen." in str(exc_info.value)

    searcher.fit(X, y, clf__spam=np.ones(10), clf__eggs=np.zeros(10))
    # Same check with dask collections as parameter values.
    searcher.fit(X, y, clf__spam=da.ones(10, chunks=2),
                 clf__eggs=dask.delayed(np.zeros(10)))
def test_boston_OHE_plus_normalizer(self):
    """A OneHotEncoder(column 8) -> StandardScaler pipeline must convert with
    no output mismatches against scikit-learn's own transform."""
    data = load_boston()
    features, names = data.data, data.feature_names
    pl = Pipeline([
        ("OHE", OneHotEncoder(categorical_features=[8], sparse=False)),
        ("Scaler", StandardScaler()),
    ])
    pl.fit(features, data.target)

    spec = convert(pl, names, 'out')
    input_data = [dict(zip(names, row)) for row in features]
    output_data = [{"out": row} for row in pl.transform(features)]
    result = evaluate_transformer(spec, input_data, output_data)
    assert result["num_errors"] == 0
def test_boston_OHE_plus_trees(self):
    """A OneHotEncoder(column 8) -> GradientBoostingRegressor pipeline must
    convert with a maximum prediction error below 1e-4."""
    data = load_boston()
    features, names = data.data, data.feature_names
    pl = Pipeline([
        ("OHE", OneHotEncoder(categorical_features=[8], sparse=False)),
        ("Trees", GradientBoostingRegressor(random_state=1)),
    ])
    pl.fit(features, data.target)

    spec = convert(pl, names, 'target')
    # Reference predictions from scikit-learn itself.
    frame = pd.DataFrame(features, columns=names)
    frame['prediction'] = pl.predict(features)
    result = evaluate_regressor(spec, frame, 'target', verbose=False)
    assert result["max_error"] < 0.0001
def test_boston_OHE_pipeline(self):
    """OneHotEncoder -> Normalizer pipelines must convert correctly for
    several categorical column selections.

    The pipeline wrapper checks that the converter handles the changed output
    dimension of the encoder.
    """
    data = load_boston()
    names = data.feature_names
    column_choices = ([3], [8], [3, 8], [8, 3])
    for categorical_features in column_choices:
        model = Pipeline([
            ("OHE", OneHotEncoder(categorical_features=categorical_features)),
            ("Normalizer", Normalizer()),
        ])
        model.fit(data.data.copy(), data.target)

        spec = sklearn.convert(model, names, 'out').get_spec()
        input_data = [dict(zip(names, row)) for row in data.data]
        output_data = [{"out": row} for row in model.transform(data.data.copy())]
        result = evaluate_transformer(spec, input_data, output_data)
        assert result["num_errors"] == 0
def word_unigrams():
    """Return the ``('word_unigrams', pipeline)`` feature pair.

    Text is cleaned by ``TextCleaner`` (lower-casing, the URL / mention /
    hashtag / RT filters, and its alphabetic and accent-stripping options). It
    is then counted as word unigrams with ``min_df=2`` and the project stop
    words, re-weighted with sublinear TF-IDF, and finally passed through a
    ``Normalizer``.
    """
    cleaner_options = dict(lowercase=True,
                           filter_urls=True,
                           filter_mentions=True,
                           filter_hashtags=True,
                           alphabetic=True,
                           strip_accents=True,
                           filter_rt=True)
    preprocessor = TextCleaner(**cleaner_options)
    vectorizer = CountVectorizer(min_df=2,
                                 stop_words=get_stopwords(),
                                 preprocessor=preprocessor,
                                 ngram_range=(1, 1))
    steps = [('vect', vectorizer),
             ('tfidf', TfidfTransformer(sublinear_tf=True)),
             ('scale', Normalizer())]
    return 'word_unigrams', Pipeline(steps)