def _estimate_lambda_single_y(y):
"""Estimate lambda for a single y, given a range of lambdas
through which to search. No validation performed.
Parameters
----------
y : ndarray, shape (n_samples,)
The vector being estimated against
"""
    # Ensure y is an ndarray
    y = np.array(y)
    # With lmbda=None, scipy's boxcox fits lambda by maximum likelihood
    # and returns (transformed_y, fitted_lambda)
    b = boxcox(y, lmbda=None)
    # Return the lambda with maximum log-likelihood
    return b[1]
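# Usage sketch (illustrative, not part of the original snippet). Assumes the
# module-level imports the function above relies on:
import numpy as np
from scipy.stats import boxcox

y = np.random.exponential(scale=2.0, size=100) + 1e-6  # strictly positive
lmbda = _estimate_lambda_single_y(y)
y_transformed = boxcox(y, lmbda=lmbda)  # apply the estimated lambda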
def _fit_boxcox(self, X):
""" Transform features using a boxcox transform.
Parameters
----------
X : np.array [n_samples, n_features]
Untransformed training features.
Returns
-------
X_boxcox : np.array [n_samples, n_features]
Transformed training features.
"""
_, self.n_feats = X.shape
X_boxcox = np.zeros(X.shape)
lmbda_opt = np.zeros((self.n_feats,))
        for i in range(self.n_feats):
            # EPS shifts features away from zero: Box-Cox needs positive input
            X_boxcox[:, i], lmbda_opt[i] = boxcox(
                X[:, i] + EPS
            )
        # Keep the per-feature lambdas for reuse in _transform
        self.lmbda = lmbda_opt
return X_boxcox
def _transform(self, X):
""" Transform an input feature matrix using the trained boxcox
parameters.
Parameters
----------
X : np.array [n_samples, n_features]
Input features.
Returns
-------
X_boxcox : np.array [n_samples, n_features]
Transformed features.
"""
X_boxcox = np.zeros(X.shape)
        for i in range(self.n_feats):
            # Apply the lambda learned for feature i during fitting
            X_boxcox[:, i] = boxcox(
                X[:, i] + EPS, lmbda=self.lmbda[i]
            )
return X_boxcox
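# Usage sketch for the _fit_boxcox/_transform pair above (illustrative).
# EPS is referenced but never defined in the snippet, so the value below is
# an assumption; the wrapper class is hypothetical and simply binds the two
# module-level functions as methods.
import numpy as np
from scipy.stats import boxcox

EPS = 1e-8  # assumed small shift away from zero

class _BoxcoxDemo:
    _fit_boxcox = _fit_boxcox
    _transform = _transform

t = _BoxcoxDemo()
X_train_bc = t._fit_boxcox(np.abs(np.random.randn(50, 3)) + 1.0)  # fits t.lmbda
X_test_bc = t._transform(np.abs(np.random.randn(10, 3)) + 1.0)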
def preprocess_feature(self, feature, parameters):
is_not_empty = 1 - np.isclose(feature, normalization.MISSING_VALUE)
if parameters.feature_type == identify_types.BINARY:
# Binary features are always 1 unless they are 0
return ((feature != 0) * is_not_empty).astype(np.float32)
if parameters.boxcox_lambda is not None:
feature = stats.boxcox(
np.maximum(
feature + parameters.boxcox_shift,
normalization.BOX_COX_MARGIN
), parameters.boxcox_lambda
)
# No *= to ensure consistent out-of-place operation.
if parameters.feature_type == identify_types.PROBABILITY:
feature = np.clip(feature, 0.01, 0.99)
feature = special.logit(feature)
elif parameters.feature_type == identify_types.QUANTILE:
quantiles = parameters.quantiles
values = np.zeros(feature.shape)
for quantile in quantiles:
values += feature >= quantile
feature = values / float(len(quantiles))
elif parameters.feature_type == identify_types.ENUM:
possible_values = parameters.possible_values
mapping = {}
for i, possible_value in enumerate(possible_values):
mapping[possible_value] = i
output_feature = np.zeros((len(feature), len(possible_values)))
for i, val in enumerate(feature):
output_feature[i][mapping[val]] = 1.0
return output_feature
else:
feature = feature - parameters.mean
feature /= parameters.stddev
feature *= is_not_empty
return feature
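# For reference (standard definition, not from the original source), the
# transform stats.boxcox applies above is
#
#     y = (x**lmbda - 1) / lmbda    if lmbda != 0
#     y = log(x)                    if lmbda == 0
#
# and is defined only for x > 0, which is why the input is clamped to at
# least normalization.BOX_COX_MARGIN before the call.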
def transform_features(x_train, x_test):
""" Transform features using a boxcox transform. Remove vibrato features.
Comptes the optimal value of lambda on the training set and applies this
lambda to the testing set.
Parameters
----------
x_train : np.array [n_samples, n_features]
Untransformed training features.
x_test : np.array [n_samples, n_features]
Untransformed testing features.
Returns
-------
x_train_boxcox : np.array [n_samples, n_features_trans]
Transformed training features.
x_test_boxcox : np.array [n_samples, n_features_trans]
Transformed testing features.
"""
    x_train = x_train[:, 0:6]  # drop the vibrato features (columns 6 and up)
    x_test = x_test[:, 0:6]
_, n_feats = x_train.shape
x_train_boxcox = np.zeros(x_train.shape)
lmbda_opt = np.zeros((n_feats,))
eps = 1.0 # shift features away from zero
for i in range(n_feats):
x_train_boxcox[:, i], lmbda_opt[i] = boxcox(x_train[:, i] + eps)
x_test_boxcox = np.zeros(x_test.shape)
for i in range(n_feats):
x_test_boxcox[:, i] = boxcox(x_test[:, i] + eps, lmbda=lmbda_opt[i])
return x_train_boxcox, x_test_boxcox
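# Usage sketch (illustrative): random positive matrices stand in for the real
# contour features the original code expects.
import numpy as np

x_tr = np.abs(np.random.randn(100, 8)) + 1.0  # columns 6+ are dropped inside
x_te = np.abs(np.random.randn(25, 8)) + 1.0
x_tr_bc, x_te_bc = transform_features(x_tr, x_te)
assert x_tr_bc.shape == (100, 6) and x_te_bc.shape == (25, 6)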
def fit(self, X, y):
if self.is_boxcox:
self.clf.fit(X, stats.boxcox(y, self.boxcox_lambda))
else:
self.clf.fit(X, y)
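# A matching predict sketch (hypothetical; the surrounding class is not shown).
# A model fit on boxcox(y, lmbda) predicts in the transformed space, so its
# output must be inverted before use:
from scipy.special import inv_boxcox

def predict(self, X):
    y_pred = self.clf.predict(X)
    if self.is_boxcox:
        # Undo the Box-Cox applied to y in fit()
        return inv_boxcox(y_pred, self.boxcox_lambda)
    return y_pred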
def compute_loss(input_compute_loss):
Model = input_compute_loss["Model"]
config = input_compute_loss["config"]
X_train = input_compute_loss["X_train"]
y_train = input_compute_loss["y_train"]
dates_train = input_compute_loss["dates_train"]
X_test = input_compute_loss["X_test"]
y_test = input_compute_loss["y_test"]
is_y_log = input_compute_loss["is_y_log"]
is_boxcox = input_compute_loss["is_boxcox"]
loss_func = input_compute_loss["loss_func"]
model = Model(**config)
    if hasattr(model, "dates_train"):
model.dates_train = dates_train
if is_y_log:
model.fit(X_train, np.log(y_train))
predict_y_test = np.exp(model.predict(X_test))
    elif is_boxcox:
        # boxcox_lambda and invboxcox are assumed module-level here
        # (see the invboxcox sketch after this function)
        model.fit(X_train, boxcox(y_train, boxcox_lambda))
        predict_y_test = invboxcox(model.predict(X_test), boxcox_lambda)
else:
model.fit(X_train, y_train)
predict_y_test = model.predict(X_test)
if loss_func is None:
loss = mape_loss(y_test, predict_y_test)
else:
loss = loss_func(y_test, predict_y_test)
return (repr(config), config, loss)
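# compute_loss references a module-level boxcox_lambda and an invboxcox helper,
# neither of which appears in the snippet. A minimal sketch of the inverse,
# assuming the standard Box-Cox definition:
import numpy as np

def invboxcox(y, lmbda):
    # exp(y) when lmbda == 0, otherwise (lmbda*y + 1)**(1/lmbda)
    if lmbda == 0:
        return np.exp(y)
    return np.power(lmbda * y + 1.0, 1.0 / lmbda)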
def norm_y(y):
return boxcox(np.log1p(y), lmbda=norm_y_lambda)
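# norm_y relies on a module-level norm_y_lambda constant that is not shown.
# A hypothetical inverse, mapping predictions back to the original scale by
# undoing the Box-Cox first and then the log1p:
from scipy.special import inv_boxcox

def denorm_y(y_norm):
    return np.expm1(inv_boxcox(y_norm, norm_y_lambda))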
def mungeskewed(train, test, numeric_feats):
ntrain = train.shape[0]
test['loss'] = 0
train_test = pd.concat((train, test)).reset_index(drop=True)
    skewed_feats = train[numeric_feats].apply(lambda x: skew(x.dropna()))
    # Keep only features whose skewness exceeds 0.25
    skewed_feats = skewed_feats[skewed_feats > 0.25]
    skewed_feats = skewed_feats.index
    for feats in skewed_feats:
        # Shift by 1 so Box-Cox sees strictly positive values
        train_test[feats] = train_test[feats] + 1
        train_test[feats], lam = boxcox(train_test[feats])
return train_test, ntrain
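# Usage sketch (illustrative; the column name is hypothetical). Assumes the
# pandas/scipy imports the function above relies on:
import numpy as np
import pandas as pd

train = pd.DataFrame({'cont1': np.random.lognormal(size=100),
                      'loss': np.random.rand(100)})
test = pd.DataFrame({'cont1': np.random.lognormal(size=40)})
train_test, ntrain = mungeskewed(train, test, ['cont1'])
train_part, test_part = train_test[:ntrain], train_test[ntrain:]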
def test_preprocessing_network(self):
feature_value_map = preprocessing_util.read_data()
normalization_parameters = normalization.identify_parameters(
feature_value_map
)
test_features = self.preprocess(
feature_value_map, normalization_parameters
)
net = core.Net("PreprocessingTestNet")
preprocessor = PreprocessorNet(net, False)
for feature_name in feature_value_map:
workspace.FeedBlob(feature_name, np.array([0], dtype=np.int32))
preprocessor.preprocess_blob(
feature_name, normalization_parameters[feature_name]
)
workspace.CreateNet(net)
for feature_name in feature_value_map:
workspace.FeedBlob(feature_name, feature_value_map[feature_name])
workspace.RunNetOnce(net)
for feature_name in feature_value_map:
normalized_features = workspace.FetchBlob(
feature_name + "_preprocessed"
)
tolerance = 0.01
if feature_name == 'boxcox':
# At the limit, boxcox has some numerical instability
tolerance = 0.1
non_matching = np.where(
np.logical_not(
np.isclose(
normalized_features,
test_features[feature_name],
rtol=tolerance,
atol=tolerance,
)
)
)
self.assertTrue(
np.all(
np.isclose(
normalized_features,
test_features[feature_name],
rtol=tolerance,
atol=tolerance,
)
), '{} does not match: {} {}'.format(
feature_name, normalized_features[non_matching].tolist(),
test_features[feature_name][non_matching].tolist()
)
)