def test_get_auto_step_size():
    X = np.array([[1, 2, 3], [2, 3, 4], [2, 3, 2]], dtype=np.float64)
    alpha = 1.2
    fit_intercept = False
    # sum the squares of the second sample because that's the largest
    max_squared_sum = 4 + 9 + 16
    max_squared_sum_ = row_norms(X, squared=True).max()
    assert_almost_equal(max_squared_sum, max_squared_sum_, decimal=4)

    for fit_intercept in (True, False):
        step_size_sqr = 1.0 / (max_squared_sum + alpha + int(fit_intercept))
        step_size_log = 4.0 / (max_squared_sum + 4.0 * alpha +
                               int(fit_intercept))

        step_size_sqr_ = get_auto_step_size(max_squared_sum_, alpha, "squared",
                                            fit_intercept)
        step_size_log_ = get_auto_step_size(max_squared_sum_, alpha, "log",
                                            fit_intercept)

        assert_almost_equal(step_size_sqr, step_size_sqr_, decimal=4)
        assert_almost_equal(step_size_log, step_size_log_, decimal=4)

    msg = 'Unknown loss function for SAG solver, got wrong instead of'
    assert_raise_message(ValueError, msg, get_auto_step_size,
                         max_squared_sum_, alpha, "wrong", fit_intercept)
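The expected values in this test follow directly from the closed-form step sizes it restates; a minimal hedged check with plain arithmetic (no scikit-learn needed):

# Hedged numeric check of the closed forms used in the test above.
max_squared_sum = 4 + 9 + 16                  # 29, the largest squared row norm of X
alpha = 1.2
print(1.0 / (max_squared_sum + alpha))        # squared loss, no intercept: ~0.0331
print(4.0 / (max_squared_sum + 4.0 * alpha))  # log loss, no intercept: ~0.1183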
Python row_norms() usage examples (source code)
def row_norms(X, squared=False):
    # NumPy array: call the wrapped row_norms implementation directly.
    if isinstance(X, np.ndarray):
        return skm.row_norms(X, squared=squared)
    # Dask array: apply row_norms block-wise; each 2-d block collapses to a
    # 1-d chunk of per-row norms, so axis 1 is dropped.
    return X.map_blocks(skm.row_norms, chunks=(X.chunks[0],),
                        drop_axis=1, squared=squared)
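A usage sketch for the wrapper above: it dispatches on the input type, so the same call covers in-memory and chunked data. The dask.array import and the chunk sizes below are illustrative assumptions, not part of the snippet itself.

import numpy as np
import dask.array as da

X_np = np.arange(12, dtype=np.float64).reshape(4, 3)
X_da = da.from_array(X_np, chunks=(2, 3))

dense = row_norms(X_np, squared=True)              # ndarray path
chunked = row_norms(X_da, squared=True).compute()  # lazy map_blocks path
assert np.allclose(dense, chunked)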
def _global_clustering(self, X=None):
    """
    Global clustering for the subclusters obtained after fitting
    """
    clusterer = self.n_clusters
    centroids = self.subcluster_centers_
    compute_labels = (X is not None) and self.compute_labels

    # Preprocessing for the global clustering.
    not_enough_centroids = False
    if isinstance(clusterer, int):
        clusterer = AgglomerativeClustering(
            n_clusters=self.n_clusters)
        # There is no need to perform the global clustering step.
        if len(centroids) < self.n_clusters:
            not_enough_centroids = True
    elif (clusterer is not None and not
          hasattr(clusterer, 'fit_predict')):
        raise ValueError("n_clusters should be an instance of "
                         "ClusterMixin or an int")

    # To use in predict to avoid recalculation.
    self._subcluster_norms = row_norms(
        self.subcluster_centers_, squared=True)

    if clusterer is None or not_enough_centroids:
        self.subcluster_labels_ = np.arange(len(centroids))
        if not_enough_centroids:
            warnings.warn(
                "Number of subclusters found (%d) by Birch is less "
                "than (%d). Decrease the threshold."
                % (len(centroids), self.n_clusters))
    else:
        # The global clustering step that clusters the subclusters of
        # the leaves. It assumes the centroids of the subclusters as
        # samples and finds the final centroids.
        self.subcluster_labels_ = clusterer.fit_predict(
            self.subcluster_centers_)

    if compute_labels:
        self.labels_ = self.predict(X)
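A minimal sketch of how this step is reached through scikit-learn's public Birch API (the data and parameter values are illustrative): passing an integer n_clusters makes fit() finish with the global clustering above, which regroups the subcluster centroids with AgglomerativeClustering.

import numpy as np
from sklearn.cluster import Birch

rng = np.random.RandomState(0)
X = np.vstack([rng.normal(loc, 0.1, size=(50, 2)) for loc in (0.0, 1.0, 2.0)])

brc = Birch(threshold=0.2, n_clusters=3).fit(X)
print(brc.subcluster_centers_.shape)       # centroids fed to the global step
print(np.unique(brc.subcluster_labels_))   # labels from AgglomerativeClustering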
def test_row_norms():
    X = np.random.RandomState(42).randn(100, 100)
    sq_norm = (X ** 2).sum(axis=1)

    assert_array_almost_equal(sq_norm, row_norms(X, squared=True), 5)
    assert_array_almost_equal(np.sqrt(sq_norm), row_norms(X))

    Xcsr = sparse.csr_matrix(X, dtype=np.float32)
    assert_array_almost_equal(sq_norm, row_norms(Xcsr, squared=True), 5)
    assert_array_almost_equal(np.sqrt(sq_norm), row_norms(Xcsr))
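For dense input, the function under test is equivalent to np.linalg.norm along axis 1; unlike np.linalg.norm it also accepts scipy sparse matrices. A small hedged equivalence check:

import numpy as np
from sklearn.utils.extmath import row_norms

X = np.random.RandomState(0).randn(5, 3)
assert np.allclose(row_norms(X), np.linalg.norm(X, axis=1))
assert np.allclose(row_norms(X, squared=True), (X ** 2).sum(axis=1))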
def test_labels_assignment_and_inertia():
    # pure numpy implementation as easily auditable reference gold
    # implementation
    rng = np.random.RandomState(42)
    noisy_centers = centers + rng.normal(size=centers.shape)
    labels_gold = - np.ones(n_samples, dtype=int)
    mindist = np.empty(n_samples)
    mindist.fill(np.inf)
    for center_id in range(n_clusters):
        dist = np.sum((X - noisy_centers[center_id]) ** 2, axis=1)
        labels_gold[dist < mindist] = center_id
        mindist = np.minimum(dist, mindist)
    inertia_gold = mindist.sum()
    assert_true((mindist >= 0.0).all())
    assert_true((labels_gold != -1).all())

    # perform label assignment using the dense array input
    x_squared_norms = (X ** 2).sum(axis=1)
    labels_array, inertia_array = _labels_inertia(
        X, x_squared_norms, noisy_centers)
    assert_array_almost_equal(inertia_array, inertia_gold)
    assert_array_equal(labels_array, labels_gold)

    # perform label assignment using the sparse CSR input
    x_squared_norms_from_csr = row_norms(X_csr, squared=True)
    labels_csr, inertia_csr = _labels_inertia(
        X_csr, x_squared_norms_from_csr, noisy_centers)
    assert_array_almost_equal(inertia_csr, inertia_gold)
    assert_array_equal(labels_csr, labels_gold)
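The reference loop above can also be written without iterating over centers; this vectorized equivalent, using scipy's cdist (an assumption, the test itself does not use it), is handy when auditing _labels_inertia by hand:

import numpy as np
from scipy.spatial.distance import cdist

def brute_force_labels_inertia(X, centers):
    # squared Euclidean distances, shape (n_samples, n_clusters)
    d2 = cdist(X, centers, metric="sqeuclidean")
    labels = d2.argmin(axis=1)
    inertia = d2.min(axis=1).sum()
    return labels, inertia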
def fit(self, X, y):
    """Fit factorization machine to training data.

    Parameters
    ----------
    X : array-like or sparse, shape = [n_samples, n_features]
        Training vectors, where n_samples is the number of samples
        and n_features is the number of features.

    y : array-like, shape = [n_samples]
        Target values.

    Returns
    -------
    self : Estimator
        Returns self.
    """
    if self.degree > 3:
        raise ValueError("FMs with degree >3 not yet supported.")

    X, y = self._check_X_y(X, y)
    X = self._augment(X)
    n_features = X.shape[1]  # augmented
    X_col_norms = row_norms(X.T, squared=True)
    dataset = get_dataset(X, order="fortran")
    rng = check_random_state(self.random_state)
    loss_obj = self._get_loss(self.loss)

    if not (self.warm_start and hasattr(self, 'w_')):
        self.w_ = np.zeros(n_features, dtype=np.double)

    if self.fit_lower == 'explicit':
        n_orders = self.degree - 1
    else:
        n_orders = 1

    if not (self.warm_start and hasattr(self, 'P_')):
        self.P_ = 0.01 * rng.randn(n_orders, self.n_components, n_features)

    if not (self.warm_start and hasattr(self, 'lams_')):
        if self.init_lambdas == 'ones':
            self.lams_ = np.ones(self.n_components)
        elif self.init_lambdas == 'random_signs':
            self.lams_ = np.sign(rng.randn(self.n_components))
        else:
            raise ValueError("Lambdas must be initialized as ones "
                             "(init_lambdas='ones') or as random "
                             "+/- 1 (init_lambdas='random_signs').")

    y_pred = self._get_output(X)

    converged, self.n_iter_ = _cd_direct_ho(
        self.P_, self.w_, dataset, X_col_norms, y, y_pred,
        self.lams_, self.degree, self.alpha, self.beta, self.fit_linear,
        self.fit_lower == 'explicit', loss_obj, self.max_iter,
        self.tol, self.verbose)

    if not converged:
        warnings.warn("Objective did not converge. Increase max_iter.")

    return self
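A hedged usage sketch: this fit() is the coordinate-descent driver behind polylearn's factorization machine estimators; the estimator name and parameters below are assumptions based on that project's public API, not taken from the snippet itself.

import numpy as np
from polylearn import FactorizationMachineRegressor

rng = np.random.RandomState(0)
X = rng.rand(200, 10)
y = X[:, 0] * X[:, 1] + 0.5 * X[:, 2]    # target with a pairwise interaction

fm = FactorizationMachineRegressor(degree=2, n_components=3, alpha=1e-3,
                                   beta=1e-3, random_state=0)
fm.fit(X, y)
print(fm.w_.shape, fm.P_.shape)          # arrays initialized and updated in fit()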