def test_equality_mat(self, intp_fixture):
""" Equality constraint: abar, bbar, cbar, D
"""
pc, pc_intp = intp_fixture
# number
for i in range(pc_intp.N):
ds = pc_intp.ss[i+1] - pc_intp.ss[i]
ai_new = np.hstack((
pc.abar[i],
pc.abar[i+1] + 2 * ds * pc.bbar[i+1]))
bi_new = np.hstack((pc.bbar[i], pc.bbar[i+1]))
ci_new = np.hstack((pc.cbar[i], pc.cbar[i+1]))
Di_new = block_diag(pc.D[i], pc.D[i+1])
li_new = np.hstack((pc.l[i], pc.l[i+1]))
hi_new = np.hstack((pc.h[i], pc.h[i+1]))
assert np.allclose(ai_new, pc_intp.abar[i])
assert np.allclose(bi_new, pc_intp.bbar[i])
assert np.allclose(ci_new, pc_intp.cbar[i])
assert np.allclose(Di_new, pc_intp.D[i], atol=1e-8)
assert np.allclose(li_new, pc_intp.l[i])
assert np.allclose(hi_new, pc_intp.h[i])
python类block_diag()的实例源码
def low_rank_align(X, Y, Cxy, d=None, mu=0.8):
"""Input: data matrices X,Y, correspondence matrix Cxy,
embedding dimension d, and correspondence weight mu
Output: embedded X and embedded Y
"""
nx, dx = X.shape
ny, dy = Y.shape
assert Cxy.shape==(nx,ny), \
'Correspondence matrix must be shape num_X_samples X num_Y_samples.'
C = np.fliplr(block_diag(np.fliplr(Cxy),np.fliplr(Cxy.T)))
if d is None:
d = min(dx,dy)
Rx = low_rank_repr(X,d)
Ry = low_rank_repr(Y,d)
R = block_diag(Rx,Ry)
tmp = np.eye(R.shape[0]) - R
M = tmp.T.dot(tmp)
L = laplacian(C)
eigen_prob = (1-mu)*M + 2*mu*L
_,F = eigh(eigen_prob,eigvals=(1,d),overwrite_a=True)
Xembed = F[:nx]
Yembed = F[nx:]
return Xembed, Yembed
def __init__(self, X, kern, Xm):
super(PITC, self).__init__("PITC")
M = np.shape(Xm)[0]
self.M = M
start = time.time()
X_split = np.array_split(X, M)
self.kern = kern
kern_blocks = np.zeros((M),dtype=object)
for t in xrange(M):
nyst = Nystrom(X_split[t], kern, Xm, False)
size = np.shape(X_split[t])[0]
kern_blocks[t] = kern.K(X_split[t], X_split[t]) - nyst.precon + (kern.noise)*np.identity(size)
self.blocks = kern_blocks
blocked = block_diag(*kern_blocks)
self.nyst = Nystrom(X, kern, Xm, False)
self.precon = self.nyst.precon + blocked
self.duration = time.time() - start
def __init__(self, X, kern, Xm):
super(PITC, self).__init__("PITC")
M = np.shape(Xm)[0]
self.M = M
start = time.time()
X_split = np.array_split(X, M)
self.kern = kern
kern_blocks = np.zeros((M),dtype=object)
for t in xrange(M):
nyst = Nystrom(X_split[t], kern, Xm, False)
size = np.shape(X_split[t])[0]
kern_blocks[t] = kern.K(X_split[t], X_split[t]) - nyst.precon + (kern.noise)*np.identity(size)
self.blocks = kern_blocks
blocked = block_diag(*kern_blocks)
self.nyst = Nystrom(X, kern, Xm, False)
self.precon = self.nyst.precon + blocked
self.duration = time.time() - start
test_minimized_constrained.py 文件源码
项目:ip-nonlinear-solver
作者: antonior92
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def constr(self):
def fun(x):
x_coord, y_coord, z_coord = self._get_cordinates(x)
return x_coord**2 + y_coord**2 + z_coord**2 - 1
def jac(x):
x_coord, y_coord, z_coord = self._get_cordinates(x)
Jx = 2 * np.diag(x_coord)
Jy = 2 * np.diag(y_coord)
Jz = 2 * np.diag(z_coord)
return csc_matrix(np.hstack((Jx, Jy, Jz)))
def hess(x, v):
D = 2 * np.diag(v)
return block_diag(D, D, D)
return NonlinearConstraint(fun, ("less",), jac, hess)
def mtx_fri2visi_ri_multiband(M, p_mic_x_all, p_mic_y_all, D1, D2, aslist=False):
"""
build the matrix that maps the Fourier series to the visibility in terms of
REAL-VALUED entries only. (matrix size double)
:param M: the Fourier series expansion is limited from -M to M
:param p_mic_x_all: a matrix that contains microphones x coordinates
:param p_mic_y_all: a matrix that contains microphones y coordinates
:param D1: expansion matrix for the real-part
:param D2: expansion matrix for the imaginary-part
:return:
"""
num_bands = p_mic_x_all.shape[1]
if aslist:
return [mtx_fri2visi_ri(M, p_mic_x_all[:, band_count],
p_mic_y_all[:, band_count], D1, D2)
for band_count in range(num_bands)]
else:
return linalg.block_diag(*[mtx_fri2visi_ri(M, p_mic_x_all[:, band_count],
p_mic_y_all[:, band_count], D1, D2)
for band_count in range(num_bands)])
def output_shrink(K, L):
"""
shrink the convolution output to half the size.
used when both the annihilating filter and the uniform samples of sinusoids satisfy
Hermitian symmetric.
:param K: the annihilating filter size: K + 1
:param L: length of the (complex-valued) b vector
:return:
"""
out_len = L - K
if out_len % 2 == 0:
half_out_len = np.int(out_len / 2.)
mtx_r = np.hstack((np.eye(half_out_len),
np.zeros((half_out_len, half_out_len))))
mtx_i = mtx_r
else:
half_out_len = np.int((out_len + 1) / 2.)
mtx_r = np.hstack((np.eye(half_out_len),
np.zeros((half_out_len, half_out_len - 1))))
mtx_i = np.hstack((np.eye(half_out_len - 1),
np.zeros((half_out_len - 1, half_out_len))))
return linalg.block_diag(mtx_r, mtx_i)
def __call__(self, report, parent=None):
"""Init new target from report."""
if parent is None:
model = models.ConstantVelocityModel(self.q)
x0 = np.array([report.z[0], report.z[1], 0.0, 0.0])
P0 = block_diag(report.R, self.pv)
return KFilter(model, x0, P0)
# elif parent.is_new():
# model = models.ConstantVelocityModel(self.q)
# x0 = np.array([report.z[0],
# report.z[1],
# (report.z[0] - parent.filter.x[0])/self.dT,
# (report.z[1] - parent.filter.x[1])/self.dT])
# P0 = block_diag(report.R, self.pv)
# return KFilter(model, x0, P0)
else:
return deepcopy(parent.filter)
def _time_update(self, time):
# in non-additive case, augment mean and covariance
mean = self.x_mean_filt if self.sys.q_additive else np.hstack((self.x_mean_filt, self.q_mean))
cov = self.x_cov_filt if self.sys.q_additive else block_diag(self.x_cov_filt, self.q_cov)
assert mean.ndim == 1 and cov.ndim == 2
# apply moment transform to compute predicted state mean, covariance
self.x_mean_pred, self.x_cov_pred, self.xx_cov = self.transf_dyn.apply(self.sys.dyn_eval, mean, cov,
self.sys.par_fcn(time))
if self.sys.q_additive: self.x_cov_pred += self.q_cov
# in non-additive case, augment mean and covariance
mean = self.x_mean_pred if self.sys.r_additive else np.hstack((self.x_mean_pred, self.r_mean))
cov = self.x_cov_pred if self.sys.r_additive else block_diag(self.x_cov_pred, self.r_cov)
assert mean.ndim == 1 and cov.ndim == 2
# apply moment transform to compute measurement mean, covariance
self.z_mean_pred, self.z_cov_pred, self.xz_cov = self.transf_meas.apply(self.sys.meas_eval, mean, cov,
self.sys.par_fcn(time))
# in additive case, noise covariances need to be added
if self.sys.r_additive: self.z_cov_pred += self.r_cov
# in non-additive case, cross-covariances must be trimmed (has no effect in additive case)
self.xz_cov = self.xz_cov[:, :self.sys.xD]
self.xx_cov = self.xx_cov[:, :self.sys.xD]
def test_block_diag_simple(rgen):
rows = (4, 7)
cols = (3, 6)
summands = [factory._zrandn((rows[i], cols[i]), randstate=rgen)
for i in range(len(rows))]
blockdiag_sum = utils.block_diag(summands)
blockdiag_sum_scipy = block_diag(*summands)
assert_array_almost_equal(blockdiag_sum, blockdiag_sum_scipy)
def test_block_diag(rgen):
leftmargin = 3
rightmargin = 5
rows = (4, 7)
cols = (3, 6)
nr_blocks = len(rows)
nr_summands = 3
leftvecs = factory._zrandn((nr_blocks, nr_summands, leftmargin),
randstate=rgen)
middlematrices = [factory._zrandn((nr_summands, rows[i], cols[i]),
randstate=rgen)
for i in range(nr_blocks)]
rightvecs = factory._zrandn((nr_blocks, nr_summands, rightmargin),
randstate=rgen)
blockdiag_summands = []
for i in range(nr_blocks):
summand = np.zeros(
(leftmargin, rows[i], cols[i], rightmargin), dtype=complex)
for j in range(nr_summands):
summand += np.outer(
np.outer(leftvecs[i, j, :], middlematrices[i][j, :, :]),
rightvecs[i, j, :]).reshape(summand.shape)
blockdiag_summands.append(summand)
blockdiag_sum = utils.block_diag(blockdiag_summands, axes=(1, 2))
blockdiag_sum_explicit = np.zeros(
(leftmargin, sum(rows), sum(cols), rightmargin), dtype=complex)
for i in range(nr_blocks):
for j in range(nr_summands):
summands = [middlematrices[i2][j]
if i2 == i else np.zeros_like(middlematrices[i2][j])
for i2 in range(nr_blocks)]
middle = block_diag(*summands)
blockdiag_sum_explicit += \
np.outer(np.outer(leftvecs[i][j], middle), rightvecs[i][j]) \
.reshape(blockdiag_sum_explicit.shape)
assert_array_almost_equal(blockdiag_sum, blockdiag_sum_explicit)
def test_matrices_A_after_func_fill(self, qpOASES_mat_fixtures):
""" Verify qpOASES matrices after filling.
"""
pcs, pp = qpOASES_mat_fixtures
random_fill([pp.A])
pp._fill_matrices()
# A
# operational rows
for i in range(pp.N+1):
assert np.allclose(pp.A[i, :pp.nop, :], np.zeros((pp.nop, pp.nV)))
# Assert canonical part if there are canonical constraints
a_expected = np.hstack(map(lambda pc: pc.a[i], pcs))
b_expected = np.hstack(map(lambda pc: pc.b[i], pcs))
assert np.allclose(pp.A[i, pp.nop: pp.nop + pp.nm, 0], a_expected)
assert np.allclose(pp.A[i, pp.nop: pp.nop + pp.nm, 1], b_expected)
assert np.allclose(pp.A[i, pp.nop: pp.nop + pp.nm, 2:],
np.zeros((pp.nm, pp.nv)))
a_expected = np.hstack(map(lambda pc: pc.abar[i], pcs))
assert np.allclose(
pp.A[i, pp.nop + pp.nm: pp.nop + pp.nm + pp.neq, 0],
a_expected)
b_expected = np.hstack(map(lambda pc: pc.bbar[i], pcs))
assert np.allclose(
pp.A[i, pp.nop + pp.nm: pp.nop + pp.nm + pp.neq, 1],
b_expected)
D_expected = block_diag(*map(lambda pc: - pc.D[i], pcs))
assert np.allclose(
pp.A[i, pp.nop + pp.nm: pp.nop + pp.nm + pp.neq, 2:],
D_expected)
G_expected = block_diag(*map(lambda pc: pc.G[i], pcs))
assert np.allclose(
pp.A[i, pp.nop + pp.nm + pp.neq:
pp.nop + pp.nm + pp.neq + pp.niq, 2:], G_expected)
def predict(self, r, rnext, wf0, Cf, dt):
"""
r: Desired reference state at time t
rnext: Desired reference state at time t+dt
wf0: Mean of the process noise
Cf: Covariance of the process noise
dt: Timestep to predict forward
Progresses the state estimate one dt into the future.
Returns the control effort u.
"""
# Compute control action
self.u = self.g(r, rnext, self.x, self.Cx, dt)
# Store relevant parameters
utp = self.utpDict['_f']
# Compute sigma points, propagate through process, and store tangent-space representation
M = spl.cholesky(utp[0]*spl.block_diag(self.Cx, Cf))
s0 = np.append(self.x, wf0)
fS = [self.sf(s0, dt)]
fT_sum = np.zeros(self.n_m)
for Vi in np.vstack((M, -M)):
fS.append(self.sf(self.sxplus(s0, Vi), dt))
fT_sum += self.xminus(fS[-1], fS[0])
# Determine the mean of the propagated sigma points from the tangent vectors
self.x = self.xplus(fS[0], utp[2]*fT_sum)
# Determine the covariance from the tangent-space deviations from the mean
fv0 = self.xminus(fS[0], self.x)
fPi_sum = np.zeros((self.n_m, self.n_m))
for fSi in fS[1:]:
fv = self.xminus(fSi, self.x)
fPi_sum += np.outer(fv, fv)
self.Cx = utp[3]*np.outer(fv0, fv0) + utp[2]*fPi_sum
# Over and out
return np.copy(self.u)
def correct(self, sensor, z, wh0, Ch):
"""
sensor: String of the sensor key name
z: Value of the measurement
wh0: Mean of that sensor's noise
Ch: Covariance of that sensor's noise
Updates the state estimate with the given sensor measurement.
"""
# Store relevant functions and parameters
h, zminus, n_z, _, utp = self.hDict_full[sensor]
# Compute sigma points and emulate their measurement error
M = spl.cholesky(utp[0]*spl.block_diag(self.Cx, Ch))
V = np.vstack((M, -M))
s0 = np.append(self.x, wh0)
hE = [zminus(z, self.sh(s0, h))]
for i, Vi in enumerate(V):
hE.append(zminus(z, self.sh(self.sxplus(s0, Vi), h)))
hE = np.array(hE)
# Determine the mean of the sigma measurement errors
e0 = utp[1]*hE[0] + utp[2]*np.sum(hE[1:], axis=0)
# Determine the covariance and cross-covariance
hV = hE - e0
hPi_sum = np.zeros((n_z, n_z))
hPci_sum = np.zeros((self.n_m, n_z))
for Vqi, hVi in zip(V[:, :self.n_m], hV[1:]):
hPi_sum += np.outer(hVi, hVi)
hPci_sum += np.outer(Vqi, hVi)
Pzz = utp[3]*np.outer(hV[0], hV[0]) + utp[2]*hPi_sum
Pxz = utp[2]*hPci_sum
# Kalman update the state estimate and covariance
K = Pxz.dot(npl.inv(Pzz))
self.x = self.xplus(self.x, -K.dot(e0))
self.Cx = self.Cx - K.dot(Pzz).dot(K.T)
def compute_control(self):
"""
Compute the optimal control p=[p[0] ... p[K]].
"""
K, Phi, Psi, X_init = self.K, self.Phi, self.Psi, self.X_init
# Cost 1: sum_i 1/K * (x_i - p_i) ** 2
B0 = array([[1, 0], [0, 0]])
C0 = array([1, 0])
B = block_diag(*[B0 for i in xrange(K)])
C = block_diag(*[C0 for i in xrange(K)])
P1 = 1. / K * (dot(Psi.T, dot(B, Psi)) - 2 * dot(C, Psi) + eye(K))
q1 = 1. / K * dot(dot(Phi, X_init).T, dot(B, Psi) - C.T)
# Cost 2: sum_i (p_i - p_{i - 1}) ** 2
N1, N2 = zeros((K, K)), zeros((K, K))
N1[0:K - 1, 0:K - 1] = eye(K - 1)
N2[0:K - 1, 1:K] = eye(K - 1)
P2 = dot((N2 - N1).T, (N2 - N1))
q2 = zeros(K)
w1, w2 = 1., 100.
P = w1 * P1 + w2 * P2
q = w1 * q1 + w2 * q2
G = vstack([self.I, -self.I])
h = hstack([self.p_max, -self.p_min])
A = vstack([self.Psi_last, hstack([zeros(K - 1), [1]])])
b = hstack([self.X_target, [self.X_target[0]]]) \
- hstack([dot(self.Phi_last, X_init), [0]])
p = cvxopt_solve_qp(P, q, G, h, A, b)
return p
def build_base_operator(self, t):
"""
:param t: Not used as mu and sigma are constant
:return:
"""
# Update drift and volatility
self.build_moment_vectors(t)
base_operator = np.zeros((self.d, self.d))
nabla = linalg.block_diag(*[self.build_gradient_matrix(x) for x in range(1, self.d - 1)])
moments = np.zeros(2 * (self.d - 2))
for i in range(0, self.d - 2):
moments[2 * i] = self.drift[i + 1]
moments[2 * i + 1] = self.volatility[i + 1]
generator_elements = linalg.solve(nabla, moments)
r_idx, c_idx = np.diag_indices_from(base_operator)
base_operator[r_idx[1:-1], c_idx[:-2]] = generator_elements[::2]
base_operator[r_idx[1:-1], c_idx[2:]] = generator_elements[1::2]
np.fill_diagonal(base_operator, -np.sum(base_operator, axis=1))
# -- Boundary Condition: Volatility Matching --
nabla_0 = self.grid[1] - self.grid[0]
base_operator[0, 0] = - 1. * self.volatility[0] / (nabla_0 * nabla_0)
base_operator[0, 1] = - base_operator[0, 0]
nabla_d = self.grid[self.d - 1] - self.grid[self.d - 2]
base_operator[self.d - 1, self.d - 1] = - 1. * self.volatility[self.d - 1] / (nabla_d * nabla_d)
base_operator[self.d - 1, self.d - 2] = - base_operator[self.d - 1, self.d - 1]
# ----------------------------------------------
self.sanity_check_base_operator(base_operator)
return base_operator
def combineTransMats(mats_trans,states_pho):
'''
combine multi transition matrix into one matrix
:param mats_trans:
:param states_pho:
:return:
'''
mat_trans_comb = block_diag(*mats_trans)
state_pho_comb = sum(states_pho, [])
index_start = [0]
index_end = [mats_trans[0].shape[0]-1]
for ii in range(1,len(mats_trans)):
index_start.append(index_end[-1]+1)
index_end.append(index_end[-1]+mats_trans[ii].shape[0])
return mat_trans_comb,state_pho_comb,index_start,index_end
def combineTransMats(mats_trans,states_pho):
'''
combine multi transition matrix into one matrix
:param mats_trans:
:param states_pho:
:return:
'''
mat_trans_comb = block_diag(*mats_trans)
state_pho_comb = sum(states_pho, [])
index_start = [0]
index_end = [mats_trans[0].shape[0]-1]
for ii in range(1,len(mats_trans)):
index_start.append(index_end[-1]+1)
index_end.append(index_end[-1]+mats_trans[ii].shape[0])
return mat_trans_comb,state_pho_comb,index_start,index_end
def _make_sigma(self, TNTs, phiinv):
return sps.block_diag(TNTs,'csc') + sps.csc_matrix(phiinv)
CC_orth.py 文件源码
项目:Differential-Evolution-with-PCA-based-Crossover
作者: zhudazheng
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def __init__(self, n):
self.n = n
self.m = n/5
M = np.random.rand(self.m, self.m)
M = linalg.qr(M)[0]
A = linalg.block_diag(M,M,M,M,M)
# A = linalg.block_diag(M,M,M,M,M,M,M,M,M,M, M,M,M,M,M,M,M,M,M,M)
self.A = A
orth.py 文件源码
项目:Differential-Evolution-with-PCA-based-Crossover
作者: zhudazheng
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def __init__(self):
M = np.random.rand(5,5)
M = linalg.qr(M)[0]
A = linalg.block_diag(M,M,M,M)
A = linalg.block_diag(M,M,M,M,M,M,M,M,M,M, M,M,M,M,M,M,M,M,M,M)
self.A = A
def run(self, sequence):
"""
Build the full-oligomer stiffness matrix by positioning the 6x6
stiffness matrices for each tetranucleotide (obtained by calling
DNAFlexibility's _get_flexibilities()) on the diagonal.
"""
stiffness_matrix = block_diag(*list(self._get_flexibilities(sequence)))
return stiffness_matrix
#------------------------------------------------------------------------------
def __call__(self, tracker, report):
"""Init new target from report."""
r = min(report.rB * tracker.params.lambdaB(report),
tracker.params.rBmax, 1.0)
print("New target:", r, "lambdaB", tracker.params.lambdaB(report))
if r < 1e-6:
return None
id_ = tracker.new_id()
model = models.ConstantVelocityModel(self.q, self.pS)
x0 = np.array([report.z[0], report.z[1], 0.0, 0.0])
P0 = block_diag(report.R, self.pv)
pdf = PF.from_gaussian(x0, P0, tracker.params.N_max)
return Target(id_, model, r, pdf)
def get_block_inversion(self):
diag_blocks = self.blocks
inverted_blocks = np.zeros(len(diag_blocks), dtype=object)
for i in xrange(len(diag_blocks)):
inverted_blocks[i] = np.linalg.inv(diag_blocks[i])
return block_diag(*inverted_blocks)
def get_laplace_block_inversion(self, Wsqrt):
diag_blocks = self.blocks
Wsqrt_split = np.array_split(Wsqrt, self.M)
inverted_blocks = np.zeros(len(diag_blocks), dtype=object)
for i in xrange(len(diag_blocks)):
Wblock = np.diag(Wsqrt_split[i].flatten())
block = np.dot(Wblock, np.dot(diag_blocks[i], Wblock))
inverted_blocks[i] = np.linalg.inv(block + np.identity(len(Wblock)))
return block_diag(*inverted_blocks)
def __init__(self, X, kern, M):
super(BlockJacobi, self).__init__("BlockJacobi")
self.M = M
start = time.time()
X_split = np.array_split(X, M)
kern_blocks = np.zeros((M),dtype=object)
for t in xrange(M):
size = np.shape(X_split[t])[0]
kern_blocks[t] = kern.K(X_split[t], X_split[t]) + kern.noise*np.identity(size)
self.duration = time.time()-start
self.blocks = kern_blocks
self.precon = block_diag(*kern_blocks)
def get_inversion(self):
diag_blocks = self.blocks
inverted_blocks = np.zeros(len(diag_blocks), dtype=object)
for i in xrange(len(diag_blocks)):
inverted_blocks[i] = np.linalg.inv(diag_blocks[i])
inverted_diag = block_diag(*inverted_blocks)
return inverted_diag
def get_block_inversion(self):
diag_blocks = self.blocks
inverted_blocks = np.zeros(len(diag_blocks), dtype=object)
for i in xrange(len(diag_blocks)):
inverted_blocks[i] = np.linalg.inv(diag_blocks[i])
return block_diag(*inverted_blocks)
def get_laplace_block_inversion(self, Wsqrt):
diag_blocks = self.blocks
Wsqrt_split = np.array_split(Wsqrt, self.M)
inverted_blocks = np.zeros(len(diag_blocks), dtype=object)
for i in xrange(len(diag_blocks)):
Wblock = np.diag(Wsqrt_split[i].flatten())
block = np.dot(Wblock, np.dot(diag_blocks[i], Wblock))
inverted_blocks[i] = np.linalg.inv(block + np.identity(len(Wblock)))
return block_diag(*inverted_blocks)
def __init__(self, X, kern, M):
super(BlockJacobi, self).__init__("BlockJacobi")
self.M = M
start = time.time()
X_split = np.array_split(X, M)
kern_blocks = np.zeros((M),dtype=object)
for t in xrange(M):
size = np.shape(X_split[t])[0]
kern_blocks[t] = kern.K(X_split[t], X_split[t]) + kern.noise*np.identity(size)
self.duration = time.time()-start
self.blocks = kern_blocks
self.precon = block_diag(*kern_blocks)