def __init__(self, settings):
settings.update(output_dim=4)
super().__init__(settings)
self.a_mat, self.b_mat, self.c_mat = linearise_system(
self._settings["steady state"],
self._settings["steady tau"])
self.L = pm.place_siso(self.a_mat.T,
self.c_mat.T,
self._settings["poles"]).T
self.step_width = self._settings["tick divider"] * self._settings["step width"]
self.output = np.array(self._settings["initial state"], dtype=float)
self.solver = ode(self.linear_state_function)
self.solver.set_integrator("dopri5",
method=self._settings["Method"],
rtol=self._settings["rTol"],
atol=self._settings["aTol"])
self.solver.set_initial_value(self._settings["initial state"])
self.nextObserver_output = self.output
python类ode()的实例源码
def __init__(self, settings):
Solver.__init__(self, settings)
# setup solver
if hasattr(self._model, "jacobian"):
self._solver = ode(self._model.state_function,
jac=self._model.jacobian)
else:
self._solver = ode(self._model.state_function)
self._solver.set_integrator(self._settings["Mode"],
method=self._settings["Method"],
rtol=self._settings["rTol"],
atol=self._settings["aTol"],
max_step=self._settings["step size"]
)
self._solver.set_initial_value(np.atleast_1d(self._model.initial_state),
t=self._settings["start time"])
def odeint(fun, t, y0, integrator=0, method=0, rtol=0, atol=1e-12):
s = ode(fun)
integrators = ['vode', 'zvode', 'lsoda', 'dopri5', 'dop853']
methods = ['adams', 'bdf']
s.set_integrator(integrators[0],
method=methods[0],
order=10,
rtol=rtol,
atol=atol,
with_jacobian=False)
t0 = t[0]
dt = t[1]-t0
y = [y0]
s.set_initial_value(y0, t0)
while s.successful() and s.t < t[-1]:
s.integrate(s.t+dt)
y.append(s.y)
y = array(y)
return y
def implicit_black_box(propensities,V,X,w,h,deter_vector,stoc_positions, positions, valid,deriv):
from scipy.integrate import ode
#pdb.set_trace()
deter_ode = ode(f).set_integrator('lsoda',method='bdf', with_jacobian=False)
deter_ode.set_initial_value(X[deter_vector,:].flatten(), 0).set_f_params([propensities,V,X,deter_vector,stoc_positions, positions, valid,w])
#pdb.set_trace()
while deter_ode.successful() and deter_ode.t < h:
deter_ode.integrate(h)
#print("Black Box: \n"+ str(deter_ode.y))
#print("iterator : \n:"+str(next_X[deter_vector,:]))
X[deter_vector,:] = deter_ode.y.reshape(( np.sum(deter_vector),X.shape[1]))
return X
def perform_action(self, action, perturb_params=False, p_lambda1=0, p_lambda2=0, p_k1=0, \
p_k2=0, p_f=0, p_m1=0, p_m2=0, p_lambdaE=0, p_bE=0, p_Kb=0, p_d_E=0, p_Kd=0, **kw):
"""Perform the specifed action and upate the environment.
Arguments:
action -- action to be taken
Keyword Arguments:
perturb_params -- boolean indicating whether to perturb dynamics (default: False)
p_lambda1 -- hidden parameter (default: 0)
p_lambda2 -- hidden parameter (default: 0)
p_k1 -- hidden parameter (default: 0)
p_k2 -- hidden parameter (default: 0)
p_f -- hidden parameter (default: 0)
p_m1 -- hidden parameter (default: 0)
p_m2 -- hidden parameter (default: 0)
p_lambdaE -- hidden parameter (default: 0)
p_bE -- hidden parameter (default: 0)
p_Kb -- hidden parameter (default: 0)
p_d_E -- hidden parameter (default: 0)
p_Kd -- hidden parameter (default: 0)
"""
self.t += 1
self.action = action
eps1, eps2 = self.eps_values_for_actions[action]
r = ode(self.model_derivatives).set_integrator('vode',nsteps=10000,method='bdf')
t0 = 0
deriv_args = (eps1, eps2, perturb_params, p_lambda1, p_lambda2, p_k1, p_k2, p_f, p_m1, p_m2, p_lambdaE, p_bE, p_Kb, p_d_E, p_Kd)
r.set_initial_value(self.state, t0).set_f_params(deriv_args)
self.state = r.integrate(self.dt)
reward = self.calc_reward(action=action)
return reward, self.observe()
def line(date_dt, pos0, dt=1e2):
"""
???
https://www.mathworks.com/matlabcentral/fileexchange/34388-international-geomagnetic-reference-field--igrf--model/content/igrfline.m
"""
r_km0 = pos0.radius / 1e3
theta0 = math.radians(pos0.geocentricLatitude)
phi0 = math.radians(pos0.longitude)
y0, t0 = [r_km0, theta0, phi0], 0
tracer = ode(differential).set_integrator('dopri5')
tracer.set_initial_value(y0, t0)
pos_path = [PyPosition(math.degrees(theta0),
math.degrees(phi0),
r_km0 * 1e3,
PyPosition.CoordinateSystem['geocentric'])]
while True:
assert tracer.successful()
r_km_i, theta_i, phi_i = tracer.integrate(tracer.t + dt)
pos = PyPosition(math.degrees(theta_i),
math.degrees(phi_i),
r_km_i * 1e3,
PyPosition.CoordinateSystem['geocentric'])
#print(pos.height)
if pos.height < 0:
break
pos_path.append(pos)
return pos_path
def create_solver(dydt):
solver = ode(dydt).set_integrator('lsoda', method='bdf', rtol=1e-2)
return solver
def __init__(self, model, jac=False,
integrator='vode', **integrator_kwargs):
def func_scipy_proxy(t, U, fields, pars, hook):
fields.fill(U)
fields, pars = hook(t, fields, pars)
return model.F(fields, pars)
def jacob_scipy_proxy(t, U, fields, pars, hook):
fields.fill(U)
fields, pars = hook(t, fields, pars)
return model.J(fields, pars, sparse=False)
self._solv = ode(func_scipy_proxy,
jac=jacob_scipy_proxy if jac else None)
self._solv.set_integrator(integrator, **integrator_kwargs)
def implicit_black_box(propensities,V,X,w,h,deter_vector,stoc_positions, positions, valid,deriv):
# Adjustment for systems reaching steady state
temp = derivative_G(propensities,V,X,w,deter_vector,stoc_positions,positions,valid)
#pdb.set_trace()
valid_adjust_pos = np.where(np.sum(np.abs(temp),axis=0) < 1e-10,True,False)
valid_adjust = valid[:,:]
valid_adjust[valid_adjust_pos,:] = False
#print(" Reached Steady State %d"%(np.sum(valid_adjust_pos)))
from scipy.integrate import ode
#pdb.set_trace()
deter_ode = ode(f).set_integrator('lsoda',method='adams', with_jacobian=False)
deter_ode.set_initial_value(X[deter_vector,:].flatten(), 0).set_f_params([propensities,V,X,deter_vector,stoc_positions, positions, valid_adjust,w])
#pdb.set_trace()
while deter_ode.successful() and deter_ode.t < h:
deter_ode.integrate(h)
#print("Black Box: \n"+ str(deter_ode.y))
#print("iterator : \n:"+str(next_X[deter_vector,:]))
X[deter_vector,:] = deter_ode.y.reshape(( np.sum(deter_vector),X.shape[1]))
## Another adjust to compensate for non negative
X = np.where(X < 0.0,0.0,X)
return X
def implicit_black_box(propensities,V,X,w,h,deter_vector,stoc_positions, positions, valid,jac):
# Adjustment for systems reaching steady state
"""
temp = derivative_G(propensities,V,X,w,deter_vector,stoc_positions,positions,valid,jac)
#pdb.set_trace()
valid_adjust_pos = np.where(np.sum(np.abs(temp),axis=0) < 1e-10,True,False)
valid_adjust = valid[:,:]
valid_adjust[valid_adjust_pos,:] = False
print(" Reached Steady State %d"%(np.sum(valid_adjust_pos)))
"""
from scipy.integrate import ode
#pdb.set_trace()
deter_ode = ode(f).set_integrator('vode',method='bdf', with_jacobian=False)
deter_ode.set_initial_value(X[deter_vector,:].flatten(), 0)
#deter_ode.set_f_params([propensities,V,X,deter_vector,stoc_positions, positions,valid_adjust,w,jac])
deter_ode.set_f_params([propensities,V,X,deter_vector,stoc_positions, positions,valid,w,jac])
#pdb.set_trace()
while deter_ode.successful() and deter_ode.t < h:
deter_ode.integrate(h)
#print("Black Box: \n"+ str(deter_ode.y))
#print("iterator : \n:"+str(next_X[deter_vector,:]))
X[deter_vector,:] = deter_ode.y.reshape(( np.sum(deter_vector),X.shape[1]))
# Another adjust to compensate for non negative
X = np.where(X < 0.0,0.0,X)
return X
def setup(self):
self.integrator = ode(self.constitutive_relation)
self.integrator.set_integrator("lsoda")
# Main integrator function. Output vector:
# 0: friction, 1+: state parameters
def _simulate_with_scipy(self, initcon, tsim, switchpts,
varying_inputs, integrator,
integrator_options):
scipyint = \
scipy.ode(self._rhsfun).set_integrator(integrator,
**integrator_options)
scipyint.set_initial_value(initcon, tsim[0])
profile = np.array(initcon)
i = 1
while scipyint.successful() and scipyint.t < tsim[-1]:
# check if tsim[i-1] is a switching time and update value
if tsim[i - 1] in switchpts:
for v in self._siminputvars.keys():
if tsim[i - 1] in varying_inputs[v]:
p = self._templatemap[self._siminputvars[v]]
p.set_value(varying_inputs[v][tsim[i - 1]])
profilestep = scipyint.integrate(tsim[i])
profile = np.vstack([profile, profilestep])
i += 1
if not scipyint.successful():
raise DAE_Error("The Scipy integrator %s did not terminate "
"successfully." % integrator)
return [tsim, profile]
def _simulate_with_casadi_no_inputs(self, initcon, tsim, integrator,
integrator_options):
# Old way (10 times faster, but can't incorporate time
# varying parameters/controls)
xalltemp = [self._templatemap[i] for i in self._diffvars]
xall = casadi.vertcat(*xalltemp)
odealltemp = [value(self._rhsdict[i]) for i in self._derivlist]
odeall = casadi.vertcat(*odealltemp)
dae = {'x': xall, 'ode': odeall}
if len(self._algvars) != 0:
zalltemp = [self._templatemap[i] for i in self._simalgvars]
zall = casadi.vertcat(*zalltemp)
algalltemp = [value(i) for i in self._alglist]
algall = casadi.vertcat(*algalltemp)
dae['z'] = zall
dae['alg'] = algall
integrator_options['grid'] = tsim
integrator_options['output_t0'] = True
F = casadi.integrator('F', integrator, dae, integrator_options)
sol = F(x0=initcon)
profile = sol['xf'].full().T
if len(self._algvars) != 0:
algprofile = sol['zf'].full().T
profile = np.concatenate((profile, algprofile), axis=1)
return [tsim, profile]
def calculateTurbulent_cf(self, S_0):
"""
Return the respective streamline given a starting position.
point : [x y z] defining the starting point. MUST be one of the
starting points used to generate the streamlines in the first place.
"""
self.getProperties(S_0, 'turbulent')
cf_0 = 0.455 / ( self.Q**2. * log(0.06/self.Q * self.localReynolds * self.mu/self.mu_wall * (self.TwOnT)**-0.5 )**2.)
cf_0 = 0.00165
self.chi_0 = (2. / cf_0)**0.5
self.streamlineCfs = []
self.streamlineXs = []
r1 = ode(self.whiteChristoph1).set_integrator('dopri5', atol=1e-2, rtol=1e-2, )
r1.set_initial_value(self.chi_0, S_0)
r2 = ode(self.whiteChristoph2).set_integrator('dopri5', atol=1e-2, rtol=1e-2, )
r2.set_initial_value(self.chi_0, S_0)
dt = 0.01
# Integrate along the streamline
while r1.t < self.parameterisedStreamline[-1] and r2.t < self.parameterisedStreamline[-1]:
# print r1.t + dt
# print r2.t + dt
r1.integrate(r1.t+dt)
if self.ReynoldsStar < 0:
self.streamlineCfs.append(2. / r1.y**2.)
self.streamlineXs.append(r1.t)
else:
# print "r2"
r2.integrate(r2.t+dt)
if r1.y/self.chi_max < 0.36:
self.streamlineCfs.append(2. / r1.y**2.)
self.streamlineXs.append(r1.t)
elif r2.y/self.chi_max > 0.36:
self.streamlineCfs.append(2. / r2.y**2.)
self.streamlineXs.append(r2.t)
else:
print "Something stuffed up"
return self.streamlineCfs, self.streamlineXs
def line(self, x0):
"""Returns the field line passing through x0.
Refs: http://folk.uib.no/fcihh/seminar/lec1.pdf and lect2.pdf
http://numbercrunch.de/blog/2013/05/visualizing-streamlines/
and especially: "Electric field lines don't work",
http://scitation.aip.org/content/aapt/journal/ajp/64/6/10.1119/1.18237
"""
if None in [XMIN, XMAX, YMIN, YMAX]:
raise ValueError('Domain must be set using init().')
# Set up integrator for the field line
streamline = lambda t, y: list(self.direction(y))
solver = ode(streamline).set_integrator('vode')
# Initialize the coordinate lists
x = [x0]
# Integrate in both the forward and backward directions
dt = 0.008
# Solve in both the forward and reverse directions
for sign in [1, -1]:
# Set the starting coordinates and time
solver.set_initial_value(x0, 0)
# Integrate field line over successive time steps
while solver.successful():
# Find the next step
solver.integrate(solver.t + sign*dt)
# Save the coordinates
if sign > 0:
x.append(solver.y)
else:
x.insert(0, solver.y)
# Check if line connects to a charge
flag = False
for c in self.charges:
if c.is_close(solver.y):
flag = True
break
# Terminate line at charge or if it leaves the area of interest
if flag or not (XMIN < solver.y[0] < XMAX) or \
not YMIN < solver.y[1] < YMAX:
break
return FieldLine(x)