def t_E(self):
"""
*astropy.Quantity*
The Einstein timescale. "day" is the default unit. Regardless
of input value, returns value with units of u.day. May be set
as a *float* --> assumes units of degrees.
"""
if 't_E' in self.parameters.keys():
self._check_time_quantity('t_E')
return self.parameters['t_E'].to(u.day).value
elif ('t_star' in self.parameters.keys() and
'rho' in self.parameters.keys()):
return self.t_star/self.rho
elif ('t_eff' in self.parameters.keys() and
'u_0' in self.parameters.keys()):
return self.t_eff/self.u_0
else:
raise KeyError("You're trying to access t_E that was not set")
python类Quantity()的实例源码
def alpha(self):
"""
*astropy.Quantity*
The angle of the source trajectory relative to the binary lens
axis (or primary-secondary axis). Measured counterclockwise,
i.e., according to convention advocated by `Skowron et
al. 2011 (ApJ, 738, 87)
<http://adsabs.harvard.edu/abs/2011ApJ...738...87S>`_. May be
set as a *float* --> assumes "deg" is the default unit.
Regardless of input value, returns value in degrees.
"""
if not isinstance(self.parameters['alpha'], u.Quantity):
self.parameters['alpha'] = self.parameters['alpha'] * u.deg
return self.parameters['alpha'].to(u.deg)
def _set_single_mass(self, new_mass):
"""
Initializes total_mass and epsilon if only one mass componenet
is defined (i.e. a point lens).
"""
if isinstance(new_mass, u.Quantity):
if new_mass.unit.physical_type == 'dimensionless':
new_mass *= u.solMass
elif new_mass.unit.physical_type != 'mass':
msg = 'wrong physical_type of new total_mass: {:}'
raise ValueError(msg.format(new_mass.unit.physical_type))
self._total_mass = new_mass
else:
self._total_mass = new_mass * u.solMass
self._epsilon = np.array([1.0])
def _add_mass(self, new_mass, index):
"""
Private function: Updates the total_mass and adds a component
to the epsilon array if masses are added
sequentially. e.g. the lens is defined by defining mass_1 and
mass_2.
"""
if not isinstance(new_mass, u.Quantity):
new_mass *= u.solMass
elif new_mass.unit.physical_type == 'dimensionless':
new_mass *= u.solMass
elif new_mass.unit.physical_type != 'mass':
msg = 'wrong physical_type of new total_mass: {:}'
raise ValueError(msg.format(new_mass.unit.physical_type))
new_total_mass = self._total_mass + new_mass
self._epsilon = self._total_mass * self._epsilon / new_total_mass
self._epsilon = np.insert(
self._epsilon, index, new_mass / new_total_mass)
self._total_mass = new_total_mass
def radial_search(self, inp, radius, **kwargs):
""" Search for sources in a radius around the input coord
Parameters
----------
inp : str or tuple or SkyCoord
See linetools.utils.radec_to_coord for details
Single coordinate
radius : Angle or Quantity
Tolerance for a match
Returns
-------
idx : int array
Catalog IDs corresponding to match in order of increasing separation
Returns an empty array if there is no match
"""
raise DeprecationWarning("THIS METHOD HAS BEEN DEPRECATED. USE query_position()")
def ioneq(self):
"""
Ionization equilibrium data interpolated to the given temperature
Note
----
Will return NaN where interpolation is out of range of the data. For computing
ionization equilibrium outside of this temperature range, it is better to use
`fiasco.Element.equilibrium_ionization`
"""
f = interp1d(self._ioneq[self._dset_names['ioneq_filename']]['temperature'],
self._ioneq[self._dset_names['ioneq_filename']]['ionization_fraction'],
kind='linear', bounds_error=False, fill_value=np.nan)
ioneq = f(self.temperature)
isfinite = np.isfinite(ioneq)
ioneq[isfinite] = np.where(ioneq[isfinite] < 0., 0., ioneq[isfinite])
return u.Quantity(ioneq)
def excitation_autoionization_rate(self):
"""
Calculate ionization rate due to excitation autoionization
"""
# Collision constant
c = (const.h.cgs**2)/((2. * np.pi * const.m_e.cgs)**(1.5) * np.sqrt(const.k_B.cgs))
kBTE = u.Quantity([(const.k_B.cgs * self.temperature) / (de.to(u.erg))
for de in self._easplups['delta_energy']])
# Descale upsilon
shape = self._easplups['bt_upsilon'].shape
xs = np.tile(np.linspace(0, 1, shape[1]), shape[0]).reshape(shape)
args = [xs, self._easplups['bt_upsilon'].value, kBTE.value, self._easplups['bt_c'].value,
self._easplups['bt_type']]
upsilon = u.Quantity(list(map(self.burgess_tully_descale, *args)))
# Rate coefficient
rate = c * upsilon * np.exp(-1 / kBTE) / np.sqrt(self.temperature[np.newaxis,:])
return rate.sum(axis=0)
def __getitem__(self, key):
if type(key) is int:
raise NotImplementedError('Iteration not supported.')
with h5py.File(self.hdf5_dbase_root, 'r') as hf:
grp = hf[self.top_level_path]
if key not in grp:
raise IndexError('{} not found in {} filetype'.format(key, self.top_level_path))
ds = grp[key]
if isinstance(ds, h5py.Group):
data = DataIndexer(self.hdf5_dbase_root, '/'.join([self.top_level_path, key]))
else:
if ds.attrs['unit'] == 'SKIP' or ds.dtype == 'object':
data = np.array(ds, dtype=ds.dtype)
else:
data = u.Quantity(np.array(ds), ds.attrs['unit'], dtype=ds.dtype)
if '|S' in data.dtype.str:
data = data.astype(str)
return data
def strip_units(self):
"""
Strips units from an xypoint structure.
Returns:
A copy of the xypoint with no units
The x-units
the y-units
"""
xunits = self.x.unit if isinstance(self.x, u.Quantity) else 1.0
yunits = self.y.unit if isinstance(self.y, u.Quantity) else 1.0
x = self.x.value if isinstance(self.x, u.Quantity) else self.x
y = self.y.value if isinstance(self.y, u.Quantity) else self.y
err = self.err.value if isinstance(self.err, u.Quantity) else self.err
cont = self.cont.value if isinstance(self.cont, u.Quantity) else self.cont
return xypoint(x=x, y=y, cont=cont, err=err), xunits, yunits
def integrate(self, wavelength_grid):
"""
Integrate the spectrum flux over the specified grid of wavelengths.
Parameters
----------
wavelength_grid : quantity_like
Returns
-------
integrated_flux : :class:`~astropy.units.Quantity`
"""
grid = u.Quantity(wavelength_grid)
grid = grid.to(self.wavelength.unit)
interpolator = interp1d(self.wavelength.value, self.flux.value,
kind='cubic')
new_flux = interpolator(grid.value)
return simps(new_flux, x=grid.value) * self.flux.unit * grid.unit
def _make_quantity(data, unit):
"""
Get a LogQuantity if the a LogUnit is used rather than a regular Quantity.
Parameters
----------
data: numpy.ndarray
the data
unit: ~astropy.unit.Unit or ~astropy.unit.LogUnit
The data units
Returns
-------
~astropy.unit.Quantity or ~astropy.unit.LogQuantity
depending on the unit type
"""
if isinstance(unit, LogUnit):
return LogQuantity(data, unit=unit)
else:
return Quantity(data, unit=unit)
def column_density(self, rho, eph=None):
"""Coma column density at a projected distance from nucleus.
Parameters
----------
rho : `~astropy.units.Quantity`
Projected distance of the region of interest on the plane of
the sky in units of length or angle.
eph : dictionary-like or `~sbpy.data.Ephem`
Ephemerides at epoch; requires geocentric distance as
`delta` keyword if aperture has angular units.
Returns
-------
sigma : float
"""
pass
def total_number(self, aper, eph=None):
"""Total number of molecules in aperture.
Parameters
----------
aper : `~astropy.units.Quantity` or `~sbpy.activity.Aperture`
Observation aperture as a radius for a circular aperture
(projected length, or angle) or an `Aperture` instance.
eph : dictionary-like or `~sbpy.data.Ephem`, optional
Ephemerides at epoch; requires geocentric distance as
`delta` keyword if aperture has angular units.
Returns
-------
N : int
"""
pass
def column_density(self, rho, eph=None):
from .core import rho_as_length
assert isinstance(rho, u.Quantity)
r = rho_as_length(rho, eph=eph)
x = 0 if self.parent is None else (r / self.parent).decompose()
y = 0 if self.daughter is None else (r / self.daughter).decompose()
sigma = self.Q / 2 / np.pi / r / self.v
if self.daughter is None or self.daughter == 0:
sigma *= np.pi / 2 - self._iK0(x)
elif self.parent is None or self.parent == 0:
sigma *= np.pi / 2 - self._iK0(y)
else:
sigma *= (self.daughter / (self.parent - self.daughter)
* (self._iK0(y) - self._iK0(x)))
return sigma.decompose()
def rho_as_angle(rho, eph):
"""Projected linear distance to angular distance.
Parameters
----------
rho : `~astropy.units.Quantity`
Projected distance in units of length.
eph : dictionary-like or `~sbpy.data.Ephem`
Ephemerides; requires geocentric distance as `delta`.
Returns
-------
rho_l : `~astropy.units.Quantity`
"""
if rho.unit.is_equivalent(u.m):
rho_a = np.arctan(rho / eph['delta'].to(u.m))
else:
assert rho.unit.is_equivalent(u.rad)
rho_a = rho
return rho_a
def rho_as_length(rho, eph):
"""Angular distance to projected linear distance.
Parameters
----------
rho : `~astropy.units.Quantity`
Projected distance in units of angle.
eph : dictionary-like or `~sbpy.data.Ephem`
Ephemerides; requires geocentric distance as `delta`.
Returns
-------
rho_l : `~astropy.units.Quantity`
"""
if rho.unit.is_equivalent(u.rad):
rho_l = eph['delta'].to(u.m) * np.tan(rho)
else:
assert rho.unit.is_equivalent(u.m)
rho_l = rho
return rho_l
def __call__(self, rho, eph=None):
"""Evaluate the aperture.
Parameters
----------
rho : `~astropy.units.Quantity`
Position to evaluate, in units of length or angle.
eph : dictionary-like or `~sbpy.data.Ephem`, optional
Ephemerides at epoch; requires geocentric distance as
`delta` keyword if the aperture's units and `rho`'s units do
not match.
"""
x = self._convert_unit(rho, eph)
# normalize to 1.0 at the center?
return np.exp(-x**2 / self.sigma**2 / 2)
def _process_map(self, tmp_map, crop=None, resample=None):
"""
Rotate, crop and resample map if needed. Can do any other needed processing here too.
Parameters
----------
map : `~sunpy.map.Map`
Original HMI map
crop : `tuple` `[bottom_left_corner,top_right_corner]`, optional
The lower left and upper right corners of the cropped map. Both should be of type
`~astropy.units.Quantity` and have the same units as `map.xrange` and `map.yrange`
resample : `~astropy.units.Quantity`, `[new_xdim,new_ydim]`, optional
The new x- and y-dimensions of the resampled map, should have the same units as
`map.dimensions.x` and `map.dimensions.y`
"""
tmp_map = tmp_map.rotate()
if crop is not None:
bottom_left = SkyCoord(*crop[0], frame=tmp_map.coordinate_frame)
top_right = SkyCoord(*crop[1], frame=tmp_map.coordinate_frame)
tmp_map = tmp_map.submap(bottom_left, top_right)
if resample is not None:
tmp_map = tmp_map.resample(resample, method='linear')
return tmp_map
def _filter_streamlines(self, streamline, close_threshold=0.05,
loop_length_range: u.cm =[2.e+9, 5.e+10]*u.cm, **kwargs):
"""
Check extracted loop to make sure it fits given criteria. Return True if it passes.
Parameters
----------
streamline : yt streamline object
close_threshold : `float`
percentage of domain width allowed between loop endpoints
loop_length_range : `~astropy.Quantity`
minimum and maximum allowed loop lengths (in centimeters)
"""
streamline = streamline[np.all(streamline != 0.0, axis=1)]
loop_length = np.sum(np.linalg.norm(np.diff(streamline, axis=0), axis=1))
if np.fabs(streamline[0, 2] - streamline[-1, 2]) > close_threshold*self.extrapolated_3d_field.domain_width[2]:
return False
elif loop_length > loop_length_range[1].to(u.cm).value or loop_length < loop_length_range[0].to(u.cm).value:
return False
else:
return True
def flatten_emissivities(channel, emission_model):
"""
Compute product between wavelength response and emissivity for all ions
"""
flattened_emissivities = []
for ion in emission_model:
wavelength, emissivity = emission_model.get_emissivity(ion)
if wavelength is None or emissivity is None:
flattened_emissivities.append(None)
continue
interpolated_response = splev(wavelength.value, channel['wavelength_response_spline'], ext=1)
em_summed = np.dot(emissivity.value, interpolated_response)
unit = emissivity.unit*u.count/u.photon*u.steradian/u.pixel*u.cm**2
flattened_emissivities.append(u.Quantity(em_summed, unit))
return flattened_emissivities
def restore(cls, savefile):
"""
Restore the emission model from a JSON representation.
"""
with open(savefile, 'r') as f:
restore_dict = json.load(f)
temperature = u.Quantity(restore_dict['temperature'], restore_dict['temperature_unit'])
density = u.Quantity(restore_dict['density'], restore_dict['density_unit'])
ion_list = [Ion(ion, temperature, **ds) for ion, ds in zip(restore_dict['ion_list'],
restore_dict['dset_names'])]
emission_model = cls(density, *ion_list)
if 'emissivity_savefile' in restore_dict:
emission_model.emissivity_savefile = restore_dict['emissivity_savefile']
if 'ionization_fraction_savefile' in restore_dict:
emission_model.ionization_fraction_savefile = restore_dict['ionization_fraction_savefile']
return emission_model
def non_equilibrium_ionization(self, time: u.s, temperature: u.K, density: u.cm**(-3), rate_matrix=None,
initial_condition=None):
"""
Compute the ionization fraction in non-equilibrium for a given temperature and density
timeseries.
"""
if rate_matrix is None:
rate_matrix = self._rate_matrix()
if initial_condition is None:
initial_condition = self.equilibrium_ionization(rate_matrix=rate_matrix)
interpolate_indices = [np.abs(self.temperature - t).argmin() for t in temperature]
y = np.zeros(time.shape + (self.atomic_number+1,))
y[0, :] = initial_condition[interpolate_indices[0], :]
identity = u.Quantity(np.eye(self.atomic_number + 1))
for i in range(1, time.shape[0]):
dt = time[i] - time[i-1]
term1 = identity - density[i]*dt/2.*rate_matrix[interpolate_indices[i], :, :]
term2 = identity + density[i-1]*dt/2.*rate_matrix[interpolate_indices[i-1], :, :]
y[i, :] = np.linalg.inv(term1) @ term2 @ y[i-1, :]
y[i, :] = np.fabs(y[i, :])
y[i, :] /= y[i, :].sum()
return u.Quantity(y)
def proton_collision_rate(self):
"""
Calculates the collision rate for de-exciting and exciting collisions for protons
"""
# Create scaled temperature--these are not stored in the file
bt_t = np.vectorize(np.linspace, excluded=[0, 1], otypes='O')(0, 1, [ups.shape[0]
for ups in self._psplups['bt_rate']])
# Get excitation rates directly from scaled data
energy_ratio = np.outer(const.k_B.cgs*self.temperature, 1.0/self._psplups['delta_energy'].to(u.erg))
ex_rate = np.array(list(map(self.burgess_tully_descale, bt_t, self._psplups['bt_rate'], energy_ratio.T,
self._psplups['bt_c'], self._psplups['bt_type'])))
ex_rate = u.Quantity(np.where(ex_rate > 0., ex_rate, 0.), u.cm**3/u.s).T
# Calculation de-excitation rates from excitation rate
omega_upper = 2.*self._elvlc['J'][self._psplups['upper_level'] - 1] + 1.
omega_lower = 2.*self._elvlc['J'][self._psplups['lower_level'] - 1] + 1.
dex_rate = (omega_lower/omega_upper)*ex_rate*np.exp(1./energy_ratio)
return dex_rate, ex_rate
def _set_time_quantity(self, key, new_time):
"""
Save a variable with units of time (e.g. t_E, t_star,
t_eff). If units are not given, assume days.
"""
if isinstance(new_time, u.Quantity):
self.parameters[key] = new_time
else:
self.parameters[key] = new_time * u.day
def _check_time_quantity(self, key):
if not isinstance(self.parameters[key], u.Quantity):
self._set_time_quantity(key, self.parameters[key])
def alpha(self, new_alpha):
if isinstance(new_alpha, u.Quantity):
self.parameters['alpha'] = new_alpha
else:
self.parameters['alpha'] = new_alpha * u.deg
def total_mass(self, new_mass):
if not isinstance(new_mass, u.Quantity):
new_mass *= u.solMass
elif new_mass.unit.physical_type == 'dimensionless':
new_mass *= u.solMass
elif new_mass.unit.physical_type != 'mass':
msg = 'wrong physical_type of new total_mass: {:}'
raise ValueError(msg.format(new_mass.unit.physical_type))
self._total_mass = new_mass
self._last_mass_set = 'total_mass'
def mass(self):
"""
*astropy.Quantity*
The mass of a point lens --> total mass. An astropy.Quantity
with mass units. May be set as a float (in which case solMass
is assumed).
"""
if self._epsilon.size > 1:
raise TypeError(
'mass can only be defined for a point lens. use total_mass' +
'for multiple bodies')
else:
return self.total_mass
def mass_1(self):
"""
*astropy.Quantity*
The mass of the primary. Defined as total_mass *
epsilon[0]. An *astropy.Quantity* with mass units. If set as a
*float*, units are assumed to be solMass.
"""
return self.total_mass * self._epsilon[0]
def mass_2(self):
"""
*astropy.Quantity*
The mass of the secondary. Defined as total_mass *
epsilon[1]. An *astropy.Quantity* with mass units. If set as a
*float*, units are assumed to be solMass.
Note that if total_mass is defined before mass_2, and there is
no epsilon corresponding to mass_2, mass_2 is added to the
total_mass.
"""
return self.total_mass * self._epsilon[1]