def simple_generic_writer(data, file_name, **kwargs):
"""
Basic `Spectrum1DRef` FITS writer.
"""
# Create fits columns
flux_col = fits.Column(name='FLUX', format='E', array=data.data)
disp_col = fits.Column(name='WAVE', format='E', array=data.dispersion)
uncert_col = fits.Column(name='UNCERT', format='E', array=data.uncertainty.array)
mask_col = fits.Column(name='MASK', format='L', array=data.mask)
cols = fits.ColDefs([flux_col, disp_col, uncert_col, mask_col])
# Create the bin table
tbhdu = fits.BinTableHDU.from_columns(cols)
# Create header
prihdu = fits.PrimaryHDU(header=data.meta.get('header', data.wcs.to_header()))
# Compose
thdulist = fits.HDUList([prihdu, tbhdu])
thdulist.writeto("{}.fits".format(file_name), overwrite=True)
python类PrimaryHDU()的实例源码
def write_slice(self, outfile, data, z, clobber=False):
freq = z2freq(z)
Dc = cosmo.comoving_distance(z).value # [Mpc]
header = fits.Header()
header["BUNIT"] = (self.header["BUNIT"],
self.header.comments["BUNIT"])
header["Lside"] = (self.header["Lside"],
self.header.comments["Lside"])
header["Nside"] = (self.header["Nside"],
self.header.comments["Nside"])
header["REDSHIFT"] = (z, "redshift of this slice")
header["FREQ"] = (freq, "[MHz] observed HI signal frequency")
header["Dc"] = (Dc, "[cMpc] comoving distance")
header["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(),
"File creation date")
header.add_history(" ".join(sys.argv))
hdu = fits.PrimaryHDU(data=data, header=header)
try:
hdu.writeto(outfile, overwrite=clobber)
except TypeError:
hdu.writeto(outfile, clobber=clobber)
logger.info("Wrote slice to file: %s" % outfile)
def write_fits(self, outfile, oldheader=None, clobber=False):
if os.path.exists(outfile) and (not clobber):
raise OSError("Sky FITS already exists: %s" % outfile)
if oldheader is not None:
header = oldheader
header.extend(self.fits_header, update=True)
else:
header = self.fits_header
header.add_history(datetime.now().isoformat())
header.add_history(" ".join(sys.argv))
image = self.image
image[~self.mask] = np.nan
image *= self.factor_K2JyPixel
hdu = fits.PrimaryHDU(data=image, header=header)
try:
hdu.writeto(outfile, overwrite=True)
except TypeError:
hdu.writeto(outfile, clobber=True) # old astropy versions
logger.info("Wrote FITS image of sky model to file: %s" % outfile)
def writeToFits(self, dataArrs, outPrefix = None, clobber = True, output_verify = "exception"):
""" Save dict of ndarray to fits file
dataArrs: {index: dataArr} returned by `loadSpeImg`
"""
if outPrefix is None:
matched = re.match('(.*)\.spe.*$', self._filename, flags = re.IGNORECASE)
if matched is not None and matched.groups()[0] != '':
outPrefix = matched.groups()[0]
else:
outPrefix = self._filename
for index, dataArr in dataArrs.items():
name = "{}_x{:03}.fits".format(outPrefix, index)
hdu = fits.PrimaryHDU(data = dataArr,
header = self._fitshdr,
)
hdu.writeto(name, clobber = clobber, output_verify = output_verify)
def combineFits(filenames,outFname):
"""
combine the data in multiple ExoSOFT fits files together.
Used primarily for after multi-process runs.
"""
nFiles = len(filenames)
(head0,dataALL) = loadFits(filenames[0])
for filename in filenames[1:]:
(head,data) = loadFits(filename)
dataALL = np.concatenate((dataALL,data))
hdu = pyfits.PrimaryHDU(dataALL)
hdulist = pyfits.HDUList([hdu])
header = hdulist[0].header
for key in head0:
if key=='NSAMPLES':
##load in total number of samples for this combined file
header['NSAMPLES'] = (int(head0['NSAMPLES'])*nFiles,head0.comments['NSAMPLES'])
else:
header[key] = (head0[key],head0.comments[key])
hdulist.writeto(outFname)
hdulist.close()
log.info("output file written to:below\n"+outFname)
def renameFits(filenameIn,filenameOut,killInput=True,overwrite=True):
"""
Load input into a new fits HDU, write to output name, close, rm input file.
"""
goodToGo = True
if os.path.exists(filenameOut):
if overwrite:
rmFiles([filenameOut])
else:
goodToGo=False
log.critical("File already exists, either set overwrite==True, or there be more careful.")
if goodToGo:
(head,data) = loadFits(filenameIn)
hdu = pyfits.PrimaryHDU(data)
hdulist = pyfits.HDUList([hdu])
header = hdulist[0].header
for key in head:
header[key] = (head[key],head.comments[key])
hdulist.writeto(filenameOut)
hdulist.close()
log.info("output file written to:below\n"+filenameOut)
if killInput:
rmFiles([filenameIn])
def predict(self):
print("Predicting validation data...")
input_validation = np.zeros((1,self.ny,self.nx,1), dtype='float32')
input_validation[0,:,:,0] = self.image
start = time.time()
out = self.model.predict(input_validation)
end = time.time()
print("Prediction took {0:3.2} seconds...".format(end-start))
print("Saving data...")
hdu = fits.PrimaryHDU(out[0,:,:,0])
import os.path
if os.path.exists(self.output):
os.system('rm {0}'.format(self.output))
print('Overwriting...')
hdu.writeto('{0}'.format(self.output))
# import matplotlib.pyplot as plt
# plt.imshow(out[0,:,:,0])
# plt.savefig('hmi.pdf')
def write_frame(data, header, path):
"""
This function ...
:param data:
:param header:
:return:
"""
# Create the HDU
hdu = fits.PrimaryHDU(data, header)
# Write the HDU to a FITS file
hdu.writeto(path, clobber=True)
# -----------------------------------------------------------------
def write_datacube(datacube, header, path):
"""
This function ...
:param datacube:
:param header:
:param path:
:return:
"""
# Create the HDU from the data array and the header
hdu = fits.PrimaryHDU(np.array(datacube), header)
# Write the HDU to a FITS file
hdu.writeto(path, clobber=True)
# Inform the user that the file has been created
log.debug("File " + path + " created")
# -----------------------------------------------------------------
def write_frame(data, header, path):
"""
This function ...
:param data:
:param header:
:return:
"""
# Create the HDU
hdu = fits.PrimaryHDU(data, header)
# Write the HDU to a FITS file
hdu.writeto(path, clobber=True)
# -----------------------------------------------------------------
def write_datacube(datacube, header, path):
"""
This function ...
:param datacube:
:param header:
:param path:
:return:
"""
# Create the HDU from the data array and the header
hdu = fits.PrimaryHDU(np.array(datacube), header)
# Write the HDU to a FITS file
hdu.writeto(path, clobber=True)
# Inform the user that the file has been created
log.debug("File " + path + " created")
# -----------------------------------------------------------------
cumulative_distributions.py 文件源码
项目:DR1_analysis
作者: GBTAmmoniaSurvey
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def regrid_h2(projDir='/media/DATAPART/projects/GAS/testing/',
region_list = ['L1688','NGC1333','B18','OrionA'],
file_extension='DR1_rebase3',
herDir = '/media/DATAPART/projects/GAS/otherData/herschel_ayushi/',
herFile_list=['OphL1688','perseus','Tau_B18','orionA-N']):
for region_i in range(len(region_list)):
region = region_list[region_i]
herFilename = herFile_list[region_i]
herColFits = herDir+'{0}/Colden_temp/{1}_colden_masked.fits'.format(region,herFilename)
nh3ImFits = projDir + '{0}/{0}_NH3_11_{1}_mom0_QA_trim.fits'.format(region,file_extension)
h2_hdu = fits.open(herColFits)
nh3_hdr = fits.getheader(nh3ImFits)
new_h2 = FITS_tools.hcongrid.hcongrid(h2_hdu[0].data,h2_hdu[0].header,nh3_hdr)
new_h2_hdu = fits.PrimaryHDU(new_h2,nh3_hdr)
new_h2_hduList = fits.HDUList([new_h2_hdu])
new_h2_hduList.writeto('nh2_regridded/{0}_NH2_regrid.fits',clobber=True)
def reg_mask_ota(self):
print self.reg_file
box_x,box_y,box_dx,box_dy = self.parse_box_reg()
for i,reg in enumerate(box_x):
ota_reg_mask = self.mask_reg(self.box_x[i],
self.box_y[i],
self.box_dx[i],
self.box_dy[i])
hdu = fits.PrimaryHDU(ota_reg_mask.astype(float))
mask_name = self.mask_name
hdu.writeto(mask_name,clobber=True)
iraf.unlearn(iraf.imutil.imcopy)
iraf.imutil.imcopy.setParam('input',mask_name)
iraf.imutil.imcopy.setParam('output',mask_name.replace('fits','pl'))
iraf.imutil.imcopy.setParam('verbose','no')
iraf.imutil.imcopy(mode='h')
return ota_reg_mask
def write_fiberextract(self, side, ext, prefix):
if not self.check_side(side):
return None
outname = self.build_outname(side, prefix)
self.log.info('Making fiberextract image for %s' %op.basename(outname))
self.build_FE(side, ext)
hdu = fits.PrimaryHDU(np.array(self.fiberextract[side],
dtype='float32'), header=self.header)
hdu.header.remove('BIASSEC')
hdu.header.remove('TRIMSEC')
hdu.header.remove('AMPSEC')
hdu.header.remove('DETSEC')
hdu.header.remove('CCDSEC')
hdu.header['CRVAL1'] = self.wavelim[0]
hdu.header['CDELT1'] = self.disp
hdu.header['CD1_1'] = self.disp
hdu.header['CRPIX1'] = 1
hdu.header['CCDPOS']=side
hdu.header['DATASEC']='[%i:%i,%i:%i]'%(1,self.fiberextract[side].shape[0],
1,self.fiberextract[side].shape[1])
hdu.header.remove('CCDHALF')
hdu.header.remove('AMPLIFIE')
self.write_to_fits(hdu, outname)
def write_to_fits(hdu, outname):
"""Write fits out
Parameters
----------
hdu : PrimaryHDU Object
fits obect to write out
outname : str
file name to write out
"""
try:
hdu.writeto(outname, overwrite=True)
except TypeError:
hdu.writeto(outname, clobber=True)
def save_fibmodel(self):
'''
Save the fibers to fits file with two extensions
The first is 3-d for trace, wavelength and fiber_to_fiber
The second is which fibers are good and not dead
'''
try:
self.fibers[0].fibmodel
except:
self.log.warning('Trying to save fibermodel but none exist.')
return None
ylims = np.linspace(-1.*self.fsize, self.fsize, self.fibmodel_nbins)
fibmodel = np.zeros((len(self.fibers), self.fibmodel_nbins, self.D))
for i,fiber in enumerate(self.fibers):
fibmodel[i,:,:] = fiber.fibmodel
s = fits.PrimaryHDU(np.array(fibmodel,dtype='float32'))
t = fits.ImageHDU(np.array(ylims,dtype='float32'))
hdu = fits.HDUList([s,t])
fn = op.join(self.path, 'fibermodel_%s_%s_%s_%s.fits' %(self.specid,
self.ifuslot,
self.ifuid,
self.amp))
self.write_to_fits(hdu, fn)
def make_error_frame(image1, image2, mask1, mask2, header, outname):
print("Making error image for %s" %op.basename(outname))
a,b = image1.shape
new = np.zeros((a*2,b))
mas = np.zeros((a*2,b))
new[:a,:] = image1
new[a:,:] = image2
mas[:a,:] = mask1
mas[a:,:] = mask2
err = np.zeros(new.shape)
for i in xrange(2*a):
err[i,:] = np.where(mas[i,:]<0, mas[i,:],
biweight_filter(new[i,:], 31,
func=biweight_midvariance))
hdu = fits.PrimaryHDU(np.array(err, dtype='float32'), header=header)
hdu.header.remove('BIASSEC')
hdu.header.remove('TRIMSEC')
hdu.header['DATASEC'] = '[%i:%i,%i:%i]' %(1,b,1,2*a)
outname = op.join(op.dirname(outname), 'e.' + op.basename(outname))
hdu.writeto(outname, overwrite=True)
def writeSVD(self, svdoutputfits='ZAP_SVD.fits'):
"""Write the SVD to an individual fits file."""
check_file_exists(svdoutputfits)
header = fits.Header()
header['ZAPvers'] = (__version__, 'ZAP version')
header['ZAPzlvl'] = (self.run_zlevel, 'ZAP zero level correction')
header['ZAPclean'] = (self.run_clean,
'ZAP NaN cleaning performed for calculation')
header['ZAPcftyp'] = (self._cftype, 'ZAP continuum filter type')
header['ZAPcfwid'] = (self._cfwidth, 'ZAP continuum filter size')
header['ZAPmask'] = (self.maskfile, 'ZAP mask used to remove sources')
nseg = len(self.pranges)
header['ZAPnseg'] = (nseg, 'Number of segments used for ZAP SVD')
hdu = fits.HDUList([fits.PrimaryHDU(self.zlsky)])
for i in range(len(self.pranges)):
hdu.append(fits.ImageHDU(self.especeval[i][0]))
# write for later use
hdu.writeto(svdoutputfits)
logger.info('SVD file saved to %s', svdoutputfits)
def save_eor_window(self, outfile, clobber=False):
header = self.header_eor_windowr()
hdu = fits.PrimaryHDU(data=self.eor_window.astype(np.int16),
header=header)
try:
hdu.writeto(outfile, overwrite=clobber)
except TypeError:
hdu.writeto(outfile, clobber=clobber)
def write(self, outfile=None, clobber=None):
if outfile is None:
outfile = self.get_outfile()
if clobber is None:
clobber = self.configs.clobber
hdu = fits.PrimaryHDU(data=self.cube, header=self.header)
try:
hdu.writeto(outfile, overwrite=clobber)
except TypeError:
hdu.writeto(outfile, clobber=clobber)
logger.info("Wrote light-cone cube to: %s" % outfile)
def write(self, outfile, clobber=False):
hdu = fits.PrimaryHDU(data=self.cube, header=self.header)
logger.info("Created FITS object, writing to disk ...")
try:
hdu.writeto(outfile, overwrite=clobber)
except TypeError:
hdu.writeto(outfile, clobber=clobber)
logger.info("Wrote light-cone cube to: %s" % outfile)
def write(self, outfile, clobber=False):
header = self.header
header.extend(self.wcs.to_header(), update=True)
header["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(),
"File creation date")
header.add_history(" ".join(sys.argv))
hdu = fits.PrimaryHDU(data=self.data, header=header)
try:
hdu.writeto(outfile, overwrite=clobber)
except TypeError:
hdu.writeto(outfile, clobber=clobber)
def write_mask(self, outfile, clobber=False):
if os.path.exists(outfile) and (not clobber):
raise OSError("Sky mask already exists: %s" % outfile)
header = self.fits_header
header.add_history(datetime.now().isoformat())
header.add_history(" ".join(sys.argv))
hdu = fits.PrimaryHDU(data=self.mask.astype(np.int16),
header=header)
try:
hdu.writeto(outfile, overwrite=True)
except TypeError:
hdu.writeto(outfile, clobber=True) # old astropy versions
logger.info("Wrote mask of sky model to file: %s" % outfile)
def save(self, outfile, clobber=False):
"""
Save the calculated 2D power spectrum as a FITS image.
"""
hdu = fits.PrimaryHDU(data=self.ps2d, header=self.header)
try:
hdu.writeto(outfile, overwrite=clobber)
except TypeError:
hdu.writeto(outfile, clobber=clobber)
logger.info("Wrote 2D power spectrum to file: %s" % outfile)
def write_rmfimg(self, outfile, clobber=False):
rmfimg = self.get_rmfimg()
# merge headers
header = self.hdr_matrix.copy(strip=True)
header.extend(self.hdr_ebounds.copy(strip=True))
outfits = fits.PrimaryHDU(data=rmfimg, header=header)
try:
outfits.writeto(outfile, checksum=True, overwrite=clobber)
except TypeError:
outfits.writeto(outfile, checksum=True, clobber=clobber)
# class RMF }}}
def _open(self, cache=False, **kwargs):
if not _fits:
load_lib()
hdulist = _fits.open(self.request.get_file(),
cache=cache, **kwargs)
self._index = []
for n, hdu in zip(range(len(hdulist)), hdulist):
if (isinstance(hdu, _fits.ImageHDU) or
isinstance(hdu, _fits.PrimaryHDU)):
# Ignore (primary) header units with no data (use '.size'
# rather than '.data' to avoid actually loading the image):
if hdu.size > 0:
self._index.append(n)
self._hdulist = hdulist
def burnInStripper(mcmcFnames,burnInLengths):
"""
Strip the initial burn-in off each chain.
"""
newFnames=[]
for i in range(0,len(mcmcFnames)):
filename = mcmcFnames[i]
burnIn = burnInLengths[i]
if os.path.exists(filename):
(head,data) = loadFits(filename)
##strip burn-in and write to new fits
log.debug("Before stripping burn-in, file had "+str(len(data[:,0]))+" samples")
hdu = pyfits.PrimaryHDU(data[burnIn:,:])
hdulist = pyfits.HDUList([hdu])
newHead = hdulist[0].header
log.debug("Before stripping burn-in, file had "+str(len(hdulist[0].data[:,0]))+" samples")
for key in head:
newHead[key]=(head[key],head.comments[key])
n = os.path.splitext(os.path.basename(filename))
newFname = os.path.join(os.path.dirname(mcmcFnames[0]),n[0]+'_BIstripped.fits')
hdulist.writeto(newFname)
log.info("output file written to:below\n"+newFname)
hdulist.close()
newFnames.append(newFname)
log.debug("burn-in stripped file written to:\n"+newFname)
return newFnames
def convert_hpx(self, comp_id, freq_id, clobber=False):
"""
Convert the specified HEALPix map product to HPX projected FITS image.
Also add the metadata of the HPX image to the manifest.
Raises
------
IOError :
Output HPX image already exists and ``clobber=False``
"""
from astropy.io import fits
from .utils.healpix import healpix2hpx
#
root_dir = self.get_root_dir()
metadata = self.get_product(comp_id, freq_id)
infile = os.path.join(root_dir, metadata["healpix"]["path"])
outfile = os.path.splitext(infile)[0] + "_hpx.fits"
if os.path.exists(outfile):
if clobber:
os.remove(outfile)
logger.warning("Removed existing HPX image: %s" % outfile)
else:
raise IOError("Output HPX image already exists: %s" % outfile)
# Convert HEALPix map to HPX projected FITS image
logger.info("Converting HEALPix map to HPX image: %s" % infile)
hpx_data, hpx_header = healpix2hpx(infile)
hdu = fits.PrimaryHDU(data=hpx_data, header=hpx_header)
hdu.writeto(outfile)
logger.info("Converted HEALPix map to HPX image: %s" % outfile)
#
size = os.path.getsize(outfile)
md5 = calc_md5(outfile)
metadata["hpx"] = {
"path": os.path.relpath(outfile, root_dir),
"size": size,
"md5": md5,
}
return (outfile, size, md5)
def write_fits_image(outfile, image, header=None, float32=False,
clobber=False, checksum=False):
"""
Write the supplied image (together with header information) into
the output FITS file.
Parameters
----------
outfile : str
The path/filename to the output file storing the pickled object.
image : 2D `~numpy.ndarray`
The image data to be written out to the FITS file.
NOTE: image.shape: (nrow, ncol) <-> FITS image: (ncol, nrow)
header : `~astropy.io.fits.Header`
The FITS header information for this image
float32 : bool, optional
Whether coerce the image data (generally double/float64 data type)
into single/float32 (in order to save space)?
Default: False (i.e., preserve the data type unchanged)
clobber : bool, optional
Whether to overwrite the existing output file.
Default: False
checksum : bool, optional
Whether to calculate the data checksum, which may cost some time?
Default: False
"""
_create_dir(outfile)
_check_existence(outfile, clobber=clobber, remove=True)
hdr = fits.Header()
hdr["CREATOR"] = (__name__, "File creator")
hdr["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(),
"File creation date")
if header is not None:
hdr.extend(header, update=True)
if float32:
image = np.asarray(image, dtype=np.float32)
hdu = fits.PrimaryHDU(data=image, header=header)
hdu.writeto(outfile, checksum=checksum)
logger.info("Wrote image to FITS file: %s" % outfile)
def makeColdHeatMap(inpath,outpath,inCube,startwl,coldwls):
cube = pyfits.open(inpath+inCube)
cube_all = cube[0].data[startwl:,0:,0:]
hdr_all = cube[0].header
cube = pyfits.open(inpath+inCube.replace('_i','_old_i'))
cube_old = cube[0].data[startwl:,0:,0:]
cube = pyfits.open(inpath+inCube.replace('_i','_young_i'))
cube_young = cube[0].data[startwl:,0:,0:]
Fold = 100. * (0.5*cube_old + 0.5*(cube_all-cube_young)) / cube_all
Fyoung = 100. * (0.5*cube_young + 0.5*(cube_all-cube_old)) / cube_all
tot_all = 100.* (coldwls[len(coldwls)-1] - coldwls[0])
# Get header with appropriate WCS info
im36 = pyfits.open("SKIRTinput/new3.6MJySr.fits")
hdr_wcs = im36[0].header
tot_old = integrateHeating(Fold,coldwls) / tot_all
hdu = pyfits.PrimaryHDU(tot_old,hdr_wcs)
hdu.writeto(outpath+"heatingTotColdOld.fits",clobber=True)
tot_young = integrateHeating(Fyoung,coldwls) / tot_all
hdu = pyfits.PrimaryHDU(tot_young,hdr_wcs)
hdu.writeto(outpath+"heatingTotColdYoung.fits",clobber=True)