def calc_beam_size(header):
"""
Calculate the beam/PSF size using the 'WSCNORMF' keyword recorded by
the WSClean imager, which applies to the natural-weighted image, and
is more accurate than the fitted beam (stored as 'BMAJ' and 'BMIN'.)
"""
try:
weight = header["WSCWEIGH"]
normf = header["WSCNORMF"]
print("WSCWEIGH: %s" % weight)
print("WSCNORMF: %.1f" % normf)
except KeyError:
raise RuntimeError("NO necessary WSC* keyword; switch to " +
"--use-fitted-beam instead")
if weight.upper() != "NATURAL":
print("WARNING: weighting scheme is '%s' != natural!" % weight)
width = header["NAXIS1"]
height = header["NAXIS2"]
pixelsize = np.abs(header["CDELT1"]) * 3600 # [arcsec]
print("Image: %dx%d, %.1f [arcsec/pixel]" % (width, height, pixelsize))
beam_size = (width*height * pixelsize**2) / normf
return beam_size
python类arcsec()的实例源码
def __init__(self, image, freq, pixelsize, ra0, dec0,
minvalue=1e-4, maxvalue=np.inf, mask=None,
projection="CAR"):
self.image = image # [K] (brightness temperature)
self.freq = freq # [MHz]
self.pixelsize = pixelsize # [arcsec]
self.ra0 = ra0 # [deg]
self.dec0 = dec0 # [deg]
self.minvalue = minvalue
self.maxvalue = maxvalue
self.mask = mask
self.projection = projection
logger.info("SkyModel: Loaded image @ %.2f [MHz], " % freq +
"%.1f [arcsec/pixel]" % pixelsize)
logger.info("Image size: %dx%d" % self.shape)
logger.info("FoV size: %.2fx%.2f [deg^2]" % self.fov)
def calc_rate(redshift):
"""Calculate the rate, kpc/px."""
# Init
# Hubble constant at z = 0
H0 = 71.0
# Omega0, total matter density
Om0 = 0.27
# Cosmo
cosmo = FlatLambdaCDM(H0=H0, Om0=Om0)
# Angular diameter distance, [Mpc]
dA = cosmo.angular_diameter_distance(redshift)
# Resolution of Chandra
rate_px = 0.492 * au.arcsec # [arcmin/px]
rate_px_rad = rate_px.to(au.rad)
# rate
rate = dA.value * rate_px_rad.value # [Mpc/px]
return rate
def parser(options=None):
import argparse
parser = argparse.ArgumentParser(description='specdb_meta script v0.1')
parser.add_argument("coord", type=str, help="Coordinates, e.g. J081240.7+320809 or 122.223,-23.2322 or 07:45:00.47,34:17:31.1")
parser.add_argument("dbase", type=str, help="Database [igmspec,uvqs,all,priv]")
parser.add_argument("--tol", default=5., type=float, help="Maximum offset in arcsec [default=5.]")
parser.add_argument("-g", "--group", help="Restrict on Group (e.g. BOSS_DR12)")
parser.add_argument("--db_file", help="Full path of db_file")
if options is None:
args = parser.parse_args()
else:
args = parser.parse_args(options)
return args
def parser(options=None):
import argparse
parser = argparse.ArgumentParser(description='specdb_plot script v0.3')
parser.add_argument("coord", type=str, help="Coordinates, e.g. J081240.7+320809 or 122.223,-23.2322 or 07:45:00.47,34:17:31.1")
parser.add_argument("dbase", type=str, help="Database [igmspec,uvqs,all,priv]")
parser.add_argument("--tol", default=5., type=float, help="Maximum offset in arcsec [default=5.]")
#parser.add_argument("--meta", default=True, help="Show meta data? [default: True]", action="store_true")
parser.add_argument("-g", "--group", help="Restrict on Group (e.g. BOSS_DR12)")
parser.add_argument("-s", "--select", default=0, type=int, help="Index of spectrum to plot (when multiple exist)")
parser.add_argument("--mplot", default=False, help="Use simple matplotlib plot [default: False]")
parser.add_argument("--db_file", help="Full path of db_file")
if options is None:
args = parser.parse_args()
else:
args = parser.parse_args(options)
return args
def test_meta_from_position(igmsp):
# One match
meta = igmsp.meta_from_position((0.0019,17.7737), 1*u.arcsec)
assert len(meta) == 1
# Blank
meta2 = igmsp.meta_from_position((10.038604,55.298477), 1*u.arcsec)
assert meta2 is None
# Multiple sources (insure rank order)
meta3 = igmsp.meta_from_position((0.0055,-1.5), 1*u.deg)
assert len(meta3) == 2
assert np.isclose(meta3['R'][0],meta3['R'][1])
# Multiple meta entries (GGG)
meta4 = igmsp.meta_from_position('001115.23+144601.8', 1*u.arcsec)
assert len(meta4) == 2
assert meta4['R'][0] != meta4['R'][1]
# Multiple but grab closest source
meta5 = igmsp.meta_from_position((0.0055,-1.5), 1*u.deg, max_match=1)
assert len(meta5) == 1
# Groups
meta = igmsp.meta_from_position((2.813500,14.767200), 20*u.deg, groups=['GGG','HD-LLS_DR1'])
for group in meta['GROUP'].data:
assert group in ['GGG', 'HD-LLS_DR1']
# Physical separation
meta6 = igmsp.meta_from_position('001115.23+144601.8', 300*u.kpc)
assert len(meta6) == 2
def test_query_position(igmsp):
# One match
_, _, idx = igmsp.qcat.query_position((0.0019,17.7737), 1*u.arcsec)
assert idx >= 0
# Blank
_, _, idx = igmsp.qcat.query_position((10.038604,55.298477), 1*u.arcsec)
assert len(idx) == 0
# Multiple (insure rank order)
icoord = SkyCoord(ra=0.0055, dec=-1.5, unit='deg')
_, subcat, _ = igmsp.qcat.query_position(icoord, 1*u.deg)
# Test
coord = SkyCoord(ra=subcat['RA'], dec=subcat['DEC'], unit='deg')
sep = icoord.separation(coord)
isrt = np.argsort(sep)
assert isrt[0] == 0
assert isrt[-1] == len(subcat)-1
# Multiple but grab only 1
_, _, idxs = igmsp.qcat.query_position(icoord, 1*u.deg, max_match=1)
assert len(idxs) == 1
def test_spectra_from_meta(igmsp): # Base level operation
# One match
meta = igmsp.meta_from_position((0.0019,17.7737), 1*u.arcsec)
spec = igmsp.spectra_from_meta(meta)
# Two sources, one meta entry each
meta2 = igmsp.meta_from_position((0.0055,-1.5), 1*u.deg)
spec2 = igmsp.spectra_from_meta(meta2)
assert spec2.nspec == 2
# Many sources and meta entries; groups separated
meta3 = igmsp.meta_from_position((2.813500,14.767200), 20*u.deg)#, groups=['GGG','HD-LLS_DR1'])
spec3 = igmsp.spectra_from_meta(meta3)
assert spec3.nspec == 15
# Many sources and meta entries; groups scrambled
idx = np.arange(15).astype(int)
idx[1] = 13
idx[13] = 1
meta4 = meta3[idx]
spec4 = igmsp.spectra_from_meta(meta4)#, debug=True)
spec4.select = 1
assert np.isclose(meta4['WV_MIN'][1], spec4.wvmin.value)
def match_to_database(source_list, image_list, cat, Ast, imscale, thresh=8.):
'''
RA, Decdetection catalogs and vizier (astroquery) match? vo conesearch?
'''
matches = []
for sources, image in zip(source_list, image_list):
ifuslot = sources[1]
Ast.get_ifuslot_projection(ifuslot, imscale, image.shape[1]/2.,
image.shape[0]/2.)
for source in sources[0]:
ra, dec = Ast.tp_ifuslot.wcs.wcs_pix2world(source['xcentroid'],
source['ycentroid'],1)
c = SkyCoord(ra, dec, unit=(unit.degree, unit.degree))
idx, d2d, d3d = match_coordinates_sky(c, cat, nthneighbor=1)
if d2d.arcsec[0] < thresh:
dra = cat.ra[idx].deg - float(ra)
ddec = cat.dec[idx].deg - float(dec)
matches.append([int(ifuslot),int(idx),
float(ra), float(dec),
cat.ra[idx].deg,
cat.dec[idx].deg, dra, ddec,
d2d.arcsec[0]])
return matches
def peek(self, figsize=(10, 10), color='b', alpha=0.75,
print_to_file=None, **kwargs):
"""
Show extracted fieldlines overlaid on HMI image.
"""
fig = plt.figure(figsize=figsize)
ax = fig.gca(projection=self.hmi_map)
self.hmi_map.plot()
ax.set_autoscale_on(False)
for stream, _ in self.streamlines:
ax.plot(self._convert_angle_to_length(stream[:, 0]*u.cm,
working_units=u.arcsec).to(u.deg),
self._convert_angle_to_length(stream[:, 1]*u.cm,
working_units=u.arcsec).to(u.deg),
alpha=alpha, color=color, transform=ax.get_transform('world'))
if print_to_file is not None:
plt.savefig(print_to_file, **kwargs)
plt.show()
def convert_angle_to_length(hmi_map, angle_or_length, working_units=u.meter):
"""
Helper for easily converting between angle and length units. If converting to length, returned
units will be `~astropy.units.cm`. If converting to angle, the returned units will be
`~astropy.units.arcsec`.
"""
observed_distance = (hmi_map.dsun - hmi_map.rsun_meters)
radian_length = [(u.radian, u.meter, lambda x: observed_distance*x, lambda x: x/observed_distance)]
converted = solarbextrapolation.utilities.decompose_ang_len(angle_or_length,
working_units=working_units,
equivalencies=radian_length)
if working_units == u.meter:
return converted.to(u.cm)
else:
return converted.to(u.arcsec)
def create_simpleDS9region(outputfile,ralist,declist,color='red',circlesize=0.5,textlist=None,clobber=False):
"""
Generate a basic DS9 region file with circles around a list of coordinates
--- INPUT ---
outputfile Path and name of file to store reigion file to
ralist List of R.A. to position circles at
declist List of Dec. to position circles at
color Color of circles
size Size of circles (radius in arcsec)
text Text string for each circle
clobber Overwrite existing files?
"""
if not clobber:
if os.path.isfile(outputfile):
sys.exit('File already exists and clobber = False --> ABORTING')
fout = open(outputfile,'w')
fout.write("# Region file format: DS9 version 4.1 \nfk5\n")
for rr, ra in enumerate(ralist):
string = 'circle('+str(ra)+','+str(declist[rr])+','+str(circlesize)+'") # color='+color+' width=3 '
if textlist is not None:
string = string+' font="times 10 bold roman" text={'+textlist[rr]+'}'
fout.write(string+' \n')
fout.close()
# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
def wcs(self):
"""
WCS for the given image slice.
"""
shape = self.image.shape
delta = self.pixelsize / 3600.0 # [arcsec] -> [deg]
wcs_ = WCS(naxis=2)
wcs_.wcs.ctype = ["RA---"+self.projection, "DEC--"+self.projection]
wcs_.wcs.crval = np.array([self.ra0, self.dec0])
wcs_.wcs.crpix = np.array([shape[1], shape[0]]) / 2.0 + 1
wcs_.wcs.cdelt = np.array([delta, delta])
return wcs_
def fits_header(self):
header = self.wcs.to_header()
header["BUNIT"] = ("Jy/pixel", "Brightness unit")
header["FREQ"] = (self.freq, "Frequency [MHz]")
header["RA0"] = (self.ra0, "Center R.A. [deg]")
header["DEC0"] = (self.dec0, "Center Dec. [deg]")
header["PixSize"] = (self.pixelsize, "Pixel size [arcsec]")
header["K2JyPix"] = (self.factor_K2JyPixel, "[K] -> [Jy/pixel]")
header["MINVALUE"] = (self.minvalue, "[K] minimum threshold")
if np.isfinite(self.maxvalue):
header["MAXVALUE"] = (self.maxvalue, "[K] maximum threshold")
return header
def factor_K2JyPixel(self):
"""
Conversion factor from [K] to [Jy/pixel]
"""
pixarea = (self.pixelsize * au.arcsec) ** 2
freq = self.freq * au.MHz
equiv = au.brightness_temperature(pixarea, freq)
factor = au.K.to(au.Jy, equivalencies=equiv)
return factor
def main(args, unit_test=False, **kwargs):
""" Run
"""
import numpy as np
from astropy import units as u
from specdb.utils import load_db
from specdb import group_utils
from linetools.scripts.utils import coord_arg_to_coord
# init
Specdb = load_db(args.dbase, db_file=args.db_file, **kwargs)
# Grab
icoord = coord_arg_to_coord(args.coord)
if args.group is not None:
groups=[args.group]
else:
groups = None
meta = Specdb.meta_from_position(icoord, args.tol*u.arcsec, groups=groups)
if unit_test:
return meta
# Outcome
if meta is None:
print("No source found, try another location or a larger tolerance.")
return
else:
group_utils.show_group_meta(meta, idkey=Specdb.idkey, show_all_keys=False)
def cat_from_coords(self, coords, toler=0.5*u.arcsec, **kwargs):
""" Return a cut-out of the catalog matched to input coordinates
within a tolerance. Ordered by the input coordinate list.
Entries without a match are Null with ID<0.
Parameters
----------
coords : SkyCoord
Single or array
toler : Angle, optional
verbose : bool, optional
Returns
-------
matched_cat : Table
"""
# Generate the dummy table
if len(coords.shape) == 0:
ncoord = 1
else:
ncoord = coords.shape[0]
matched_cat = Table(np.repeat(np.zeros_like(self.cat[0]), ncoord))
# Grab IDs
IDs = self.match_coord(coords, toler=toler, **kwargs)
# Find rows in catalog
rows = match_ids(IDs, self.cat[self.idkey], require_in_match=False)
# Fill
gd_rows = rows >= 0
matched_cat[np.where(gd_rows)] = self.cat[rows[gd_rows]]
# Null the rest
matched_cat[self.idkey][np.where(~gd_rows)] = IDs[~gd_rows]
# Return
return matched_cat
def pairs(self, sep, dv):
""" Generate a pair catalog
Parameters
----------
sep : Angle or Quantity
dv : Quantity
Offset in velocity. Positive for projected pairs (i.e. dz > input value)
Returns
-------
"""
# Checks
if not isinstance(sep, (Angle, Quantity)):
raise IOError("Input radius must be an Angle type, e.g. 10.*u.arcsec")
if not isinstance(dv, (Quantity)):
raise IOError("Input velocity must be a quantity, e.g. u.km/u.s")
# Match
idx, d2d, d3d = match_coordinates_sky(self.coords, self.coords, nthneighbor=2)
close = d2d < sep
# Cut on redshift
if dv > 0.: # Desire projected pairs
zem1 = self.cat['zem'][close]
zem2 = self.cat['zem'][idx[close]]
dv12 = ltu.dv_from_z(zem1,zem2)
gdz = np.abs(dv12) > dv
# f/g and b/g
izfg = dv12[gdz] < 0*u.km/u.s
ID_fg = self.cat[self.idkey][close][gdz][izfg]
ID_bg = self.cat[self.idkey][idx[close]][gdz][izfg]
else:
pdb.set_trace()
# Reload
return ID_fg, ID_bg
def spectra_from_coord(self, inp, tol=0.5*u.arcsec, **kwargs):
""" Return spectra and meta data from an input coordinate
Radial search at that location within a small tolerance
Returns closet source if multiple are found
Warning: Only returns meta entires that have corresponding spectra
Parameters
----------
inp : str, tuple, SkyCoord
See linetools.utils.radec_to_coord
Only one coord may be input
tol : Angle or Quantity, optional
Search radius
kwargs :
fed to grab_specmeta
Returns
-------
spec : XSpectrum1D
All spectra corresponding to the closest source within tolerance
meta : Table
Meta data describing to spec
"""
# Grab meta
meta = self.meta_from_position(inp, tol, max_match=1, **kwargs)
if meta is None:
return None, None
# Grab spec and return
return self.spectra_from_meta(meta, subset=True)
def get_cat_dict():
""" Definitions for the catalog
Returns
-------
"""
cdict = dict(match_toler=2*u.arcsec)
return cdict
def slit_width(slitname, req_long=True):
""" Slit width
Parameters
----------
slitname : str
req_long : bool, optional
Require long in slit name. If not present return 1.0
Returns
-------
sdict : dict
Translates slit mask name to slit with in arcsec or pixels
"""
sdict = {'long_1.0': 1.0,
'long_1.5': 1.5,
'1.0x180': 1.0, # MMT
'LS5x60x0.6': 0.6, # MODS
'0.30 arcsec': 0.3, # GNIRS
'f6-4pix_G5212': 4., # NIRI
'42x0.570': 0.57, # NIRSPEC
'LONGSLIT-46x0.7': 0.7, # MOSFIRE
'slit 1.5 arcsec': 1.5, # RCS (kp4m)
}
#
try:
swidth = sdict[slitname]
except KeyError:
try:
swidth = float(slitname)
except ValueError:
if ('long' not in slitname) & req_long:
swidth = 1.
else:
pdb.set_trace()
#
return swidth
def zem_from_radec(ra, dec, catalog, toler=2*u.arcsec, debug=False):
""" Parse input catalog (e.g. Myers) for zem
Parameters
----------
ra : list or array
RA in deg
dec : list or array
DEC in deg
catalog : Table
Must contain RA,DEC,ZEM,ZEM_SOURCE
debug : bool, optional
Returns
-------
zem : array
Redshifts
zsource : array
str array of sources
"""
# Generate coordinates
icoord = SkyCoord(ra=ra, dec=dec, unit='deg')
# Quasar catalog
qcoord = SkyCoord(ra=catalog['RA'], dec=catalog['DEC'], unit='deg')
# Match
idx, d2d, d3d = match_coordinates_sky(icoord, qcoord, nthneighbor=1)
good = d2d < toler
if debug:
pdb.set_trace()
# Finish
zem = np.zeros(len(ra))
try:
zem[good] = catalog['ZEM'][idx[good]]
except IndexError:
pdb.set_trace()
zsource = np.array([str('NONENONE')]*len(ra))
zsource[good] = catalog['ZEM_SOURCE'][idx[good]]
# Return
return zem, zsource
def kpc_per_arcsec(self, z):
"""
Calculate the transversal length (unit: kpc) corresponding to
1 arcsec at the *angular diameter distance* of z.
"""
dist_kpc = self.angular_diameter_distance(z, unit="kpc")
return dist_kpc * au.arcsec.to(au.rad)
def kpc_per_pix(self, z):
"""
Calculate the transversal length (unit: kpc) corresponding to
1 ACIS pixel (i.e., 0.492 arcsec) at the *angular diameter distance*
of z.
"""
pix = ACIS.pixel2arcsec * au.arcsec
dist_kpc = self.angular_diameter_distance(z, unit="kpc")
return dist_kpc * pix.to(au.rad).value
def cm_per_pix(self, z):
"""
Calculate the transversal length (unit: cm) corresponding to
1 ACIS pixel (i.e., 0.492 arcsec) at the *angular diameter distance*
of z.
"""
return self.kpc_per_pix(z) * au.kpc.to(au.cm)
def __init__(self, name, z, x, y):
self.name = name
self.z = z
self.x = x
self.y = y
self.lDcm = cosmo.luminosity_distance(self.z)*u.Mpc.to(u.cm) / u.Mpc
self.radToKpc = conv.arcsec_per_kpc_proper(self.z)*0.05/u.arcsec*u.kpc
# setting up final xl file
def __init__(self, name, z, x, y):
self.name = name
self.z = z
self.x = x
self.y = y
self.lDcm = cosmo.luminosity_distance(self.z)*u.Mpc.to(u.cm) / u.Mpc
self.radToKpc = conv.arcsec_per_kpc_proper(self.z)*0.05/u.arcsec*u.kpc
# We grab our galaxy data and make a list of galaxy objects
def __init__(self, name, z, x, y):
self.name = name
self.z = z
self.x = x
self.y = y
self.lDcm = cosmo.luminosity_distance(self.z)*u.Mpc.to(u.cm) / u.Mpc
self.radToKpc = conv.arcsec_per_kpc_proper(self.z)*0.05/u.arcsec*u.kpc
# Grabbing and filling galaxy data
def __init__(self, name, z, x, y):
self.name = name
self.z = z
self.x = x
self.y = y
self.lDcm = cosmo.luminosity_distance(self.z)*u.Mpc.to(u.cm) / u.Mpc
self.radToKpc = conv.arcsec_per_kpc_proper(self.z)*0.05/u.arcsec*u.kpc
# We grab our galaxy data and make a list of galaxy objects
def __init__(self, name, z, x, y):
self.name = name
self.z = z
self.x = x
self.y = y
self.lDcm = cosmo.luminosity_distance(self.z)*u.Mpc.to(u.cm) / u.Mpc
self.radToKpc = conv.arcsec_per_kpc_proper(self.z)*0.05/u.arcsec*u.kpc
# setting up final xl file