def density(self, density: u.kg/u.m**3):
"""Sets the simulation's density profile to the specified array.
Other arrays which depend on the density values, such as the kinetic
pressure, are then redefined automatically.
Parameters
----------
density : ndarray
Array of density values. Shape and size must be equal to those of
the simulation grid.
Must have units of density.
"""
assert density.shape == self.domain_shape, """
Specified density array shape {} does not match simulation grid {}.
""".format(density.shape, self.domain_shape)
self._density = density
# Momentum
python类s()的实例源码
def momentum(self, momentum: u.kg/(u.m**2 * u.s)):
"""Sets the simulation's momentum profile to the specified array.
Other arrays which depend on the velocity values, such as the kinetic
pressure,
are then redefined automatically.
Parameters
----------
momentum : ndarray
Array of momentum vectors. Shape must be (3, x, [y, z]), where x,
y and z are the dimensions of the simulation grid.
Note that a full 3D vector is necessary even if the simulation has
fewer than 3 dimensions.
"""
assert momentum.shape == (3, *self.domain_shape), """
Specified density array shape {} does not match simulation grid {}.
""".format(momentum.shape, (3, *self.domain_shape))
self._momentum = momentum
# Internal energy
def energy(self, energy: u.J/u.m**3):
"""Sets the simulation's total energy density profile to the specified array.
Other arrays which depend on the energy values, such as the kinetic
pressure, are then redefined automatically.
Parameters
----------
energy : ndarray
Array of energy values. Shape must be (x, y, z), where x, y, and z
are the grid sizes of the simulation in the x, y, and z dimensions.
Must have units of energy.
"""
assert energy.shape == self.domain_shape, """
Specified density array shape {} does not match simulation grid {}.
""".format(energy.shape, self.domain_shape)
self._energy = energy
# Magnetic field
def magnetic_field(self, magnetic_field: u.Tesla):
"""
Sets the simulation's magnetic field profile to the specified array.
Other arrays which depend on the magnetic field, such as the magnetic
pressure, are then redefined automatically.
Parameters
----------
magnetic_field : ndarray
Array of magnetic field values. Shape must be (3, x, [y, z]),
where x, y, and z are the grid sizes of the simulation in the x, y,
and z dimensions.
Note that a full 3D vector is necessary even if the simulation has
fewer than 3 dimensions.
"""
assert magnetic_field.shape == (3, *self.domain_shape), """
Specified density array shape {} does not match simulation grid {}.
""".format(magnetic_field.shape, (3, *self.domain_shape))
self._magnetic_field = magnetic_field
def __init__(self, plasma):
"""
"""
self.dt = 1e-4 * u.s
self.current_iteration = 0
self.current_time = 0 * u.s
self.plasma = plasma
# Domain size
# self.grid_size = grid_size
# Physical parameters
# self.gamma = gamma
grids = (self.plasma.x.si, self.plasma.y.si, self.plasma.z.si)
ranges = [grid for grid in grids if len(grid) > 1]
stepsize = [range[1] - range[0] for range in ranges] * grids[0].unit
self.solver = Solver(stepsize)
# Collect equations into a nice easy-to-use list
self.equations = [self._ddt_density, self._ddt_momentum,
self._ddt_energy, self._ddt_magfield]
def direct_ionization_rate(self):
"""
Calculate direct ionization rate in cm3/s
Needs an equation reference or explanation
"""
xgl, wgl = np.polynomial.laguerre.laggauss(12)
kBT = const.k_B.cgs*self.temperature
energy = np.outer(xgl, kBT)*kBT.unit + self.ip
cross_section = self.direct_ionization_cross_section(energy)
if cross_section is None:
return None
term1 = np.sqrt(8./np.pi/const.m_e.cgs)*np.sqrt(kBT)*np.exp(-self.ip/kBT)
term2 = ((wgl*xgl)[:,np.newaxis]*cross_section).sum(axis=0)
term3 = (wgl[:,np.newaxis]*cross_section).sum(axis=0)*self.ip/kBT
return term1*(term2 + term3)
def ionization_rate(self):
"""
Total ionization rate.
Includes contributions from both direct ionization and excitation-autoionization
See Also
--------
direct_ionization_rate
excitation_autoionization_rate
"""
di_rate = self.direct_ionization_rate()
di_rate = np.zeros(self.temperature.shape)*u.cm**3/u.s if di_rate is None else di_rate
ea_rate = self.excitation_autoionization_rate()
ea_rate = np.zeros(self.temperature.shape)*u.cm**3/u.s if ea_rate is None else ea_rate
return di_rate + ea_rate
def recombination_rate(self):
"""
Total recombination rate.
Includes contributions from dielectronic recombination and radiative recombination.
See Also
--------
radiative_recombination_rate
dielectronic_recombination_rate
"""
rr_rate = self.radiative_recombination_rate()
rr_rate = np.zeros(self.temperature.shape)*u.cm**3/u.s if rr_rate is None else rr_rate
dr_rate = self.dielectronic_recombination_rate()
dr_rate = np.zeros(self.temperature.shape)*u.cm**3/u.s if dr_rate is None else dr_rate
return rr_rate + dr_rate
def GetFluxRatio(sptlist, Tsec, xgrid):
"""
Returns the flux ratio between the secondary star of temperature Tsec
and the (possibly multiple) primary star(s) given in the
'sptlist' list (given as spectral types)
xgrid is a np.ndarray containing the x-coordinates to find the
flux ratio at (in nm)
"""
prim_flux = np.zeros(xgrid.size)
# Determine the flux from the primary star(s)
for spt in sptlist:
end = search("[0-9]", spt).end()
T = MS.Interpolate(MS.Temperature, spt[:end])
R = MS.Interpolate(MS.Radius, spt[:end])
prim_flux += Planck(xgrid * units.nm.to(units.cm), T) * R ** 2
# Determine the secondary star flux
s_spt = MS.GetSpectralType(MS.Temperature, Tsec)
R = MS.Interpolate(MS.Radius, s_spt)
sec_flux = Planck(xgrid * units.nm.to(units.cm), Tsec) * R ** 2
return sec_flux / prim_flux
def integral(x, y, I, k=10):
"""
Integrate y = f(x) for x = 0 to a such that the integral = I
I can be an array.
Returns the values a that are found.
"""
I = np.atleast_1d(I)
f = UnivariateSpline(x, y, s=k)
# Integrate as a function of x
F = f.antiderivative()
Y = F(x)
a = []
for intval in I:
F2 = UnivariateSpline(x, Y/Y[-1] - intval, s=0)
a.append(F2.roots())
return np.hstack(a)
def get_max_separation(p_spt, s_temp, v, d=1.0 * u.parsec):
"""
Get the maximum separation for a binary candidate
:param p_spt: The spectral type of the primary star
:param s_temp: The temperature of the companion
:param v: The velocity, in km/s, of the companion
:param d: The distance, in parsec, to the system
:return: The maximum primary-->secondary separation, in arcseconds
"""
# Convert the companion temperature and primary spectral type to mass
from kglib.spectral_type import Mamajek_Table
MS = SpectralTypeRelations.MainSequence()
MT = Mamajek_Table.MamajekTable()
teff2mass = MT.get_interpolator('Teff', 'Msun')
M1 = MS.Interpolate('mass', p_spt)
M2 = teff2mass(s_temp)
Mt = (M1 + M2) * u.M_sun
# Compute the maximum separation
G = constants.G
a_max = (G * Mt / v ** 2)
alpha_max = (a_max / d).to(u.arcsecond, equivalencies=u.dimensionless_angles())
return alpha_max
def plot_profile(wave, flux, line, name):
# define the velocity scale [AA / s]
vel_aas = (wave - line*(1+zem[i])) / (line*(1+zem[i])) * c
# convert to [km / s]
vel_kms = vel_aas.to('km/s')
# define parameters for the x and y axes
ax.set_xlim(xmin, xmax)
ax.xaxis.set_minor_locator(minorLocator)
ax.tick_params(axis='x', labelsize=xls)
ax.set_ylim(0., 1.5)
# make the plot
ax.plot(vel_kms, flux)
# include the name of the line
plt.text(xmin+0.03*(xmax-xmin), 0.15, name)
# mark the approximate centroid velocity
plt.axvline(x=vcen[i], ymin=0., ymax = 1.5, linewidth=1, color='k', linestyle='dotted')
plt.axvline(x=vcen[i]+30., ymin=0., ymax = 1.5, linewidth=0.5, color='k')
plt.axvline(x=vcen[i]-30., ymin=0., ymax = 1.5, linewidth=0.5, color='k')
# label other lines for clarity
for k in range(0, len(lines)):
vel_off_aas = (lines[k] - line) / line * c
vel_off_kms = vel_off_aas.to('km/s') / (u.km / u.s)
plt.axvline(x=vcen[i]+vel_off_kms, ymin=0., ymax = 1.5, linewidth=1, color='k', linestyle='dotted')
# define the data directory
def plot_profile(wave, flux, line, name):
# define the velocity scale [AA / s]
vel_aas = (wave - line*(1+zem[i])) / (line*(1+zem[i])) * c
# convert to [km / s]
vel_kms = vel_aas.to('km/s')
# define parameters for the x and y axes
ax.set_xlim(xmin, xmax)
ax.xaxis.set_minor_locator(minorLocator)
ax.tick_params(axis='x', labelsize=xls)
ax.set_ylim(0., 3.)
# make the plot
ax.plot(vel_kms, np.log(1/flux))
# include the name of the line
plt.text(xmin+0.03*(xmax-xmin), 2.3, name)
# mark the approximate centroid velocity
plt.axvline(x=vcen[i], ymin=0., ymax = 1.5, linewidth=1, color='k', linestyle='dotted')
plt.axvline(x=vcen[i]+30., ymin=0., ymax = 1.5, linewidth=0.5, color='k')
plt.axvline(x=vcen[i]-30., ymin=0., ymax = 1.5, linewidth=0.5, color='k')
# label other lines for clarity
for k in range(0, len(lines)):
vel_off_aas = (lines[k] - line) / line * c
vel_off_kms = vel_off_aas.to('km/s') / (u.km / u.s)
plt.axvline(x=vcen[i]+vel_off_kms, ymin=0., ymax = 1.5, linewidth=1, color='k', linestyle='dotted')
# define the data directory
def test_circular_integration_1step(self):
"""Compare total number and integrated column density for circle.
"""
from ..core import CircularAperture
Nobs = 6.41756750e26
parent = 1.4e4 * u.km
daughter = 1.7e5 * u.km
Q = 5.8e23 / u.s
v = 1 * u.km / u.s
aper = CircularAperture(3300 * u.km)
coma = Haser(Q, v, parent, daughter)
N1 = coma.total_number(aper)
N2 = coma._integrate_column_density(aper)
assert np.isclose(N1, N2)
def __init__(self, Q, species):
"""Parameters
----------
Q : `Astropy.units` quantity or iterable, mandatory
production rate usually in units of `u.molecule / u.s`
species : dictionary or list of dictionaries, mandatory
defines gas velocity, lifetimes, disassociative lifetimes
Returns
-------
Vectorial instance
Examples
--------
TBD
not yet implemented
"""
pass
def pds_ferret(targetid, bib=None):
"""Use the Small Bodies Data Ferret (http://sbntools.psi.edu/ferret/)
at the Planetary Data System's Small Bodies Node to query for
information on a specific small body in the PDS.
Parameters
----------
targetid : str, mandatory
target identifier
bib : SBPy Bibliography instance, optional, default None
Bibliography instance that will be populated
Returns
-------
data : dict
A hierarchical data object
Examples
--------
>>> from sbpy.data import pds_ferret
>>> data = pds_ferret('ceres') # doctest: +SKIP
not yet implemented
"""
def load_results(self, loop):
"""
Load EBTEL output for a given loop object.
Parameters
----------
loop : `synthesizAR.Loop` object
"""
# load text
N_s = len(loop.field_aligned_coordinate)
_tmp = np.loadtxt(loop.hydro_configuration['output_filename'])
# reshape into a 1D loop structure with units
time = _tmp[:, 0]*u.s
electron_temperature = np.outer(_tmp[:, 1], np.ones(N_s))*u.K
ion_temperature = np.outer(_tmp[:, 2], np.ones(N_s))*u.K
density = np.outer(_tmp[:, 3], np.ones(N_s))*(u.cm**(-3))
velocity = np.outer(_tmp[:, -2], np.ones(N_s))*u.cm/u.s
# flip sign of velocity at apex
i_mirror = np.where(np.diff(loop.coordinates.value[:, 2]) > 0)[0][-1] + 2
velocity[:, i_mirror:] = -velocity[:, i_mirror:]
return time, electron_temperature, ion_temperature, density, velocity
def non_equilibrium_ionization(self, time: u.s, temperature: u.K, density: u.cm**(-3), rate_matrix=None,
initial_condition=None):
"""
Compute the ionization fraction in non-equilibrium for a given temperature and density
timeseries.
"""
if rate_matrix is None:
rate_matrix = self._rate_matrix()
if initial_condition is None:
initial_condition = self.equilibrium_ionization(rate_matrix=rate_matrix)
interpolate_indices = [np.abs(self.temperature - t).argmin() for t in temperature]
y = np.zeros(time.shape + (self.atomic_number+1,))
y[0, :] = initial_condition[interpolate_indices[0], :]
identity = u.Quantity(np.eye(self.atomic_number + 1))
for i in range(1, time.shape[0]):
dt = time[i] - time[i-1]
term1 = identity - density[i]*dt/2.*rate_matrix[interpolate_indices[i], :, :]
term2 = identity + density[i-1]*dt/2.*rate_matrix[interpolate_indices[i-1], :, :]
y[i, :] = np.linalg.inv(term1) @ term2 @ y[i-1, :]
y[i, :] = np.fabs(y[i, :])
y[i, :] /= y[i, :].sum()
return u.Quantity(y)
def proton_collision_rate(self):
"""
Calculates the collision rate for de-exciting and exciting collisions for protons
"""
# Create scaled temperature--these are not stored in the file
bt_t = np.vectorize(np.linspace, excluded=[0, 1], otypes='O')(0, 1, [ups.shape[0]
for ups in self._psplups['bt_rate']])
# Get excitation rates directly from scaled data
energy_ratio = np.outer(const.k_B.cgs*self.temperature, 1.0/self._psplups['delta_energy'].to(u.erg))
ex_rate = np.array(list(map(self.burgess_tully_descale, bt_t, self._psplups['bt_rate'], energy_ratio.T,
self._psplups['bt_c'], self._psplups['bt_type'])))
ex_rate = u.Quantity(np.where(ex_rate > 0., ex_rate, 0.), u.cm**3/u.s).T
# Calculation de-excitation rates from excitation rate
omega_upper = 2.*self._elvlc['J'][self._psplups['upper_level'] - 1] + 1.
omega_lower = 2.*self._elvlc['J'][self._psplups['lower_level'] - 1] + 1.
dex_rate = (omega_lower/omega_upper)*ex_rate*np.exp(1./energy_ratio)
return dex_rate, ex_rate
def __init__(self, domain_x, domain_y, domain_z, gamma=5/3):
# Define domain sizes
self.x = domain_x
self.y = domain_y
self.z = domain_z
x, y, z = self.x.si.value, self.y.si.value, self.z.si.value
self.grid = np.meshgrid(x, y, z, indexing='ij') * u.m
self.grid = np.squeeze(self.grid)
self.domain_shape = []
for length in (len(x), len(y), len(z)):
if length > 1:
self.domain_shape.append(length)
self.domain_shape = tuple(self.domain_shape)
print(self.domain_shape)
self.gamma = gamma
# Initiate core plasma variables
self._density = np.zeros(self.domain_shape) * u.kg / u.m**3
self._momentum = np.zeros((3, *self.domain_shape)) * u.kg \
/ (u.m**2 * u.s)
self._energy = np.zeros(self.domain_shape) * u.J / u.m**3
self._magnetic_field = np.zeros((3, *self.domain_shape)) * u.T
# Collect core variables into a list for usefulness
self.core_variables
# Connect a simulation object for simulating
self.simulation_physics = MHDSimulation(self)
def pressure(self):
"""Sets the simulation's kinetic pressure profile to the specified array.
The kinetic pressure is defined as:
.. math::
p = (\\gamma - 1) (e_0 - \\frac{\\rho\\textbf{v}^2}{2})
"""
v = self.velocity
return (self.gamma - 1) \
* (self.energy - ((self.density * dot(v, v)) / 2))
def simulate(self, max_its=np.inf, max_time=np.inf*u.s):
"""Simulates the plasma as set up, either for the given number of
iterations or until the simulation reaches the given time.
Parameters
----------
max_its : int
Tells the simulation to run for a set number of iterations.
max_time : astropy.units.Quantity
Maximum total (in-simulation) time to allow the simulation to run.
Must have units of time.
Examples
--------
>>> # Run a simulation for exactly one thousand iterations.
>>> myplasma.simulate(max_time=1000)
>>> # Run a simulation for up to half an hour of simulation time.
>>> myplasma.simulate(max_time=30*u.minute)
"""
if np.isinf(max_its) and np.isinf(max_time.value):
raise ValueError("Either max_time or max_its must be set.")
physics = self.simulation_physics
dt = physics.dt
if np.isinf(max_time):
pb = ProgressBar(max_its)
else:
pb = ProgressBar(int(max_time / dt))
with pb as bar:
while (physics.current_iteration < max_its
and physics.current_time < max_time):
physics.time_stepper()
bar.update()
def burgess_tully_descale(x, y, energy_ratio, c, scaling_type):
"""
Convert scaled Burgess-Tully parameters to physical quantities. For more details see
[1]_.
References
----------
.. [1] Burgess, A. and Tully, J. A., 1992, A&A, `254, 436 <http://adsabs.harvard.edu/abs/1992A%26A...254..436B>`_
"""
nots = splrep(x, y, s=0)
if scaling_type == 1:
x_new = 1.0 - np.log(c)/np.log(energy_ratio + c)
upsilon = splev(x_new, nots, der=0)*np.log(energy_ratio + np.e)
elif scaling_type == 2:
x_new = energy_ratio/(energy_ratio + c)
upsilon = splev(x_new, nots, der=0)
elif scaling_type == 3:
x_new = energy_ratio/(energy_ratio + c)
upsilon = splev(x_new, nots, der=0)/(energy_ratio + 1.0)
elif scaling_type == 4:
x_new = 1.0 - np.log(c)/np.log(energy_ratio + c)
upsilon = splev(x_new, nots, der=0)*np.log(energy_ratio + c)
elif scaling_type == 5:
# dielectronic
x_new = energy_ratio/(energy_ratio + c)
upsilon = splev(x_new, nots, der=0)/energy_ratio
elif scaling_type == 6:
# protons
x_new = energy_ratio/(energy_ratio + c)
upsilon = 10**splev(x_new, nots, der=0)
else:
raise ValueError('Unrecognized BT92 scaling option.')
return upsilon
def test_create_ion_with_wrong_units_raises_unit_conversion_error():
with pytest.raises(u.UnitsError):
fiasco.Ion('fe_5', temperature.value*u.s)
def preprocessor(self, table, line, index):
line = line.strip().split()
if index == 0:
filetype = int(line[0])
table.append([filetype])
if filetype == 1:
self.dtypes = [int,float,float,float,float]
self.units = [None,(u.cm**3)/u.s,None,u.K,u.K]
self.headings = ['fit_type', 'A_fit', 'B_fit', 'T0_fit', 'T1_fit']
self.descriptions = ['fit type','A fit parameter','B fit parameter',
'T0 fit parameter','T1 fit parameter']
elif filetype == 2:
self.dtypes = [int,float,float,float,float,float,float]
self.units = [None,(u.cm**3)/u.s,None,u.K,u.K,None,u.K]
self.headings = ['fit_type', 'A_fit', 'B_fit', 'T0_fit', 'T1_fit', 'C_fit', 'T2_fit']
self.descriptions = ['fit type','A fit parameter','B fit parameter',
'T0 fit parameter','T1 fit parameter','C fit parameter',
'T2 fit parameter']
elif filetype == 3:
self.dtypes = [int,float,float]
self.units = [None,(u.cm**3)/u.s,None]
self.headings = ['fit_type', 'A_fit', 'eta_fit']
self.descriptions = ['fit type','A rad fit parameter','eta fit parameter']
else:
raise ValueError('Unrecognized .rrparams filetype {}'.format(filetype))
else:
if table[0][0] == 1 or table[0][0] == 2:
table[0] += line[3:]
else:
table[0] += line[2:]
def preprocessor(self, table, line, index):
line = line.strip().split()
if index == 0:
self._drparams_filetype = int(line[0])
if self._drparams_filetype == 1:
# Badnell type
self.dtypes = [int,float,float]
self.units = [None,u.K,(u.cm**3)/u.s*(u.K**(3/2))]
self.headings = ['fit_type', 'E_fit', 'c_fit']
self.descriptions = ['fit type', 'E fit parameter','c fit parameter']
elif self._drparams_filetype == 2:
# Shull type
self.dtypes = [int,float,float,float,float]
self.units = [None,(u.cm**3)/u.s*(u.K**(3/2)),u.dimensionless_unscaled,u.K,u.K]
self.headings = ['fit_type', 'A_fit', 'B_fit', 'T0_fit', 'T1_fit']
self.descriptions = ['fit type','A fit coefficient','B fit coefficient',
'T0 fit coefficient','T1 fit coefficient']
else:
raise ValueError('Unrecognized drparams filetype {}'.format(self._drparams_filetype))
else:
if self._drparams_filetype == 1:
tmp = np.array(line[2:],dtype=float)
if index%2 == 0:
tmp_col = table[-1]
for i in range(tmp.shape[0]):
table.append([self._drparams_filetype,tmp_col[i],tmp[i]])
del table[0]
else:
table.append(tmp)
else:
table.append([self._drparams_filetype]+line[2:])
def split_by_component(df):
df['prim_comp'] = df.Comp.map(lambda s: s[0])
df['sec_comp'] = df.Comp.map(lambda s: s[-1])
comps = pd.concat((df[['prim_comp', 'Sp1']], df[['sec_comp', 'Sp2']]))
prim = comps.loc[comps.prim_comp.notnull()].rename(columns={'Sp1': 'SpT', 'prim_comp': 'comp'})
sec = comps.loc[comps.sec_comp.notnull()].rename(columns={'Sp2': 'SpT', 'sec_comp': 'comp'})
return pd.concat((prim, sec))[['comp', 'SpT']].drop_duplicates(subset='comp')
def safe_convert(s, default=0):
"""
Converts something to a float. If an error occurs, returns the default value.
"""
try:
v = float(s)
except ValueError:
v = default
return v
def convert_to_hex(val, delimiter=':', force_sign=False):
"""
Converts a numerical value into a hexidecimal string
Parameters:
===========
- val: float
The decimal number to convert to hex.
- delimiter: string
The delimiter between hours, minutes, and seconds
in the output hex string.
- force_sign: boolean
Include the sign of the string on the output,
even if positive? Usually, you will set this to
False for RA values and True for DEC
Returns:
========
A hexadecimal representation of the input value.
"""
s = np.sign(val)
s_factor = 1 if s > 0 else -1
val = np.abs(val)
degree = int(val)
minute = int((val - degree)*60)
second = (val - degree - minute/60.0)*3600.
if degree == 0 and s_factor < 0:
return '-00{2:s}{0:02d}{2:s}{1:.2f}'.format(minute, second, delimiter)
elif force_sign or s_factor < 0:
deg_str = '{:+03d}'.format(degree * s_factor)
else:
deg_str = '{:02d}'.format(degree * s_factor)
return '{0:s}{3:s}{1:02d}{3:s}{2:.2f}'.format(deg_str, minute, second, delimiter)
def fwhm(x, y, k=10, ret_roots=False):
"""
Determine full-with-half-maximum of a peaked set of points, x and y.
Assumes that there is only one peak present in the dataset. The function
uses a spline interpolation with smoothing parameter k ('s' in scipy.interpolate.UnivariateSpline).
If ret_roots=True, return the x-locations at half maximum instead of just
the distance between them.
"""
class NoPeaksFound(Exception):
pass
half_max = np.max(y) / 2.0
s = UnivariateSpline(x, y - half_max, s=k)
roots = s.roots()
if len(roots) > 2:
# Multiple peaks. Use the two that straddle the maximum value
maxvel = x[np.argmax(y)]
left_idx = np.argmin(maxvel - roots)
right_idx = np.argmin(roots - maxvel)
roots = np.array((roots[left_idx], roots[right_idx]))
elif len(roots) < 2:
raise NoPeaksFound("No proper peaks were found in the data set; likely "
"the dataset is flat (e.g. all zeros).")
if ret_roots:
return roots[0], roots[1]
return abs(roots[1] - roots[0])