def _build_graph(self):
    """Compute the graph Laplacian of the sparsified, re-weighted data graph.

    Builds the adjacency matrix ``self.A_`` from ``self.X_`` according to
    ``self.sparsify`` ('epsilonNN', 'kNN' or 'MkNN'), optionally re-weights
    edges with an RBF kernel, and returns the (optionally normalized)
    graph Laplacian.
    """
    # Graph sparsification
    if self.sparsify == 'epsilonNN':
        self.A_ = radius_neighbors_graph(self.X_, self.radius,
                                         include_self=False)
    else:
        # `np.bool` was deprecated in NumPy 1.20 and removed in 1.24;
        # the builtin `bool` is the documented replacement.
        Q = kneighbors_graph(
            self.X_,
            self.n_neighbors,
            include_self=False
        ).astype(bool)
        if self.sparsify == 'kNN':
            # symmetric kNN: edge if either point is a neighbor of the other
            self.A_ = (Q + Q.T).astype(np.float64)
        elif self.sparsify == 'MkNN':
            # mutual kNN: edge only if both points are neighbors of each other
            self.A_ = (Q.multiply(Q.T)).astype(np.float64)
    # Edge re-weighting
    if self.reweight == 'rbf':
        W = rbf_kernel(self.X_, gamma=self.t)
        self.A_ = self.A_.multiply(W)
    return sp.csgraph.laplacian(self.A_, normed=self.normed)
# Example usages of Python's rbf_kernel()
def _get_kernel(self, X, y):
    """Build the operator-valued kernel specified by ``self.ovkernel``.

    A callable is returned unchanged; a known string name is resolved
    through ``PAIRWISE_KERNEL_FUNCTIONS`` with the matching parameters.
    """
    # When adding a new kernel, update this table and the _get_kernel_map
    # method
    if callable(self.ovkernel):
        return self.ovkernel
    if not isinstance(self.ovkernel, str):
        raise NotImplementedError('unsupported kernel')
    name = self.ovkernel
    # 1) check string and assign the right parameters
    if name == 'DGauss':
        self.A_ = self._default_decomposable_op(y)
        params = {'A': self.A_, 'scalar_kernel': rbf_kernel,
                  'scalar_kernel_params': {'gamma': self.gamma}}
    elif name == 'DSkewed_chi2':
        self.A_ = self._default_decomposable_op(y)
        params = {'A': self.A_, 'scalar_kernel': 'skewed_chi2',
                  'scalar_kernel_params': {'skew': self.skew}}
    elif name == 'CurlF':
        params = {'gamma': self.gamma}
    else:
        raise NotImplementedError('unsupported kernel')
    # 2) Uses lookup table to select the right kernel from string
    return PAIRWISE_KERNEL_FUNCTIONS[name](**params)
def _get_kernel_map(self, inputs):
    """Instantiate the configured kernel map and apply it to ``inputs``."""
    # When adding a new kernel, update this table and the _get_kernel_map
    # method
    if callable(self.kernel):
        factory = self.kernel
        params = self.kernel_params or {}
    elif isinstance(self.kernel, str):
        # 1) check string and assign the right parameters
        if self.kernel == 'DGauss':
            params = {'A': self._default_decomposable_op(),
                      'scalar_kernel': rbf_kernel,
                      'scalar_kernel_params': {'gamma': self.gamma}}
        else:
            raise NotImplementedError('unsupported kernel')
        # 2) Uses lookup table to select the right kernel from string
        factory = PAIRWISE_KERNEL_FUNCTIONS[self.kernel]
    else:
        raise NotImplementedError('unsupported kernel')
    return factory(**params)(inputs)
def __init__(self, A, scalar_kernel=rbf_kernel, scalar_kernel_params=None):
    """Initialize the Decomposable Operator-Valued Kernel.

    Parameters
    ----------
    A : {array, LinearOperator}, shape = [n_targets, n_targets]
        Linear operator acting on the outputs
    scalar_kernel : {callable}
        Callable which associate to the training points X the Gram matrix.
    scalar_kernel_params : {mapping of string to any}, optional
        Additional parameters (keyword arguments) for kernel function
        passed as callable object.
    """
    self.A = A
    self.scalar_kernel = scalar_kernel
    self.scalar_kernel_params = scalar_kernel_params
    self.p = A.shape[0]  # number of targets (output dimension)
def test_rbf_sampler():
    """RBFSampler's random features should approximate the exact RBF kernel."""
    gamma = 10.
    # Exact kernel for reference
    exact = rbf_kernel(X, Y, gamma=gamma)
    # Approximate kernel via random Fourier features
    sampler = RBFSampler(gamma=gamma, n_components=1000, random_state=42)
    X_feat = sampler.fit_transform(X)
    Y_feat = sampler.transform(Y)
    approx = np.dot(X_feat, Y_feat.T)
    error = exact - approx
    assert_less_equal(np.abs(np.mean(error)), 0.01)  # close to unbiased
    np.abs(error, out=error)
    assert_less_equal(np.max(error), 0.1)   # nothing too far off
    assert_less_equal(np.mean(error), 0.05)  # mean is fairly close
def test_spectral_embedding_unnormalized():
    """spectral_embedding must process the unnormalized Laplacian correctly."""
    random_state = np.random.RandomState(36)
    sims = rbf_kernel(random_state.randn(10, 30))
    n_components = 8
    embedding_1 = spectral_embedding(sims, norm_laplacian=False,
                                     n_components=n_components,
                                     drop_first=False)
    # Reference: manual computation with a dense eigendecomposition
    laplacian, dd = graph_laplacian(sims, normed=False, return_diag=True)
    _, diffusion_map = eigh(laplacian)
    raw = diffusion_map.T[:n_components] * dd
    embedding_2 = _deterministic_vector_sign_flip(raw).T
    assert_array_almost_equal(embedding_1, embedding_2)
def test_svr_predict():
    """SVR.predict must match the manual decision-function computation."""
    X = iris.data
    y = iris.target
    # Linear kernel: decision values are X @ coef + intercept
    reg = svm.SVR(kernel='linear', C=0.1).fit(X, y)
    expected = (np.dot(X, reg.coef_.T) + reg.intercept_).ravel()
    assert_array_almost_equal(expected, reg.predict(X).ravel())
    # RBF kernel: decision values come from the kernel with support vectors
    reg = svm.SVR(kernel='rbf', gamma=1).fit(X, y)
    K = rbf_kernel(X, reg.support_vectors_, gamma=reg.gamma)
    expected = (np.dot(K, reg.dual_coef_.T) + reg.intercept_).ravel()
    assert_array_almost_equal(expected, reg.predict(X).ravel())
def setKernel(self, kernel_name, kernel_param):
    """Select the internal kernel function by name.

    'rbf' installs sklearn's RBF kernel with ``gamma=kernel_param``;
    any other name installs cosine similarity (a normalized dot product).
    """
    self.kernel_name = kernel_name
    if kernel_name == 'rbf':
        self.internal_kernel_func = (
            lambda x1, x2: rbf_kernel(x1, x2, gamma=kernel_param))  # from sklearn
    else:
        # from sklearn - a normalized version of dot product #np.dot(x1,x2.T)
        self.internal_kernel_func = lambda x1, x2: cosine_similarity(x1, x2)
# Source file: LSSVMWrapper.py
# Project: PersonalizedMultitaskLearning
# Author: mitmedialab
# (scraped page metadata: 19 reads, 0 bookmarks, 0 likes, 0 comments)
def get_kernel_func(self, kernel_name, beta):
    """Return a two-argument kernel callable.

    'rbf' yields sklearn's RBF kernel with ``gamma=beta``; any other name
    yields a plain dot-product kernel.
    """
    if kernel_name == 'rbf':
        return lambda x1, x2: rbf_kernel(x1, x2, gamma=beta)  # from sklearn
    return lambda x1, x2: np.dot(x1, x2.T)
def predict(self, X, Z):
    """Predict class labels for samples in X.

    Parameters
    ----------
    X : array-like, shape = [n_samples, n_features]
        Samples.
    Z : array-like, shape = [n_train_samples, n_features]
        Points against which the RBF kernel is evaluated.
        NOTE(review): presumably the fitted training data — confirm
        against the caller.

    Returns
    -------
    y : array-like, shape = [n_samples]
        Predictions for input data.
    """
    # Decision values: K(X, Z) @ dual coefficients
    return rbf_kernel(X, Z, gamma=self.gamma_k) @ self.dual_coef_
def fit(self, X, y, L):
    """Fit the model according to the given training data.

    Parameters
    ----------
    X : array-like, shape = [n_samples, n_features]
        Training data.
    y : array-like, shape = [n_samples]
        Target values (unlabeled points are marked as 0).
    L : array-like, shape = [n_samples, n_samples]
        Graph Laplacian.
    """
    # Unlabeled points are encoded as 0, so a nonzero entry means "labeled"
    labeled = y != 0
    y_labeled = y[labeled]
    n_samples, n_features = X.shape
    n_labeled_samples = y_labeled.size
    I = sp.eye(n_samples)
    # J is diagonal with 1 at labeled positions — it masks the kernel rows
    J = sp.diags(labeled.astype(np.float64))
    K = rbf_kernel(X, gamma=self.gamma_k)
    # Regularized linear system:
    # (J K + gamma_a * l * I + gamma_i * l / n^2 * L^p K) alpha = y
    # NOTE(review): assumes L**self.p yields a matrix power — confirm L's type
    M = J @ K \
        + self.gamma_a * n_labeled_samples * I \
        + self.gamma_i * n_labeled_samples / n_samples**2 * L**self.p @ K
    # Train a classifier: solve for the dual coefficients
    self.dual_coef_ = LA.solve(M, y)
    return self
def __kernel_definition__(self):
    """Return a callable computing the Gram matrix for the configured kernel.

    Supported ``self.Kf`` values: 'rbf', 'poly', and 'linear'/None.
    NOTE(review): an unrecognized name silently falls through and returns
    None — confirm whether that is intended or should raise.
    """
    if self.Kf == 'rbf':
        return lambda X, Y: rbf_kernel(X, Y, self.rbf_gamma)
    if self.Kf == 'poly':
        return lambda X, Y: polynomial_kernel(X, Y, degree=self.poly_deg,
                                              gamma=None,
                                              coef0=self.poly_coeff)
    # `is None` is the correct identity test; `== None` can be fooled by
    # objects overriding __eq__ (PEP 8).
    if self.Kf is None or self.Kf == 'linear':
        return lambda X, Y: linear_kernel(X, Y)
def inner(self, x, y):
    """RBF inner product between x and y with bandwidth ``self.sigma``."""
    # gamma = 1 / (2 * sigma^2), the standard RBF parameterization
    bandwidth_gamma = 0.5 / self.sigma ** 2
    return rbf_kernel(to2d(x), to2d(y), bandwidth_gamma)
def _get_kernel_map(self, X, y):
    """Resolve ``self.kernel`` into an operator-valued kernel instance.

    A callable is returned unchanged; a known string name is resolved
    through ``PAIRWISE_KERNEL_FUNCTIONS`` with the matching parameters.
    """
    # When adding a new kernel, update this table and the _get_kernel_map
    # method
    if callable(self.kernel):
        ov_kernel = self.kernel
    elif isinstance(self.kernel, str):  # isinstance is the idiomatic check
        # 1) check string and assign the right parameters
        if self.kernel == 'DGauss':
            self.A_ = self._default_decomposable_op(y)
            kernel_params = {'A': self.A_, 'scalar_kernel': rbf_kernel,
                             'scalar_kernel_params': {'gamma': self.gamma}}
        elif self.kernel == 'DotProduct':
            kernel_params = {'mu': self.mu, 'p': y.shape[1]}
        elif self.kernel == 'DPeriodic':
            self.A_ = self._default_decomposable_op(y)
            self.period_ = self._default_period(X, y)
            kernel_params = {'A': self.A_,
                             'scalar_kernel': first_periodic_kernel,
                             'scalar_kernel_params': {'gamma': self.theta,
                                                      'period':
                                                      self.period_}, }
        else:
            # BUG FIX: `raise NotImplemented(...)` raised a TypeError because
            # NotImplemented is a sentinel value, not an exception class.
            # NotImplementedError matches the sibling implementations.
            raise NotImplementedError('unsupported kernel')
        # 2) Uses lookup table to select the right kernel from string
        ov_kernel = PAIRWISE_KERNEL_FUNCTIONS[self.kernel](**kernel_params)
    else:
        raise NotImplementedError('unsupported kernel')
    return ov_kernel
def _get_kernel_map(self, X, y):
    """Instantiate the operator-valued kernel named by ``self.ovkernel``
    and evaluate it on X."""
    # When adding a new kernel, update this table and the _get_kernel_map
    # method
    if callable(self.ovkernel):
        ovkernel = self.ovkernel
    elif type(self.ovkernel) is str:
        name = self.ovkernel
        # 1) check string and assign the right parameters
        if name == 'DGauss':
            self.A_ = self._default_decomposable_op(y)
            kernel_params = {'A': self.A_,
                             'scalar_kernel': rbf_kernel,
                             'scalar_kernel_params': {'gamma': self.gamma}}
        elif name == 'DPeriodic':
            self.A_ = self._default_decomposable_op(y)
            self.period_ = self._default_period(X, y)
            kernel_params = {'A': self.A_,
                             'scalar_kernel': first_periodic_kernel,
                             'scalar_kernel_params': {'gamma': self.theta,
                                                      'period':
                                                      self.period_}, }
        elif name == 'CurlF':
            kernel_params = {'gamma': self.gamma}
        else:
            raise NotImplementedError('unsupported kernel')
        # 2) Uses lookup table to select the right kernel from string
        ovkernel = PAIRWISE_KERNEL_FUNCTIONS[name](**kernel_params)
    else:
        raise NotImplementedError('unsupported kernel')
    return ovkernel(X)
def _Gram(self, X):
    """Compute the block Gram matrix of this operator-valued kernel.

    Returns a dense matrix of shape (p * n_X, p * n_train) assembled
    from an RBF scalar kernel and pairwise coordinate differences.
    NOTE(review): the (I - 2*gamma*delta*delta^T) structure looks like a
    curl-free Gaussian kernel — confirm against the defining paper.
    The result for the training set is cached in ``self.Gs_train``.
    """
    if X is self.X:
        # Training Gram: compute once and cache (identity check on purpose —
        # only the exact training array reuses the cache)
        if self.Gs_train is None:
            kernel_scalar = rbf_kernel(self.X, gamma=self.gamma)[:, :,
                                                                 newaxis,
                                                                 newaxis]
            # delta[d, i, j] = X[j, d] - self.X[i, d]
            delta = subtract(X.T[:, newaxis, :], self.X.T[:, :, newaxis])
            self.Gs_train = asarray(transpose(
                2 * self.gamma * kernel_scalar *
                (eye(self.p)[newaxis, newaxis, :, :] - 2 *
                 (self.gamma * delta[:, newaxis, :, :] *
                  delta[newaxis, :, :, :]).transpose((3, 2, 0, 1))),
                (0, 2, 1, 3)
            )).reshape((self.p * X.shape[0], self.p * self.X.shape[0]))
        return self.Gs_train
    # Out-of-sample Gram: same formula, no caching
    kernel_scalar = rbf_kernel(X, self.X, gamma=self.gamma)[:, :,
                                                            newaxis,
                                                            newaxis]
    delta = subtract(X.T[:, newaxis, :], self.X.T[:, :, newaxis])
    return asarray(transpose(
        2 * self.gamma * kernel_scalar *
        (eye(self.p)[newaxis, newaxis, :, :] - 2 *
         (self.gamma * delta[:, newaxis, :, :] *
          delta[newaxis, :, :, :]).transpose((3, 2, 0, 1))),
        (0, 2, 1, 3)
    )).reshape((self.p * X.shape[0], self.p * self.X.shape[0]))
def _default_decomposable_op(self):
    """Default output operator: RBF Gram matrix of the quantile levels,
    or the identity when ``gamma_quantile`` is infinite."""
    levels = asarray(self.probs).reshape((1, -1))  # 2D array (row vector)
    if self.gamma_quantile != npinf:
        return rbf_kernel(levels.T, gamma=self.gamma_quantile)
    return eye(len(self.probs))
def determine_num_clusters_spectral(X, max_clusters=10, gamma=None):
    """ Determine number of clusters based on Eigengaps of Graph Laplacian. """
    if gamma is None:
        # default bandwidth heuristic based on feature count
        gamma = np.sqrt(X.shape[1])
    laplacian = graph_laplacian(rbf_kernel(X, gamma=gamma), normed=True,
                                return_diag=False)
    top = min(max_clusters, laplacian.shape[0] - 1)
    spectrum = scipy.linalg.eigh(laplacian, eigvals=(0, top),
                                 eigvals_only=True)
    # largest gap between consecutive eigenvalues marks the cluster count
    return np.argmax(np.diff(spectrum)) + 1
def test_nystroem_approximation():
    """Basic sanity checks for the Nystroem feature map."""
    rnd = np.random.RandomState(0)
    X = rnd.uniform(size=(10, 4))
    # With n_components = n_samples the approximation is exact
    X_transformed = Nystroem(n_components=X.shape[0]).fit_transform(X)
    assert_array_almost_equal(np.dot(X_transformed, X_transformed.T),
                              rbf_kernel(X))
    # Reduced rank: only the output shape can be checked
    trans = Nystroem(n_components=2, random_state=rnd)
    assert_equal(trans.fit(X).transform(X).shape, (X.shape[0], 2))
    # A user-supplied callable kernel must work too
    def dot_kernel(A, B):
        return np.dot(A, B.T)
    trans = Nystroem(n_components=2, kernel=dot_kernel, random_state=rnd)
    assert_equal(trans.fit(X).transform(X).shape, (X.shape[0], 2))
    # Every registered kernel should fit and transform
    for kern in kernel_metrics():
        trans = Nystroem(n_components=2, kernel=kern, random_state=rnd)
        assert_equal(trans.fit(X).transform(X).shape, (X.shape[0], 2))
def test_nystroem_singular_kernel():
    # test that nystroem works with singular kernel matrix
    rng = np.random.RandomState(0)
    X = rng.rand(10, 20)
    X = np.vstack([X] * 2)  # duplicate samples -> rank-deficient Gram matrix
    gamma = 100
    N = Nystroem(gamma=gamma, n_components=X.shape[0]).fit(X)
    X_transformed = N.transform(X)
    K = rbf_kernel(X, gamma=gamma)
    assert_array_almost_equal(K, np.dot(X_transformed, X_transformed.T))
    # NOTE(review): `Y` is not defined in this function — it can only refer
    # to a module-level global; this line likely was meant to check
    # `X_transformed`. Confirm against the upstream test suite.
    assert_true(np.all(np.isfinite(Y)))
def callable_rbf_kernel(x, y, **kwds):
    """Callable wrapper around pairwise.rbf_kernel (promotes 1-D inputs)."""
    x2d = np.atleast_2d(x)
    y2d = np.atleast_2d(y)
    return rbf_kernel(x2d, y2d, **kwds)
def test_kernel_symmetry():
    """Every valid kernel must produce a symmetric Gram matrix."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4))
    kernels = (linear_kernel, polynomial_kernel, rbf_kernel,
               laplacian_kernel, sigmoid_kernel, cosine_similarity)
    for kernel in kernels:
        gram = kernel(X, X)
        assert_array_almost_equal(gram, gram.T, 15)
def test_kernel_sparse():
    """Kernels must give identical results on dense and sparse inputs."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4))
    X_sparse = csr_matrix(X)
    kernels = (linear_kernel, polynomial_kernel, rbf_kernel,
               laplacian_kernel, sigmoid_kernel, cosine_similarity)
    for kernel in kernels:
        dense_gram = kernel(X, X)
        sparse_gram = kernel(X_sparse, X_sparse)
        assert_array_almost_equal(dense_gram, sparse_gram)
def test_rbf_kernel():
    """The diagonal of an RBF Gram matrix must be all ones."""
    rng = np.random.RandomState(0)
    X = rng.random_sample((5, 4))
    K = rbf_kernel(X, X)
    diagonal = K.flat[::6]  # stride 6 walks the diagonal of a 5x5 matrix
    assert_array_almost_equal(diagonal, np.ones(5))
def test_gridsearch_pipeline_precomputed():
    """Grid-search a KernelPCA + Perceptron pipeline on a precomputed
    kernel and verify it separates the circles data perfectly."""
    X, y = make_circles(n_samples=400, factor=.3, noise=.05,
                        random_state=0)
    pipeline = Pipeline([
        ("kernel_pca", KernelPCA(kernel="precomputed", n_components=2)),
        ("Perceptron", Perceptron()),
    ])
    grid_search = GridSearchCV(
        pipeline, cv=3,
        param_grid=dict(Perceptron__n_iter=np.arange(1, 5)))
    grid_search.fit(rbf_kernel(X, gamma=2.), y)
    assert_equal(grid_search.best_score_, 1)
def test_spectral_embedding_precomputed_affinity(seed=36):
    """Precomputed RBF affinity and the built-in 'rbf' affinity must agree."""
    gamma = 1.0
    est_precomp = SpectralEmbedding(n_components=2, affinity="precomputed",
                                    random_state=np.random.RandomState(seed))
    est_rbf = SpectralEmbedding(n_components=2, affinity="rbf", gamma=gamma,
                                random_state=np.random.RandomState(seed))
    embed_precomp = est_precomp.fit_transform(rbf_kernel(S, gamma=gamma))
    embed_rbf = est_rbf.fit_transform(S)
    assert_array_almost_equal(est_precomp.affinity_matrix_,
                              est_rbf.affinity_matrix_)
    # embeddings may differ by a per-column sign flip
    assert_true(_check_with_col_sign_flipping(embed_precomp, embed_rbf, 0.05))
def test_spectral_embedding_deterministic():
    """Two runs of spectral_embedding on the same affinity must agree."""
    rng = np.random.RandomState(36)
    sims = rbf_kernel(rng.randn(10, 30))
    first = spectral_embedding(sims)
    second = spectral_embedding(sims)
    assert_array_almost_equal(first, second)
def test_spectral_clustering_sparse():
    """Spectral clustering must accept a sparse precomputed affinity."""
    X, y = make_blobs(n_samples=20, random_state=0,
                      centers=[[1, 1], [-1, -1]], cluster_std=0.01)
    # threshold tiny similarities to zero, then sparsify
    affinity = np.maximum(rbf_kernel(X, gamma=1) - 1e-4, 0)
    affinity = sparse.coo_matrix(affinity)
    labels = SpectralClustering(random_state=0, n_clusters=2,
                                affinity='precomputed').fit(affinity).labels_
    assert_equal(adjusted_rand_score(y, labels), 1)
def test_decision_function():
    """decision_function computed manually must match libsvm's values."""
    # multi class:
    clf = svm.SVC(kernel='linear', C=0.1,
                  decision_function_shape='ovo').fit(iris.data, iris.target)
    dec = np.dot(iris.data, clf.coef_.T) + clf.intercept_
    assert_array_almost_equal(dec, clf.decision_function(iris.data))
    # binary:
    clf.fit(X, Y)
    dec = np.dot(X, clf.coef_.T) + clf.intercept_
    prediction = clf.predict(X)
    assert_array_almost_equal(dec.ravel(), clf.decision_function(X))
    # `np.int` was deprecated in NumPy 1.20 and removed in 1.24; the
    # builtin `int` is the documented replacement and behaves identically.
    assert_array_almost_equal(
        prediction,
        clf.classes_[(clf.decision_function(X) > 0).astype(int)])
    expected = np.array([-1., -0.66, -1., 0.66, 1., 1.])
    assert_array_almost_equal(clf.decision_function(X), expected, 2)
    # kernel binary:
    clf = svm.SVC(kernel='rbf', gamma=1, decision_function_shape='ovo')
    clf.fit(X, Y)
    rbfs = rbf_kernel(X, clf.support_vectors_, gamma=clf.gamma)
    dec = np.dot(rbfs, clf.dual_coef_.T) + clf.intercept_
    assert_array_almost_equal(dec.ravel(), clf.decision_function(X))
def fit(self, X, y, L):
    """Fit the model according to the given training data.

    Parameters
    ----------
    X : array-like, shape = [n_samples, n_features]
        Training data.
    y : array-like, shape = [n_samples]
        Target values (unlabeled points are marked as 0).
    L : array-like, shape = [n_samples, n_samples]
        Graph Laplacian.
    """
    # Unlabeled points are encoded as 0, so a nonzero entry means "labeled"
    labeled = y != 0
    y_labeled = y[labeled]
    n_samples, n_features = X.shape
    n_labeled_samples = y_labeled.size
    I = sp.eye(n_samples)
    # Diagonal matrix carrying the labels of the labeled points
    Y = sp.diags(y_labeled)
    # Rectangular identity mapping full sample indices to labeled indices.
    # NOTE(review): this assumes the labeled samples come first in X — confirm.
    J = sp.eye(n_labeled_samples, n_samples)
    K = rbf_kernel(X, gamma=self.gamma_k)
    # Regularization operator: M = 2*gamma_a*I + 2*gamma_i/n^2 * L^p K
    M = 2 * self.gamma_a * I \
        + 2 * self.gamma_i / n_samples**2 * L**self.p @ K
    # Construct the QP, invoke solver
    solvers.options['show_progress'] = False
    sol = solvers.qp(
        # Quadratic term of the dual objective: Y J K M^-1 J^T Y
        P = matrix(Y @ J @ K @ LA.inv(M) @ J.T @ Y),
        q = matrix(-1 * np.ones(n_labeled_samples)),
        # Box constraints: -beta <= 0 and l*beta <= 1, i.e. 0 <= beta <= 1/l
        G = matrix(np.vstack((
            -1 * np.eye(n_labeled_samples),
            n_labeled_samples * np.eye(n_labeled_samples)
        ))),
        h = matrix(np.hstack((
            np.zeros(n_labeled_samples),
            np.ones(n_labeled_samples)
        ))),
        # Equality constraint: y_labeled . beta = 0
        A = matrix(y_labeled, (1, n_labeled_samples), 'd'),
        b = matrix(0.0)
    )
    # Train a classifier: recover the expansion coefficients from the QP dual
    self.dual_coef_ = LA.solve(M, J.T @ Y @ np.array(sol['x']).ravel())
    return self