def test_byteorder_check():
    # Byte order check should pass for native order
    if sys.byteorder == 'little':
        native = '<'
    else:
        native = '>'

    for dtt in (np.float32, np.float64):
        arr = np.eye(4, dtype=dtt)
        n_arr = arr.newbyteorder(native)
        sw_arr = arr.newbyteorder('S').byteswap()
        assert_equal(arr.dtype.byteorder, '=')
        for routine in (linalg.inv, linalg.det, linalg.pinv):
            # Normal call
            res = routine(arr)
            # Native but not '='
            assert_array_equal(res, routine(n_arr))
            # Swapped
            assert_array_equal(res, routine(sw_arr))
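The point of this test is that np.linalg routines accept arrays whose dtype byte order differs from the native '=' marker. A minimal self-contained illustration of the same idea:

import numpy as np
from numpy import linalg

arr = np.eye(3)
# Same values, stored in the opposite (non-native) byte order
sw = arr.astype(arr.dtype.newbyteorder('S'))
print(arr.dtype.byteorder, sw.dtype.byteorder)          # '=' and '>' (or '<')
print(np.allclose(linalg.pinv(arr), linalg.pinv(sw)))   # True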
Python pinv() — example source code collected from open-source projects
def tdoa_to_position(time_diff, sensor_pos):
    sensors = len(time_diff)
    if len(time_diff) != len(sensor_pos):
        raise Exception('Channel number mismatch.')
    dist_diff = []
    for x in time_diff:
        dist_diff.append(x * sound_speed)
    inhom_mat = np.mat(np.zeros([sensors - 2, 1]))
    coeff_mat = np.mat(np.zeros([sensors - 2, 3]))
    for i in range(2, sensors):
        args = dist_diff[1], dist_diff[i], \
            sensor_pos[0], sensor_pos[1], sensor_pos[i]
        coeff_mat[i - 2, :] = coeff(*args)
        inhom_mat[i - 2] = -inhom(*args)
    x_sol = lin.pinv(coeff_mat) * inhom_mat
    return x_sol[0, 0], x_sol[1, 0], x_sol[2, 0]
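coeff, inhom, and sound_speed are helpers defined elsewhere in that project, so the snippet is not runnable on its own. As a self-contained illustration of the core step, solving an overdetermined linear system in the least-squares sense with pinv (all values hypothetical):

import numpy as np
from numpy.linalg import pinv

# Hypothetical overdetermined system: 4 equations, 3 unknowns.
A = np.array([[1.0, 2.0, 0.0],
              [0.0, 1.0, 1.0],
              [1.0, 0.0, 1.0],
              [2.0, 1.0, 1.0]])
b = np.array([3.0, 2.0, 2.0, 4.0])

# pinv(A) @ b is the minimum-norm least-squares solution,
# equivalent to np.linalg.lstsq(A, b, rcond=None)[0].
x = pinv(A) @ b
print(x)  # [1. 1. 1.]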
def _update(self):
    D = self.A.shape[1]
    eta = 0.5 / float(D)  # step size, to tune (np.float is deprecated)
    A_t = self.A.copy()
    if self.A_true.any():
        self.show_error()
        sys.stdout.flush()
    start = time.time()
    A_inv = pinv(self.A)
    # Sparse codes: least-squares fit, with entries below self.alpha zeroed out
    Z = threshold(A_inv * self.Y, threshmin=self.alpha)
    for i in range(self.inner_epo):
        A_t = A_t + eta * (self.Y * Z.transpose() - A_t * Z * Z.transpose())
    end = time.time()
    self.time = self.time + end - start
    return A_t
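For context, the loop is gradient descent on ||Y - A Z||_F^2 with respect to A (the gradient is -(Y - A Z) Z.T), with sparse codes Z obtained by thresholding the pseudoinverse solution. A minimal self-contained sketch of the same update, using plain ndarrays instead of np.matrix and a simple cutoff in place of the removed scipy threshold helper (dictionary_update and its defaults are illustrative, not the class's API):

import numpy as np
from numpy.linalg import pinv

def dictionary_update(A, Y, alpha=0.1, eta=None, inner_epo=10):
    # Sparse codes: least-squares fit, then zero entries below alpha
    # (mirroring scipy's old threshold(..., threshmin=alpha) behaviour).
    Z = pinv(A) @ Y
    Z[Z < alpha] = 0.0
    eta = 0.5 / A.shape[1] if eta is None else eta
    for _ in range(inner_epo):
        # Gradient step on ||Y - A @ Z||_F^2; grad w.r.t. A is -(Y - A @ Z) @ Z.T
        A = A + eta * (Y - A @ Z) @ Z.T
    return A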
def _fit(self, X, y):
    # Calling the class-specific train method
    n, d = X.shape
    if n < d:
        # Dual formulation: invert an (n x n) matrix instead of (d x d)
        tmp = np.dot(X, X.T)
        if self.mu != 0.0:
            tmp += self.mu * n * np.eye(n)
        tmp = la.pinv(tmp)
        coef_ = np.dot(np.dot(X.T, tmp), y)
    else:
        tmp = np.dot(X.T, X)
        if self.mu != 0.0:
            tmp += self.mu * n * np.eye(d)
        tmp = la.pinv(tmp)
        coef_ = np.dot(tmp, np.dot(X.T, y))
    self.coef_ = coef_
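Both branches return the same ridge solution thanks to the identity (X.T X + mu*n*I)^-1 X.T = X.T (X X.T + mu*n*I)^-1; the n < d branch merely inverts the smaller n-by-n matrix. A quick illustrative check:

import numpy as np
from numpy import linalg as la

rng = np.random.default_rng(0)
X = rng.standard_normal((5, 8))   # n < d, so the dual branch applies
y = rng.standard_normal(5)
mu, n = 0.1, X.shape[0]

primal = la.pinv(X.T @ X + mu * n * np.eye(8)) @ X.T @ y
dual = X.T @ la.pinv(X @ X.T + mu * n * np.eye(5)) @ y
print(np.allclose(primal, dual))  # True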
def train(self):
    X = self.train_x
    y = self.train_y
    # include intercept
    beta = np.zeros((self.p + 1, 1))
    iter_times = 0
    while True:
        e_X = np.exp(X @ beta)
        # N x 1 vector of fitted probabilities
        self.P = e_X / (1 + e_X)
        # W is a vector of weights p * (1 - p)
        self.W = (self.P * (1 - self.P)).flatten()
        # X.T * W scales each column of X.T, i.e. equals X.T @ np.diagflat(W)
        beta = beta + self.math.pinv((X.T * self.W) @ X) @ X.T @ (y - self.P)
        iter_times += 1
        if iter_times > self.max_iter:
            break
    self.beta_hat = beta
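Each pass is one Newton-Raphson (IRLS) step for logistic regression: beta <- beta + (X.T W X)^-1 X.T (y - p). A self-contained sketch on toy data (all names and values illustrative):

import numpy as np
from numpy.linalg import pinv

rng = np.random.default_rng(1)
X = np.hstack([np.ones((100, 1)), rng.standard_normal((100, 2))])
true_beta = np.array([[0.5], [2.0], [-1.0]])
y = (rng.random((100, 1)) < 1 / (1 + np.exp(-X @ true_beta))).astype(float)

beta = np.zeros((3, 1))
for _ in range(25):
    p = 1 / (1 + np.exp(-X @ beta))
    W = (p * (1 - p)).flatten()
    # One Newton step using the weighted normal equations
    beta = beta + pinv((X.T * W) @ X) @ X.T @ (y - p)
print(beta.ravel())  # roughly recovers true_beta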
(The same test_byteorder_check snippet also appears verbatim in test_linalg.py of the project PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda by SignalMedia, and in several other projects; those duplicate listings are omitted below.)
def __init__(self, x, y, z, params, debiased=False, kappa=1):
    if not (x.shape[0] == y.shape[0] == z.shape[0]):
        raise ValueError('x, y and z must have the same number of rows')
    if not x.shape[1] == len(params):
        raise ValueError('x and params must have compatible dimensions')

    self.x = x
    self.y = y
    self.z = z
    self.params = params
    self._debiased = debiased
    self.eps = y - x @ params
    self._kappa = kappa
    self._pinvz = pinv(z)
    nobs, nvar = x.shape
    self._scale = nobs / (nobs - nvar) if self._debiased else 1
    self._name = 'Unadjusted Covariance (Homoskedastic)'
def __init__(self, A, B, offset, scale, px_offset, px_scale, gsd=None, proj=None, default_z=0):
    self.proj = proj
    self._A = A
    self._B = B
    self._offset = offset
    self._scale = scale
    self._px_offset = px_offset
    self._px_scale = px_scale
    self._gsd = gsd
    self._offscl = np.vstack([offset, scale])
    self._offscl_rev = np.vstack([-offset/scale, 1.0/scale])
    self._px_offscl_rev = np.vstack([px_offset, px_scale])
    self._px_offscl = np.vstack([-px_offset/px_scale, 1.0/px_scale])
    self._default_z = default_z
    # pinv(A.T @ A) @ A.T is the Moore-Penrose pseudoinverse of A
    self._A_rev = np.dot(pinv(np.dot(np.transpose(A), A)), np.transpose(A))
    # only using the numerator (more dynamic range for the fit?)
    # self._B_rev = np.dot(pinv(np.dot(np.transpose(B), B)), np.transpose(B))
def partial_corr(x, k):
    """
    Partial autocorrelation coefficient at lag k, estimated by fitting
    the autoregression  w0 + w(k)*x(1) + w(k-1)*x(2) + ... + w(1)*x(k) = x(k+1)
    by least squares and reading off w(k).
    """
    n = len(x)
    X = []
    Y = []
    # The target is the value immediately after each k-lag window,
    # matching the model in the docstring.
    for i in range(0, n - k):
        X.append(x[i:i + k])
        Y.append(x[i + k])
    X = np.array(X)
    Y = np.array(Y)
    one = np.ones((X.shape[0], 1))
    X = np.concatenate((one, X), axis=1)
    coef = np.dot(pinv(X), Y)
    # coef[1] multiplies x(1), the value k steps back, i.e. w(k)
    return coef[1]
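A sanity check for the helper above (illustrative simulation): for an AR(1) process, the lag-1 partial autocorrelation should be near the AR coefficient and higher lags near zero:

import numpy as np
from numpy.linalg import pinv

rng = np.random.default_rng(2)
x = np.zeros(2000)
for t in range(1, 2000):
    x[t] = 0.7 * x[t - 1] + rng.standard_normal()

print(partial_corr(x, 1))  # roughly 0.7
print(partial_corr(x, 3))  # roughly 0.0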
def corr(s, k):
    """
    Autocorrelation coefficient at lag k, estimated by a
    least-squares regression of s(t+k) on s(t).
    """
    n = len(s)
    x = []
    y = []
    for i in range(0, n - k):
        x.append([s[i]])
        y.append([s[i + k]])
    # least-squares fit by hand, via the pseudoinverse
    x = np.array(x)
    y = np.array(y)
    one = np.ones((x.shape[0], 1))
    x = np.concatenate((one, np.array(x)), axis=1)
    coefs = np.dot(pinv(x), y)
    coef = coefs[1]
    return coef
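For a stationary series, the regression slope of s(t+k) on s(t) estimates the lag-k autocorrelation, so it can be cross-checked against np.corrcoef (illustrative):

import numpy as np

rng = np.random.default_rng(3)
s = np.zeros(1000)
for t in range(1, 1000):
    s[t] = 0.8 * s[t - 1] + rng.standard_normal()

k = 2
print(corr(s, k))                        # roughly 0.64 (= 0.8**2)
print(np.corrcoef(s[:-k], s[k:])[0, 1])  # close to the value above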
def make_bin_weights(self):
    erb_max = hz2erb(self.sr / 2.0)
    ngrid = 1000
    erb_grid = np.arange(ngrid) * erb_max / (ngrid - 1)
    hz_grid = (np.exp(erb_grid / 9.26) - 1) / 0.00437
    resp = np.zeros((ngrid, self.n_bins))
    for b in range(self.n_bins):
        w = self.widths[b]
        r = (2.0 * w + 1.0) / self.sr * (hz_grid - self.hz_freqs[b])
        resp[:, b] = np.square(np.sinc(r) + 0.5 * np.sinc(r + 1.0) + 0.5 * np.sinc(r - 1.0))
    self.weights = np.dot(linalg.pinv(resp), np.ones((ngrid, 1)))
def do(self, a, b):
    a_ginv = linalg.pinv(a)
    assert_almost_equal(dot(a, a_ginv), identity(asarray(a).shape[0]))
    assert_(imply(isinstance(a, matrix), isinstance(a_ginv, matrix)))
def tdoa_to_position(time_diff, sensor_pos):
    sensors = len(time_diff)
    if len(time_diff) != len(sensor_pos):
        raise Exception('Channel number mismatch.')
    inhom_mat = np.mat(np.zeros([sensors - 1, 1]))
    coeff_mat = np.mat(np.zeros([sensors - 1, 3]))
    for i in range(1, sensors):
        coeff_mat[i - 1, :] = diff(sensor_pos[i], sensor_pos[0])
        inhom_mat[i - 1] = time_diff[i] * sound_speed
    angle_vec = lin.pinv(coeff_mat) * inhom_mat
    return normalize(angle_vec)
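Unlike the position solver earlier on this page, this variant appears to solve for a direction of arrival under a far-field assumption: each row of coeff_mat is a sensor baseline and the right-hand side is a path-length difference (diff and normalize are that project's helpers). A self-contained sketch of the same algebra (values hypothetical):

import numpy as np
from numpy.linalg import pinv

sensor_pos = np.array([[0.0, 0.0, 0.0],
                       [1.0, 0.0, 0.0],
                       [0.0, 1.0, 0.0],
                       [0.0, 0.0, 1.0]])
true_dir = np.array([0.6, 0.8, 0.0])   # unit direction of arrival
baselines = sensor_pos[1:] - sensor_pos[0]
path_diff = baselines @ true_dir       # ideal path-length differences

est = pinv(baselines) @ path_diff
est /= np.linalg.norm(est)
print(est)  # recovers true_dir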
def main():
    sensor_poses, dist_diff = make_data()
    inhom_mat = np.mat(np.zeros([sensors - 2, 1]))
    coeff_mat = np.mat(np.zeros([sensors - 2, 2]))
    for i in range(2, sensors):
        args = dist_diff[1], dist_diff[i], \
            sensor_poses[0], sensor_poses[1], sensor_poses[i]
        coeff_mat[i - 2, 0], coeff_mat[i - 2, 1] = coeff(*args)
        inhom_mat[i - 2] = -inhom(*args)
    x_sol = lin.pinv(coeff_mat) * inhom_mat
    x, y = x_sol[0, 0], x_sol[1, 0]
    print("The source is anticipated at ({0}, {1})".format(x, y))
def ridge_regression(data, labels, mu=0.0):
    r"""Implementation of the Regularized Least Squares solver.

    It solves the ridge regression problem with parameter ``mu`` on the
    `l2-norm`.

    Parameters
    ----------
    data : (N, P) ndarray
        Data matrix.
    labels : (N,) or (N, 1) ndarray
        Labels vector.
    mu : float, optional (default is `0.0`)
        `l2-norm` penalty.

    Returns
    -------
    beta : (P, 1) ndarray
        Ridge regression solution.

    Examples
    --------
    >>> X = numpy.array([[0.1, 1.1, 0.3], [0.2, 1.2, 1.6], [0.3, 1.3, -0.6]])
    >>> beta = numpy.array([0.1, 0.1, 0.0])
    >>> Y = numpy.dot(X, beta)
    >>> beta = l1l2py.algorithms.ridge_regression(X, Y, 1e3).T
    >>> len(numpy.flatnonzero(beta))
    3
    """
    n, p = data.shape
    if n < p:
        tmp = np.dot(data, data.T)
        if mu:
            tmp += mu * n * np.eye(n)
        tmp = la.pinv(tmp)
        return np.dot(np.dot(data.T, tmp), labels.reshape(-1, 1))
    else:
        tmp = np.dot(data.T, data)
        if mu:
            tmp += mu * n * np.eye(p)
        tmp = la.pinv(tmp)
        return np.dot(tmp, np.dot(data.T, labels.reshape(-1, 1)))
def c_bind(X, Xpinv):
    (n, p) = X.shape
    # ctypes needs a contiguous buffer: X.transpose() returns a view,
    # so the data must be copied into a fresh array before passing the
    # raw pointer to the C routine.
    XT = np.array(list(X.transpose()))
    # C signature: pinv_bridge(float *h_X, float *h_Xpinv, int n, int p)
    testlib.pinv_bridge(
        XT.ctypes.data_as(ctypes.POINTER(ctypes.c_float)),
        Xpinv.ctypes.data_as(ctypes.POINTER(ctypes.c_float)),
        ctypes.c_int(n),
        ctypes.c_int(p)
    )
    return Xpinv.T
def test_ridge_regression_py(X, Y, _lambda):
    X = X.astype(np.float32)
    Y = Y.astype(np.float32)
    t1 = time.time()
    n, p = X.shape
    if n < p:
        tmp = np.dot(X, X.T)
        if _lambda:
            tmp += _lambda * n * np.eye(n)
        tmp = la.pinv(tmp)
        beta_out = np.dot(np.dot(X.T, tmp), Y.reshape(-1, 1))
    else:
        tmp = np.dot(X.T, X)
        if _lambda:
            tmp += _lambda * n * np.eye(p)
        tmp = la.pinv(tmp)
        beta_out = np.dot(tmp, np.dot(X.T, Y.reshape(-1, 1)))
    t2 = time.time()
    dt = t2 - t1
    print("total time (Python): {}".format(dt))
    print(beta_out[:20, 0])
def lsqfit(points, M):
    # Least-squares solution of M @ x = points via the pseudoinverse
    M_ = linalg.pinv(M)
    return M_ * points
def linear_discriminant_func(self, x, k):
    """
    Linear discriminant function, defined by Eq. (4.10):
    delta_k(x) = x.T @ Sigma^-1 @ mu_k - (mu_k.T @ Sigma^-1 @ mu_k) / 2 + log(pi_k)
    """
    mu_k = self.Mu[k]
    pi_k = self.Pi[k]
    sigma_inv = self.math.pinv(self.Sigma_hat)
    result = mu_k @ sigma_inv @ x.T - (mu_k @ sigma_inv @ mu_k.T) / 2 + log(pi_k)
    return result
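The predicted class is the argmax of delta_k(x) over k. A minimal self-contained sketch of the same rule with plain ndarrays (lda_predict and its argument shapes are illustrative, not the class's API):

import numpy as np
from numpy.linalg import pinv

def lda_predict(X, Mu, Pi, Sigma):
    # X: (n, p) observations; Mu: (K, p) centroids; Pi: (K,) priors; Sigma: (p, p)
    S_inv = pinv(Sigma)
    # delta_k(x) for all k at once: (n, K) score matrix
    scores = X @ S_inv @ Mu.T - 0.5 * np.sum(Mu @ S_inv * Mu, axis=1) + np.log(Pi)
    return np.argmax(scores, axis=1)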
def quadratic_discriminant_func(self, x, k):
    mu_k = self.Mu[k]
    pi_k = self.Pi[k]
    sigma_k = self.Sigma_hat[k]
    pinv = self.math.pinv
    # assume that each row of x contains one observation
    result = -(np.log(np.linalg.det(sigma_k))) / 2 \
        - ((x - mu_k) @ pinv(sigma_k, rcond=0) @ (x - mu_k).T) / 2 + log(pi_k)
    return result
def train(self):
    super().train()
    sigma = self.Sigma_hat
    # eigendecomposition: sigma = U @ D @ U.T
    D_, U = LA.eigh(sigma)
    D = np.diagflat(D_)
    # whitening (sphering) transform A = D^(-1/2) @ U.T
    self.A = np.power(LA.pinv(D), 0.5) @ U.T
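The resulting A whitens the covariance: A @ sigma @ A.T becomes the identity. A quick illustrative check:

import numpy as np
from numpy import linalg as LA

rng = np.random.default_rng(4)
sigma = np.cov(rng.standard_normal((3, 500)))  # a random full-rank covariance

D_, U = LA.eigh(sigma)
A = np.power(LA.pinv(np.diagflat(D_)), 0.5) @ U.T
print(np.allclose(A @ sigma @ A.T, np.eye(3)))  # True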
def train(self):
    super().train()
    W = self.Sigma_hat
    # prior probabilities (K, 1)
    Pi = self.Pi
    # class centroids (K, p)
    Mu = self.Mu
    p = self.p
    # the number of classes
    K = self.n_class
    # the target dimension
    L = self.L
    # Mu is a (K, p) matrix, Pi is (K, 1)
    mu = np.sum(Pi * Mu, axis=0)
    B = np.zeros((p, p))
    for k in range(K):
        # vector @ vector gives a scalar; use vector[:, None] to make a column matrix
        # vec[:, None] is equivalent to vec.reshape((vec.shape[0], 1))
        B = B + Pi[k] * ((Mu[k] - mu)[:, None] @ ((Mu[k] - mu)[None, :]))
    # Careful: `eigh` returns eigenvalues in ascending order, the opposite of R.
    Dw, Uw = LA.eigh(W)
    # reverse Dw and Uw to get descending order
    Dw = Dw[::-1]
    Uw = np.fliplr(Uw)
    W_half = self.math.pinv(np.diagflat(Dw**0.5) @ Uw.T)
    B_star = W_half.T @ B @ W_half
    D_, V = LA.eigh(B_star)
    # reverse V as well
    V = np.fliplr(V)
    # overwrite `self.A` so we can reuse the `predict` method defined in the parent class
    self.A = np.zeros((L, p))
    for l in range(L):
        self.A[l, :] = W_half @ V[:, l]
def __init__(self):
    self.inv = linalg.inv
    self.sum = np.sum
    self.svd = svd
    self.pinv = linalg.pinv