def test_events():
# use bouncing ball to test events work
# simulate in block diagram
int_opts = block_diagram.DEFAULT_INTEGRATOR_OPTIONS.copy()
int_opts['rtol'] = 1E-12
int_opts['atol'] = 1E-15
int_opts['nsteps'] = 1000
int_opts['max_step'] = 2**-3
x = x1, x2 = Array(dynamicsymbols('x_1:3'))
mu, g = sp.symbols('mu g')
constants = {mu: 0.8, g: 9.81}
ic = np.r_[10, 15]
sys = SwitchedSystem(
x1, Array([0]),
state_equations=r_[x2, -g],
state_update_equation=r_[sp.Abs(x1), -mu*x2],
state=x,
constants_values=constants,
initial_condition=ic
)
bd = BlockDiagram(sys)
res = bd.simulate(5, integrator_options=int_opts)
# compute actual impact time
tvar = dynamicsymbols._t
impact_eq = (x2*tvar - g*tvar**2/2 + x1).subs(
{x1: ic[0], x2: ic[1], g: 9.81}
)
t_impact = sp.solve(impact_eq, tvar)[-1]
# make sure simulation actually changes velocity sign around impact
abs_diff_impact = np.abs(res.t - t_impact)
impact_idx = np.where(abs_diff_impact == np.min(abs_diff_impact))[0]
assert np.sign(res.x[impact_idx-1, 1]) != np.sign(res.x[impact_idx+1, 1])
python类solve()的实例源码
def create_markov_operator(self, base_operator, dt, method="CrankNicolson"):
""" Let u be the Markov operator and L be the base generator. The model dynamic
is given by
u ' = L \cdot u
Following the Method Of Line (MOL) discretization,
let u(n) = u(t_n) where t_n is the discretized time domain.
The following schemes can be used:
- Explicit Euler --> u(n + 1) = (I + L * dt) \cdot u(n)
- Implicit Euler --> (I - L * dt) \cdot u(n + 1) = u(n)
- Crank Nicolson --> (I - L * dt / 2) \cdot u(n + 1) = (I + L * dt / 2) \cdot u(n)
:param base_operator:
:param dt:
:param method:
:return:
"""
if method == "ExplicitEuler":
u = np.eye(self.d) + dt * base_operator
elif method == "ImplicitEuler":
u = linalg.solve(np.eye(self.d) - dt * base_operator, np.eye(self.d))
elif method == "CrankNicolson":
u = linalg.solve(np.eye(self.d) - .5 * dt * base_operator,
np.eye(self.d) + .5 * dt * base_operator)
else:
raise NotImplementedError("Unsupported SDE resolution method")
return u
def build_base_operator(self, t):
"""
:param t: Not used as mu and sigma are constant
:return:
"""
# Update drift and volatility
self.build_moment_vectors(t)
base_operator = np.zeros((self.d, self.d))
nabla = linalg.block_diag(*[self.build_gradient_matrix(x) for x in range(1, self.d - 1)])
moments = np.zeros(2 * (self.d - 2))
for i in range(0, self.d - 2):
moments[2 * i] = self.drift[i + 1]
moments[2 * i + 1] = self.volatility[i + 1]
generator_elements = linalg.solve(nabla, moments)
r_idx, c_idx = np.diag_indices_from(base_operator)
base_operator[r_idx[1:-1], c_idx[:-2]] = generator_elements[::2]
base_operator[r_idx[1:-1], c_idx[2:]] = generator_elements[1::2]
np.fill_diagonal(base_operator, -np.sum(base_operator, axis=1))
# -- Boundary Condition: Volatility Matching --
nabla_0 = self.grid[1] - self.grid[0]
base_operator[0, 0] = - 1. * self.volatility[0] / (nabla_0 * nabla_0)
base_operator[0, 1] = - base_operator[0, 0]
nabla_d = self.grid[self.d - 1] - self.grid[self.d - 2]
base_operator[self.d - 1, self.d - 1] = - 1. * self.volatility[self.d - 1] / (nabla_d * nabla_d)
base_operator[self.d - 1, self.d - 2] = - base_operator[self.d - 1, self.d - 1]
# ----------------------------------------------
self.sanity_check_base_operator(base_operator)
return base_operator
def demand_mass_balance_c(host_odemand, class_odemand, avail, host_recapture):
"""Solve Demand Mass Balance equation for class-level
Parameters
----------
host_odemand: int
Observerd host demand
class_odemand: int
Observed class demand
avail: dict
Availability of demand open during period considered
host_recapture: float
Estimated host level recapture
Returns
-------
tuple
Estimated demand, spill and recapture
"""
# if observed demand of a class is 0 demand mass balance can't
# estimate demand and spill alone without additioanl information
demand = spill = recapture = 0
if class_odemand:
recapture = host_recapture * class_odemand / host_odemand
# availability of demand closed during period considered
k = 1 - avail
A = np.array([[1, -1], [-k, 1]])
B = np.array([class_odemand - recapture, 0])
demand, spill = solve(A, B)
return demand, spill, recapture
def generate_lin_system_from_regression_problem(n, d, sigma, filepath=None):
(X, y, beta, e) = generate_lin_regression(n, d, sigma)
# lambda_ = 6. * sigma**2. / n
lambda_ = sigma**2. / (n * numpy.linalg.norm(beta) ** 2)
A = 1. / (d * n) * X.T.dot(X) + numpy.identity(d) * lambda_
b = 1. / (d * n) * X.T.dot(y)
x = linalg.solve(A, b)
cn = numpy.linalg.cond(A)
obj = objective(X, y, x, lambda_, n)
if filepath:
write_system(A, b, x, filepath)
return (A, b, X, y, lambda_, x, cn, obj)
def fit(self, X, y):
"""
Fits a Gaussian Process regressor
Parameters
----------
X: np.ndarray, shape=(nsamples, nfeatures)
Training instances to fit the GP.
y: np.ndarray, shape=(nsamples,)
Corresponding continuous target values to X.
"""
self.X = X
self.y = y
self.nsamples = self.X.shape[0]
if self.optimize:
grads = None
if self.usegrads:
grads = self._grad
self.optHyp(param_key=self.covfunc.parameters, param_bounds=self.covfunc.bounds, grads=grads)
self.K = self.covfunc.K(self.X, self.X)
self.L = cholesky(self.K).T
self.alpha = solve(self.L.T, solve(self.L, y - self.mprior))
self.logp = -.5 * np.dot(self.y, self.alpha) - np.sum(np.log(np.diag(self.L))) - self.nsamples / 2 * np.log(
2 * np.pi)
def param_grad(self, k_param):
"""
Returns gradient over hyperparameters. It is recommended to use `self._grad` instead.
Parameters
----------
k_param: dict
Dictionary with keys being hyperparameters and values their queried values.
Returns
-------
np.ndarray
Gradient corresponding to each hyperparameters. Order given by `k_param.keys()`
"""
k_param_key = list(k_param.keys())
covfunc = self.covfunc.__class__(**k_param)
n = self.X.shape[0]
K = covfunc.K(self.X, self.X)
L = cholesky(K).T
alpha = solve(L.T, solve(L, self.y))
inner = np.dot(np.atleast_2d(alpha).T, np.atleast_2d(alpha)) - np.linalg.inv(K)
grads = []
for param in k_param_key:
gradK = covfunc.gradK(self.X, self.X, param=param)
gradK = .5 * np.trace(np.dot(inner, gradK))
grads.append(gradK)
return np.array(grads)
def predict(self, Xstar, return_std=False):
"""
Returns mean and covariances for the posterior Gaussian Process.
Parameters
----------
Xstar: np.ndarray, shape=((nsamples, nfeatures))
Testing instances to predict.
return_std: bool
Whether to return the standard deviation of the posterior process. Otherwise,
it returns the whole covariance matrix of the posterior process.
Returns
-------
np.ndarray
Mean of the posterior process for testing instances.
np.ndarray
Covariance of the posterior process for testing instances.
"""
Xstar = np.atleast_2d(Xstar)
kstar = self.covfunc.K(self.X, Xstar).T
fmean = self.mprior + np.dot(kstar, self.alpha)
v = solve(self.L, kstar.T)
fcov = self.covfunc.K(Xstar, Xstar) - np.dot(v.T, v)
if return_std:
fcov = np.diag(fcov)
return fmean, fcov
def fit(self, Y):
"""
Generates the RBF coefficients to fit a set of given data values Y for centers self.centers
:param Y: A set of dependent data values corresponding to self.centers
:return: Void, sets the self.coefs values
"""
kernel_matrix = self.EvaluateCentersKernel()
self.coefs = sl.solve(kernel_matrix, Y, sym_pos=True)
def Aint(qL, qR, d):
""" Returns the Osher-Solomon jump matrix for A, in the dth direction
NB: eig function should be replaced with analytical function, if known
"""
ret = zeros(n, dtype=complex128)
?q = qR - qL
for i in range(N+1):
q = qL + nodes[i] * ?q
J = jacobian(q, d)
?, R = eig(J, overwrite_a=1, check_finite=0)
b = solve(R, ?q, check_finite=0)
ret += weights[i] * dot(R, abs(?)*b)
return ret.real
def additive_decomp(self):
"""
Return values for the martingale decomposition
- ? : unconditional mean difference in Y
- H : coefficient for the (linear) martingale component (kappa_a)
- g : coefficient for the stationary component g(x)
- Y_0 : it should be the function of X_0 (for now set it to 0.0)
"""
I = np.identity(self.nx)
A_res = la.solve(I - self.A, I)
g = self.D @ A_res
H = self.F + self.D @ A_res @ self.B
return self.?, H, g
def maximization_tv(self, LU, RU):
# ML re-estimation of the total subspace matrix or the factor loading
# matrix
for mix in range(self.nmix):
idx = np.arange(self.ndim) + mix * self.ndim
Lu = np.zeros((self.tv_dim, self.tv_dim))
Lu[self.itril] = LU[mix, :]
Lu += np.tril(Lu, -1).T
self.Tm[:, idx] = solve(Lu, RU[:, idx])
def fit(self, X, T):
Phi = self.nonlinear_transformation(X)
# MLE for w
self.w = linalg.solve(
Phi.T.dot(Phi) + np.eye(self.n_basis) * self.lamb, Phi.T.dot(T))
# MLE for beta
n_output = 1 if T.ndim < 2 else T.shape[1]
self.beta = n_output / np.mean(linalg.norm(T - Phi.dot(self.w))**2)
return self
def compute_b(G_lst, GtG_lst, beta_lst, Rc0, num_bands, a_ri):
"""
compute the uniform sinusoidal samples b from the updated annihilating
filter coeffiients.
:param GtG_lst: list of G^H G for different subbands
:param beta_lst: list of beta-s for different subbands
:param Rc0: right-dual matrix, here it is the convolution matrix associated with c
:param num_bands: number of bands
:param L: size of b: L by 1
:param a_ri: a 2D numpy array. each column corresponds to the measurements within a subband
:return:
"""
b_lst = []
a_Gb_lst = []
for loop in range(num_bands):
GtG_loop = GtG_lst[loop]
beta_loop = beta_lst[loop]
b_loop = beta_loop - \
linalg.solve(GtG_loop,
np.dot(Rc0.T,
linalg.solve(np.dot(Rc0, linalg.solve(GtG_loop, Rc0.T)),
np.dot(Rc0, beta_loop)))
)
b_lst.append(b_loop)
a_Gb_lst.append(a_ri[:, loop] - np.dot(G_lst[loop], b_loop))
return np.column_stack(b_lst), linalg.norm(np.concatenate(a_Gb_lst))
def compute_phi_posterior(self):
logZ_posterior = 0
for i in range(self.no_layers):
Dout_i = self.layer_sizes[i+1]
for d in range(Dout_i):
mud_val = self.mu[i][d]
Sud_val = self.Su[i][d]
(sign, logdet) = np.linalg.slogdet(Sud_val)
# print 'phi_poste: ', 0.5 * logdet, 0.5 * np.sum(mud_val * spalg.solve(Sud_val, mud_val.T))
logZ_posterior += 0.5 * logdet
logZ_posterior += 0.5 * np.sum(mud_val * spalg.solve(Sud_val, mud_val.T))
return logZ_posterior
def compute_phi_cavity(self):
phi_cavity = 0
for i in range(self.no_layers):
Dout_i = self.layer_sizes[i+1]
for d in range(Dout_i):
muhatd_val = self.muhat[i][d]
Suhatd_val = self.Suhat[i][d]
(sign, logdet) = np.linalg.slogdet(Suhatd_val)
phi_cavity += 0.5 * logdet
phi_cavity += 0.5 * np.sum(muhatd_val * spalg.solve(Suhatd_val, muhatd_val.T))
return phi_cavity
def getUpdate(W, x, y, lambd):
G = getG(W, x) # PxP
Gdamp = G + np.identity(P) * lambd
J = getJ(W, x) # NxP
grad = np.mean(J * np.tile(y-getz(W, x), (1, P)), 0)
upd = solve(Gdamp, grad) # P
return upd
def power_inverse(A, x, tol = 0.0001, N =5000):
'''
inverse power method
faster convergence rate
tol: tolerance
N: maximum number of iterations
'''
q = np.dot(x.T, np.dot(A, x))/np.dot(x.T, x)
p = -1
u0, u1 = 0, 0
for i in range(A.shape[1]):
p += 1
if la.norm(x, np.inf) == x[i]:
break
x = x / x[p]
for k in range(N):
try:
y = la.solve(A - q * np.eye(A.shape[0]), x)
except LinAlgError:
return q, x
else:
u = y[p]
p = -1
for i in range(len(y)):
p += 1
if la.norm(y, np.inf) == y[i]:
break
err = la.norm(x - (y/y[p]), np.inf)
x = y/y[p]
if err < tol:
u = 1/u + q
return u, x
def ls_fast_givens(A, b):
m = A.shape[0]
n = A.shape[1]
if rank(A) < n:
raise Exception('Rank deficient')
S = qr.qr_fast_givens(A)
M^T = np.dot(S, la.inv(A))
b = M^T.dot(b)
x_ls = la.solve(S[:n, :n], b[:n])
return x_ls
def mysolve(LU, b):
if isinstance(LU, SuperLU):
return LU.solve(b)
else:
return lu_solve(LU, b)
###############################################################################
# Solve via full system
###############################################################################