def save_fibmodel(self):
'''
Save the fibers to fits file with two extensions
The first is 3-d for trace, wavelength and fiber_to_fiber
The second is which fibers are good and not dead
'''
try:
self.fibers[0].fibmodel
except:
self.log.warning('Trying to save fibermodel but none exist.')
return None
ylims = np.linspace(-1.*self.fsize, self.fsize, self.fibmodel_nbins)
fibmodel = np.zeros((len(self.fibers), self.fibmodel_nbins, self.D))
for i,fiber in enumerate(self.fibers):
fibmodel[i,:,:] = fiber.fibmodel
s = fits.PrimaryHDU(np.array(fibmodel,dtype='float32'))
t = fits.ImageHDU(np.array(ylims,dtype='float32'))
hdu = fits.HDUList([s,t])
fn = op.join(self.path, 'fibermodel_%s_%s_%s_%s.fits' %(self.specid,
self.ifuslot,
self.ifuid,
self.amp))
self.write_to_fits(hdu, fn)
python类ImageHDU()的实例源码
def writeSVD(self, svdoutputfits='ZAP_SVD.fits'):
"""Write the SVD to an individual fits file."""
check_file_exists(svdoutputfits)
header = fits.Header()
header['ZAPvers'] = (__version__, 'ZAP version')
header['ZAPzlvl'] = (self.run_zlevel, 'ZAP zero level correction')
header['ZAPclean'] = (self.run_clean,
'ZAP NaN cleaning performed for calculation')
header['ZAPcftyp'] = (self._cftype, 'ZAP continuum filter type')
header['ZAPcfwid'] = (self._cfwidth, 'ZAP continuum filter size')
header['ZAPmask'] = (self.maskfile, 'ZAP mask used to remove sources')
nseg = len(self.pranges)
header['ZAPnseg'] = (nseg, 'Number of segments used for ZAP SVD')
hdu = fits.HDUList([fits.PrimaryHDU(self.zlsky)])
for i in range(len(self.pranges)):
hdu.append(fits.ImageHDU(self.especeval[i][0]))
# write for later use
hdu.writeto(svdoutputfits)
logger.info('SVD file saved to %s', svdoutputfits)
def _open(self, cache=False, **kwargs):
if not _fits:
load_lib()
hdulist = _fits.open(self.request.get_file(),
cache=cache, **kwargs)
self._index = []
for n, hdu in zip(range(len(hdulist)), hdulist):
if (isinstance(hdu, _fits.ImageHDU) or
isinstance(hdu, _fits.PrimaryHDU)):
# Ignore (primary) header units with no data (use '.size'
# rather than '.data' to avoid actually loading the image):
if hdu.size > 0:
self._index.append(n)
self._hdulist = hdulist
def to_fits(self):
primary_hdu = fits.PrimaryHDU()
image_hdu = fits.ImageHDU(self.data, self.header)
hdulist = fits.HDUList([primary_hdu, image_hdu])
return hdulist
def create_mock_fits():
x = np.ones((5, 5))
prihdu = fits.PrimaryHDU(x)
# Single extension FITS
img = fits.ImageHDU(data=x)
singlehdu = fits.HDUList([prihdu, img])
singlehdu.writeto('image.fits', clobber=True)
def load_data(self):
hdulist = fits.open(self.filename)
print("MuseCube: Loading the cube fluxes and variances...")
# import pdb; pdb.set_trace()
self.cube = hdulist[1].data
self.stat = hdulist[2].data
# for ivar weighting ; consider creating it in init ; takes long
# self.flux_over_ivar = self.cube / self.stat
self.header_1 = hdulist[1].header # Necesito el header para crear una buena copia del white.
self.header_0 = hdulist[0].header
if self.filename_white is None:
print("MuseCube: No white image given, creating one.")
w_data = self.create_white(save=False)
w_header_0 = copy.deepcopy(self.header_0)
w_header_1 = copy.deepcopy(self.header_1)
# These loops remove the third dimension from the header's keywords. This is neccesary in order to
# create the white image and preserve the cube astrometry
for i in w_header_0.keys():
if '3' in i:
del w_header_0[i]
for i in w_header_1.keys():
if '3' in i:
del w_header_1[i]
# prepare the header
hdu = fits.HDUList()
hdu_0 = fits.PrimaryHDU(header=w_header_0)
hdu_1 = fits.ImageHDU(data=w_data, header=w_header_1)
hdu.append(hdu_0)
hdu.append(hdu_1)
hdu.writeto('new_white.fits', clobber=True)
self.filename_white = 'new_white.fits'
print("MuseCube: `new_white.fits` image saved to disk.")
def spec_to_redmonster_format(spec, fitsname, n_id=None, mag=None):
"""
Function used to create a spectrum in the REDMONSTER software format
:param spec: XSpectrum1D object
:param mag: List containing 2 elements, the first is the keyword in the header that will contain the magnitud saved in the second element
:param fitsname: Name of the fitsfile that will be created
:return:
"""
from scipy import interpolate
wave = spec.wavelength.value
wave_log = np.log10(wave)
n = len(wave)
spec.wavelength = wave_log * u.angstrom
new_wave_log = np.arange(wave_log[1], wave_log[n - 2], 0.0001)
spec_rebined = spec.rebin(new_wv=new_wave_log * u.angstrom)
flux = spec_rebined.flux.value
f = interpolate.interp1d(wave_log, spec.sig.value)
sig = f(new_wave_log)
inv_sig = 1. / np.array(sig) ** 2
inv_sig = np.where(np.isinf(inv_sig), 0, inv_sig)
inv_sig = np.where(np.isnan(inv_sig), 0, inv_sig)
hdu1 = fits.PrimaryHDU([flux])
hdu2 = fits.ImageHDU([inv_sig])
hdu1.header['COEFF0'] = new_wave_log[0]
hdu1.header['COEFF1'] = new_wave_log[1] - new_wave_log[0]
if n_id != None:
hdu1.header['ID'] = n_id
if mag != None:
hdu1.header[mag[0]] = mag[1]
hdulist_new = fits.HDUList([hdu1, hdu2])
hdulist_new.writeto(fitsname, clobber=True)
def save(self, image_list=[], spec_list=[]):
'''
Save the entire amplifier include the list of fibers.
This property is not used often as "amp*.pkl" is large and typically
the fibers can be loaded and the other amplifier properties quickly
recalculated.
'''
self.log.info('Saving images/properties to %s' %self.path)
fn = op.join(self.path, 'multi_%s_%s_%s_%s.fits' %(self.specid,
self.ifuslot,
self.ifuid,
self.amp))
fits_list = []
for i,image in enumerate(image_list):
if i==0:
fits_list.append(fits.PrimaryHDU(np.array(getattr(self, image), dtype='float32')))
else:
fits_list.append(fits.ImageHDU(np.array(getattr(self, image), dtype='float32')))
fits_list[-1].header['EXTNAME'] = image
for i, spec in enumerate(spec_list):
try:
s = np.array([getattr(fiber, spec) for fiber in self.fibers], dtype='float32')
fits_list.append(fits.ImageHDU(s))
fits_list[-1].header['EXTNAME'] = spec
except AttributeError:
self.log.warning('Attribute %s does not exist to save' %spec)
if fits_list:
fits_list[0] = self.write_header(fits_list[0])
hdu = fits.HDUList(fits_list)
self.write_to_fits(hdu, fn)
def subtract_sky_from_sci(sci, sky_list, sky_model_list, wave, use_sci=False):
sci.log.info('Subtracting sky from %s' %sci.basename)
if use_sci:
fn = op.join(sci.path, 'sky_model.txt')
if True:#not op.exists(fn):
F = fits.open('panacea/leoI_20131209.fits')
G = get_fits(sci)
get_master_sky(G['wavelength'].data,
G['spectrum'].data - F[0].data*sci.exptime,
G['fiber_to_fiber'].data, sci.path, sci.exptime)
wave, sky_model = np.loadtxt(fn, unpack=True)
else:
weight, sorted_ind = get_interp_weights(sci, sky_list)
arr_list = []
for i,w in enumerate(weight):
if w > 0.0:
arr_list.append(w * sky_model_list[sorted_ind[i]])
sky_model = np.sum(arr_list, axis=(0,))
SCI = get_fits(sci)
sky_subtracted = np.zeros(SCI['spectrum'].data.shape)
for i, spec in enumerate(SCI['spectrum'].data):
sky_subtracted[i,:] = (spec - np.interp(SCI['wavelength'].data[i,:],
wave, sky_model*sci.exptime)
* SCI['fiber_to_fiber'].data[i,:])
s = fits.ImageHDU(sky_subtracted)
erase = []
for i,S in enumerate(SCI):
if S.header['EXTNAME'] == 'sky_subtracted':
erase.append(i)
for i in sorted(erase,reverse=True):
del SCI[i]
SCI.append(s)
SCI[-1].header['EXTNAME'] = 'sky_subtracted'
write_fits(SCI)
return SCI['wavelength'].data, sky_subtracted, wave, sky_model
def load_data(self):
hdulist = fits.open(self.filename)
print("MuseCube: Loading the cube fluxes and variances...")
# import pdb; pdb.set_trace()
self.cube = ma.MaskedArray(hdulist[1].data)
self.stat = ma.MaskedArray(hdulist[2].data)
print("MuseCube: Defining master masks (this may take a while but it is for the greater good).")
# masking
self.mask_init = np.isnan(self.cube) | np.isnan(self.stat)
self.cube.mask = self.mask_init
self.stat.mask = self.mask_init
# for ivar weighting ; consider creating it in init ; takes long
# self.flux_over_ivar = self.cube / self.stat
self.header_1 = hdulist[1].header # Necesito el header para crear una buena copia del white.
self.header_0 = hdulist[0].header
if self.filename_white is None:
print("MuseCube: No white image given, creating one.")
w_data = copy.deepcopy(self.create_white(save=False).data)
w_header_0 = copy.deepcopy(self.header_0)
w_header_1 = copy.deepcopy(self.header_1)
# These loops remove the third dimension from the header's keywords. This is neccesary in order to
# create the white image and preserve the cube astrometry
for i in w_header_0.keys():
if '3' in i:
del w_header_0[i]
for i in w_header_1.keys():
if '3' in i:
del w_header_1[i]
# prepare the header
hdu = fits.HDUList()
hdu_0 = fits.PrimaryHDU(header=w_header_0)
hdu_1 = fits.ImageHDU(data=w_data, header=w_header_1)
hdu.append(hdu_0)
hdu.append(hdu_1)
hdu.writeto('new_white.fits', clobber=True)
self.filename_white = 'new_white.fits'
print("MuseCube: `new_white.fits` image saved to disk.")