def deblend_cube(region='OrionA',vmin=0.,vmax=20.):
tex = fits.getdata('{0}/parameterMaps/{0}_Tex_DR1_rebase3_flag.fits'.format(region))
mom0 = fits.getdata('{0}/{0}_NH3_11_DR1_rebase3_mom0_QA_trim.fits'.format(region))
vlsr = fits.getdata('{0}/parameterMaps/{0}_Vlsr_DR1_rebase3_flag.fits'.format(region))
sigv = fits.getdata('{0}/parameterMaps/{0}_Sigma_DR1_rebase3_flag.fits'.format(region))
nnh3 = fits.getdata('{0}/parameterMaps/{0}_N_NH3_DR1_rebase3_flag.fits'.format(region))
cube = SpectralCube.read('{0}/{0}_NH3_11_DR1_rebase3_trim.fits'.format(region))
cube = cube.with_spectral_unit(u.km/u.s,velocity_convention='radio')
tpeak = mom0/(np.sqrt(2*np.pi)*sigv)
vlsr[vlsr==0]=np.nan
sigv[sigv==0]=np.nan
deblend = np.zeros(cube.shape)
hdr = cube.wcs.to_header()
spaxis = cube.spectral_axis.value
for plane in np.arange(deblend.shape[0]):
deblend[plane,:,:] = tpeak*np.exp(-(spaxis[plane]-vlsr)**2/(2*sigv**2))
newcube = SpectralCube(deblend,cube.wcs,header=hdr)
slab = newcube.spectral_slab(vmin*u.km/u.s,vmax*u.km/u.s)
slab.write('{0}/{0}_singlecomp.fits'.format(region),overwrite=True)
python类km()的实例源码
def test_plot_rv_curves():
pars = dict(P=[30.]*u.day, K=[10.]*u.km/u.s, ecc=[0.11239],
omega=[0.]*u.radian, phi0=[np.pi/2]*u.radian)
samples = JokerSamples(VelocityTrend2)
for k in pars:
samples[k] = pars[k]
samples['v0'] = [150] * u.km/u.s
samples['v1'] = [0.01] * u.km/u.s/u.day
t_grid = np.random.uniform(56000, 56500, 1024)
t_grid.sort()
fig, ax = plt.subplots(1, 1, figsize=(12,5))
plot_rv_curves(samples, t_grid, ax=ax, trend_t0=56000.)
def mean_spectra(region,line,file_extension,restFreq,spec_param):
'''
Sum spectra over entire mapped region
Cubes are missing BUNIT header parameter. Fix.
'''
filein = '{0}/0{}_{1}_{2}_trim.fits'.format(region,line,file_extension)
#add_fits_units(filein,'K')
cube = SpectralCube.read(filein)
#trim_edge_cube(cube)
slice_unmasked = cube.unmasked_data[:,:,:]
if line == 'NH3_33':
slice_unmasked[spec_param['mask33_chans'][0]:spec_param['mask33_chans'][1],:,:]=0.
summed_spectrum = np.nanmean(slice_unmasked,axis=(1,2))
cube2 = cube.with_spectral_unit(u.km/u.s,velocity_convention='radio',
rest_value=restFreq*u.GHz)
return summed_spectrum, cube2.spectral_axis
def get_max_separation(p_spt, s_temp, v, d=1.0 * u.parsec):
"""
Get the maximum separation for a binary candidate
:param p_spt: The spectral type of the primary star
:param s_temp: The temperature of the companion
:param v: The velocity, in km/s, of the companion
:param d: The distance, in parsec, to the system
:return: The maximum primary-->secondary separation, in arcseconds
"""
# Convert the companion temperature and primary spectral type to mass
from kglib.spectral_type import Mamajek_Table
MS = SpectralTypeRelations.MainSequence()
MT = Mamajek_Table.MamajekTable()
teff2mass = MT.get_interpolator('Teff', 'Msun')
M1 = MS.Interpolate('mass', p_spt)
M2 = teff2mass(s_temp)
Mt = (M1 + M2) * u.M_sun
# Compute the maximum separation
G = constants.G
a_max = (G * Mt / v ** 2)
alpha_max = (a_max / d).to(u.arcsecond, equivalencies=u.dimensionless_angles())
return alpha_max
def plot_profile(wave, flux, line, name):
# define the velocity scale [AA / s]
vel_aas = (wave - line*(1+zem[i])) / (line*(1+zem[i])) * c
# convert to [km / s]
vel_kms = vel_aas.to('km/s')
# define parameters for the x and y axes
ax.set_xlim(xmin, xmax)
ax.xaxis.set_minor_locator(minorLocator)
ax.tick_params(axis='x', labelsize=xls)
ax.set_ylim(0., 1.5)
# make the plot
ax.plot(vel_kms, flux)
# include the name of the line
plt.text(xmin+0.03*(xmax-xmin), 0.15, name)
# mark the approximate centroid velocity
plt.axvline(x=vcen[i], ymin=0., ymax = 1.5, linewidth=1, color='k', linestyle='dotted')
plt.axvline(x=vcen[i]+30., ymin=0., ymax = 1.5, linewidth=0.5, color='k')
plt.axvline(x=vcen[i]-30., ymin=0., ymax = 1.5, linewidth=0.5, color='k')
# label other lines for clarity
for k in range(0, len(lines)):
vel_off_aas = (lines[k] - line) / line * c
vel_off_kms = vel_off_aas.to('km/s') / (u.km / u.s)
plt.axvline(x=vcen[i]+vel_off_kms, ymin=0., ymax = 1.5, linewidth=1, color='k', linestyle='dotted')
# define the data directory
def plot_profile(wave, flux, line, name):
# define the velocity scale [AA / s]
vel_aas = (wave - line*(1+zem[i])) / (line*(1+zem[i])) * c
# convert to [km / s]
vel_kms = vel_aas.to('km/s')
# define parameters for the x and y axes
ax.set_xlim(xmin, xmax)
ax.xaxis.set_minor_locator(minorLocator)
ax.tick_params(axis='x', labelsize=xls)
ax.set_ylim(0., 3.)
# make the plot
ax.plot(vel_kms, np.log(1/flux))
# include the name of the line
plt.text(xmin+0.03*(xmax-xmin), 2.3, name)
# mark the approximate centroid velocity
plt.axvline(x=vcen[i], ymin=0., ymax = 1.5, linewidth=1, color='k', linestyle='dotted')
plt.axvline(x=vcen[i]+30., ymin=0., ymax = 1.5, linewidth=0.5, color='k')
plt.axvline(x=vcen[i]-30., ymin=0., ymax = 1.5, linewidth=0.5, color='k')
# label other lines for clarity
for k in range(0, len(lines)):
vel_off_aas = (lines[k] - line) / line * c
vel_off_kms = vel_off_aas.to('km/s') / (u.km / u.s)
plt.axvline(x=vcen[i]+vel_off_kms, ymin=0., ymax = 1.5, linewidth=1, color='k', linestyle='dotted')
# define the data directory
def test_circular_integration_1step(self):
"""Compare total number and integrated column density for circle.
"""
from ..core import CircularAperture
Nobs = 6.41756750e26
parent = 1.4e4 * u.km
daughter = 1.7e5 * u.km
Q = 5.8e23 / u.s
v = 1 * u.km / u.s
aper = CircularAperture(3300 * u.km)
coma = Haser(Q, v, parent, daughter)
N1 = coma.total_number(aper)
N2 = coma._integrate_column_density(aper)
assert np.isclose(N1, N2)
def get_wind_speed(self, n=3):
'''
Populates the self.wind_speed property
Based on the information in Rs232_Comms_v120.pdf document
Medians n measurements. This isn't mentioned specifically by the manual
but I'm guessing it won't hurt.
'''
self.logger.info('Getting wind speed')
if self.wind_speed_enabled():
values = []
for i in range(0, n):
result = self.query('V!')
if result:
value = float(result[0])
self.logger.debug(' Wind Speed Query = {:.1f}'.format(value))
values.append(value)
if len(values) >= 3:
self.wind_speed = np.median(values) * u.km / u.hr
self.logger.info(' Wind speed = {:.1f}'.format(self.wind_speed))
else:
self.wind_speed = None
else:
self.wind_speed = None
return self.wind_speed
def pairs(self, sep, dv):
""" Generate a pair catalog
Parameters
----------
sep : Angle or Quantity
dv : Quantity
Offset in velocity. Positive for projected pairs (i.e. dz > input value)
Returns
-------
"""
# Checks
if not isinstance(sep, (Angle, Quantity)):
raise IOError("Input radius must be an Angle type, e.g. 10.*u.arcsec")
if not isinstance(dv, (Quantity)):
raise IOError("Input velocity must be a quantity, e.g. u.km/u.s")
# Match
idx, d2d, d3d = match_coordinates_sky(self.coords, self.coords, nthneighbor=2)
close = d2d < sep
# Cut on redshift
if dv > 0.: # Desire projected pairs
zem1 = self.cat['zem'][close]
zem2 = self.cat['zem'][idx[close]]
dv12 = ltu.dv_from_z(zem1,zem2)
gdz = np.abs(dv12) > dv
# f/g and b/g
izfg = dv12[gdz] < 0*u.km/u.s
ID_fg = self.cat[self.idkey][close][gdz][izfg]
ID_bg = self.cat[self.idkey][idx[close]][gdz][izfg]
else:
pdb.set_trace()
# Reload
return ID_fg, ID_bg
def setup(self):
mjd = np.linspace(55555., 56555., 256)
self.data = RVData(mjd, np.sin(mjd)*u.km/u.s,
stddev=0.1*np.random.uniform(size=mjd.size)*u.km/u.s)
n = 128
samples = dict()
samples['P'] = np.random.uniform(size=n) * u.day
samples['phi0'] = np.random.uniform(size=n) * u.radian
samples['omega'] = np.random.uniform(size=n) * u.radian
samples['ecc'] = np.random.uniform(size=n) * u.one
samples['jitter'] = np.random.uniform(size=n) * u.m/u.s
self.samples = samples
self.n = n
def test_pack_prior_samples(self):
samples = self.samples.copy()
M,units = pack_prior_samples(samples, self.data.rv.unit)
assert units[-1] == u.km/u.s
assert M.shape == (self.n, 5)
samples.pop('jitter')
assert 'jitter' not in samples
M,units = pack_prior_samples(samples, self.data.rv.unit)
assert units[-1] == u.km/u.s
assert M.shape == (self.n, 5)
def test_shit():
joker_params = JokerParams(P_min=8*u.day, P_max=32768*u.day, jitter=0*u.m/u.s)
joker = TheJoker(joker_params)
t = np.random.uniform(0, 250, 16) + 56831.324
t.sort()
rv = np.cos(t)
rv_err = np.random.uniform(0.1, 0.2, t.size)
data = RVData(t=t, rv=rv*u.km/u.s, stddev=rv_err*u.km/u.s)
samples = joker.sample_prior(size=16384)
chunk = []
for k in samples:
chunk.append(np.array(samples[k]))
chunk = np.ascontiguousarray(np.vstack(chunk).T)
t0 = time.time()
cy_ll = batch_marginal_ln_likelihood(chunk, data, joker_params)
print("Cython:", time.time() - t0)
t0 = time.time()
n_chunk = len(chunk)
py_ll = np.zeros(n_chunk)
for i in range(n_chunk):
try:
py_ll[i] = marginal_ln_likelihood(chunk[i], data, joker_params)
except Exception as e:
py_ll[i] = np.nan
print("Python:", time.time() - t0)
assert np.allclose(np.array(cy_ll), py_ll)
def test_plotting():
# check that plotting at least succeeds with allowed arguments
t = np.random.uniform(55555., 56012., size=128)
rv = 100 * np.sin(0.5*t) * u.km/u.s
ivar = 1 / (np.random.normal(0,5,size=t.size)*u.km/u.s)**2
data = RVData(t=t, rv=rv, ivar=ivar)
data.plot()
# style
data.plot(color='r')
# custom axis
fig,ax = plt.subplots(1,1)
data.plot(ax=plt.gca())
# formatting
data.plot(rv_unit=u.m/u.s)
data.plot(rv_unit=u.m/u.s, time_format='jd')
data.plot(rv_unit=u.m/u.s, time_format=lambda x: x.utc.mjd)
# try with no errors
data = RVData(t=t, rv=rv)
data.plot()
data.plot(ecolor='r')
plt.close('all')
def get_max_velocity(p_spt, s_temp):
MS = SpectralTypeRelations.MainSequence()
s_spt = MS.GetSpectralType('temperature', s_temp, prec=1e-3)
R1 = MS.Interpolate('radius', p_spt)
T1 = MS.Interpolate('temperature', p_spt)
M1 = MS.Interpolate('mass', p_spt)
M2 = MS.Interpolate('mass', s_spt)
G = constants.G.cgs.value
Msun = constants.M_sun.cgs.value
Rsun = constants.R_sun.cgs.value
v2 = 2.0 * G * Msun * (M1 + M2) / (Rsun * R1 * (T1 / s_temp) ** 2)
return np.sqrt(v2) * u.cm.to(u.km)
def plot_profile(wave, flux, line, name, fosc):
# define the velocity scale [AA / s]
vel_aas = (wave - line*(1+zem[i])) / (line*(1+zem[i])) * c
# convert to [km / s]
vel_kms = vel_aas.to('km/s')
# define parameters for the x and y axes
ax.set_xlim(xmin, xmax)
ax.xaxis.set_minor_locator(minorLocator)
ax.tick_params(axis='x', labelsize=xls)
ymax = 3.e12
ax.set_ylim(0., ymax)
# make the plot (using equations 5 and 8 from Savage & Sembach 1991)
ax.plot(vel_kms, np.log(1/flux) / 2.654e-15 / (fosc * line))
# include the name of the line
plt.text(xmin+0.03*(xmax-xmin), ymax*0.6, name)
# mark the approximate centroid velocity
plt.axvline(x=vcen[i], ymin=0., ymax = 1.5, linewidth=1, color='k', linestyle='dotted')
plt.axvline(x=vcen[i]+30., ymin=0., ymax = 1.5, linewidth=0.5, color='k')
plt.axvline(x=vcen[i]-30., ymin=0., ymax = 1.5, linewidth=0.5, color='k')
# label other lines for clarity
for k in range(0, len(lines)):
vel_off_aas = (lines[k] - line) / line * c
vel_off_kms = vel_off_aas.to('km/s') / (u.km / u.s)
plt.axvline(x=vcen[i]+vel_off_kms, ymin=0., ymax = 1.5, linewidth=1, color='k', linestyle='dotted')
# define the data directory
def calculate(wave, flux, line, fosc):
# define the velocity scale [AA / s]
vel_aas = (wave - line*(1+zem[i])) / (line*(1+zem[i])) * c
# convert to [km / s]
vel_kms = vel_aas.to('km/s')
# label other lines for clarity
tau = np.log(1/flux)
for k in range(0, len(lines)):
vel_off_aas = (lines[k] - line) / line * c
vel_off_kms = vel_off_aas.to('km/s') / (u.km / u.s)
return(tau)
def test_dbquantity_to(dbquantity):
expr = dbquantity.to(u.km).value # value * :value_1
value_1 = expr.right.value
assert_almost_equal(value_1, 0.001) # check value_1
def test_dbquantity_add_diff_units(dbquantity):
expr = (dbquantity + 0.1*u.km).value # value + :value_1
value_1 = expr.right.value
assert_almost_equal(value_1, 100)
def test_dbquantity_rsub_diff_untis(dbquantity):
expr = (0.1*u.km - dbquantity).value # :param_1 - :value_1 * value
value_1 = expr.right.left.value
param_1 = expr.left.value
assert_almost_equal(value_1, 0.001)
assert_almost_equal(param_1, 0.1)
def test_dbquantity_greater_diff_units(dbquantity):
expr = dbquantity > 0.1*u.km # value > :value_1
value_1 = expr.right.value
assert_almost_equal(value_1, 100)
def test_rho_as_angle():
# arctan(100 km, 1 au) * 206264.806 = 0.13787950659645942
rho = rho_as_angle(100 * u.km, {'delta': 1 * u.au})
assert np.isclose(rho.to(u.arcsec).value, 0.13787950659645942)
def test_rho_as_length():
# 1 au * tan(1") = 725.2709438078363
rho = rho_as_length(1 * u.arcsec, {'delta': 1 * u.au})
assert np.isclose(rho.to(u.km).value, 725.2709438078363)
def test_as_angle(self):
r = 100 * u.km
aper = CircularAperture(r)
eph = {'delta': 1 * u.au}
assert aper.as_angle(eph).dim == rho_as_angle(r, eph)
def test_as_angle(self):
shape = [100, 200] * u.km
aper = AnnularAperture(shape)
eph = {'delta': 1 * u.au}
assert all(aper.as_angle(eph).dim == rho_as_angle(shape, eph))
def test_as_angle(self):
shape = [100, 200] * u.km
aper = RectangularAperture(shape)
eph = {'delta': 1 * u.au}
assert all(aper.as_angle(eph).dim == rho_as_angle(shape, eph))
def test_column_density_small_aperture(self):
"""Test column density for aperture << lengthscale.
Should be within 1% of ideal value.
"""
Q = 1e28 / u.s
v = 1 * u.km / u.s
rho = 10 * u.km
parent = 1e4 * u.km
N_avg = 2 * Haser(Q, v, parent).column_density(rho)
ideal = Q / v / 2 / rho
assert np.isclose(N_avg.decompose().value, ideal.decompose().value,
rtol=0.01)
def test_total_number_large_aperture(self):
"""Test column density for aperture >> lengthscale."""
Q = 1 / u.s
v = 1 * u.km / u.s
rho = 1000 * u.km
parent = 10 * u.km
N = Haser(Q, v, parent).total_number(rho)
ideal = Q * parent / v
assert np.isclose(N, ideal.decompose().value)
# TEST FAILS
#
# def test_total_number_rho_NJ78(self):
# """Reproduce Newburn and Johnson 1978.
# Species, N observed, Q/v (km**-1)
# CN, 6.4e26, 5.8e23
# C3, 8.3e28, 9.0e23
# C2, 7.8e27, 5.9e24
# Cannot reproduce C3 quoted in paper.
# """
# #Nobs = [6.41756750e26, 8.63191842e+28, 7.81278300e27]
# #Nobs = [6.4e26, 4.2e27, 7.8e27]
# Nobs = [6.4e26, 8.3e28, 7.8e27]
# parent = [1.4e4, 0, 1.0e4] * u.km
# daughter = [1.7e5, 4.6e4, 7.6e4] * u.km
# Q = [5.8e23, 9.0e23, 5.9e24] / u.s
# rho = 3300 * u.km
# N = np.zeros(3)
# for i in range(3):
# coma = Haser(Q[i], 1 * u.km / u.s, parent[i], daughter[i])
# N[i] = coma.total_number(rho)
# assert np.allclose(N, Nobs)
def test_circular_integration_0step(self):
"""Compare total number and integrated column density for circle."""
from ..core import CircularAperture
Nobs = 6.41756750e26
parent = 1.4e4 * u.km
Q = 5.8e23 / u.s
v = 1 * u.km / u.s
aper = CircularAperture(3300 * u.km)
coma = Haser(Q, v, parent)
N1 = coma.total_number(aper)
N2 = coma._integrate_column_density(aper)
assert np.isclose(N1, N2)
def test_total_number_rectangular_ap(self):
"""
compare with:
import astropy.units as u
from sbpy.imageanalysis.utils import rarray
from sbpy.activity import Haser
r = rarray((5000, 3300), subsample=10) * u.km
parent = 1.4e4 * u.km
daughter = 1.7e5 * u.km
Q = 5.8e23 / u.s
v = 1 * u.km / u.s
coma = Haser(Q, v, parent, daughter)
sigma = coma.column_density(r)
print((sigma * 1 * u.km**2).decompose())
--> <Quantity 3.449607967230623e+26>
This differs from the test value below by XXX
"""
from ..core import RectangularAperture
parent = 1.4e4 * u.km
daughter = 1.7e5 * u.km
Q = 5.8e23 / u.s
v = 1 * u.km / u.s
aper = RectangularAperture([5000, 3300] * u.km)
coma = Haser(Q, v, parent, daughter)
N = coma.total_number(aper)
assert np.isclose(N, 3.449607967230623e+26)
def compute():
'''
'''
# Kepler-444 and simulation parameters
Nocc = 15 # number of occultations observed
nout = 4.0 # obseved out of transit durations [tint]
lammin = 1.0 # min wavelength [um]
lammax = 30.0 # max wavelength [um]
Tstar = 5040. # stellar temperature [K]
Rs = 0.752 # stellar radius [Solar Radii]
Rp = 0.4 # planet radius [Earth Radii]
d = 36. # distance to system [pc]
# Additional params for K-444b
r = 0.04178 # semi-major axis [AU]
A = 0. # Planet albedo
e = 1.0 # Planet emissivity
i = 88.0 # Orbital inclination [degrees]
P = 3.6 # Orbital period [days]
# Convert some units to km
Rs_km = Rs * u.solRad.in_units(u.km)
Rp_km = Rp * u.earthRad.in_units(u.km)
r_km = r * u.AU.in_units(u.km)
P_mins = P * 24. * 60.
# Planet temperature [K]
Tplan = Tstar * ((1.-A)/e)**0.25 * (0.5*Rs_km/r_km)**0.5
# transit duration
tdur_mins = (P_mins / np.pi) * np.arcsin(np.sqrt((Rp_km + Rs_km) ** 2
- r_km / (Rs_km * np.cos(i))) / r_km)
# integration time [seconds]
tdur = Nocc * tdur_mins * 60.0
print("Planet Temperature : %.1f K" %Tplan)
print("Transit Duration : %.2f mins" %(tdur_mins))
# Estimate for JWST
fig1, ax1 = jwst.estimate_eclipse_snr(tint = tdur, nout = nout,
lammin = lammin, lammax = lammax,
Tstar = Tstar, Tplan = Tplan,
Rs = Rs, Rp = Rp, d = d)
# Estimate for an OST-like telescope, but using the JWST filters
fig2, ax2 = jwst.estimate_eclipse_snr(tint = tdur, nout = nout,
lammin = lammin, lammax = lammax,
Tstar = Tstar, Tplan = Tplan,
Rs = Rs, Rp = Rp, d = d,
atel = 144., thermal = False)
return fig1, ax1, fig2, ax2