def simple_generic_writer(data, file_name, **kwargs):
"""
Basic `Spectrum1DRef` FITS writer.
"""
# Create fits columns
flux_col = fits.Column(name='FLUX', format='E', array=data.data)
disp_col = fits.Column(name='WAVE', format='E', array=data.dispersion)
uncert_col = fits.Column(name='UNCERT', format='E', array=data.uncertainty.array)
mask_col = fits.Column(name='MASK', format='L', array=data.mask)
cols = fits.ColDefs([flux_col, disp_col, uncert_col, mask_col])
# Create the bin table
tbhdu = fits.BinTableHDU.from_columns(cols)
# Create header
prihdu = fits.PrimaryHDU(header=data.meta.get('header', data.wcs.to_header()))
# Compose
thdulist = fits.HDUList([prihdu, tbhdu])
thdulist.writeto("{}.fits".format(file_name), overwrite=True)
python类ColDefs()的实例源码
def write(self, filename, clobber=False):
"""
Write the updated/modified spectrum block to file.
"""
channel_col = fits.Column(name='CHANNEL', format='J',
array=self.channel)
counts_col = fits.Column(name='COUNTS', format='J',
array=self.counts_squeezed)
stat_err_col = fits.Column(name='STAT_ERR', format='D',
array=self.stat_err_adjusted)
grouping_col = fits.Column(name='GROUPING', format='I',
array=self.grouping)
quality_col = fits.Column(name='QUALITY', format='I',
array=self.quality)
spec_cols = fits.ColDefs([channel_col, counts_col, stat_err_col,
grouping_col, quality_col])
spechdu = fits.BinTableHDU.from_columns(spec_cols, header=self.header)
spechdu.writeto(filename, clobber=clobber)
def add_time_column(fitsfile, blockname="EVENTS"):
"""
Add a time column to the specified block of the input fits file.
The time data are generated with a uniform distribution
between TSTART and TSTOP.
Return:
A fits object with the new time column.
"""
if isinstance(fitsfile, str):
fitsfile = fits.open(fitsfile)
table = fitsfile[blockname]
tstart = table.header["TSTART"]
tstop = table.header["TSTOP"]
counts = len(table.data)
time_data = np.random.uniform(tstart, tstop, counts)
time_col = fits.Column(name="time", format="1D", unit="s", array=time_data)
# NOTE: append the new time column to the *last*!
# Otherwise the TLMIN??/TLMAX?? keyword pairs, which record the
# minimum/maximum values of corresponding columns, will become
# *out of order*. Therefore the output FITS file causes weird problems
# with DS9 and DM tools.
newtable = fits.BinTableHDU.from_columns(
table.columns + fits.ColDefs([time_col]))
fitsfile[blockname].data = newtable.data
# update header
fitsfile[blockname].header.update(newtable.header)
return fitsfile
def write(self, outfile, clobber=False):
"""
Create a new "SPECTRUM" table/extension and replace the original
one, then write to output file.
"""
# Open the original input spectrum as the base
fitsobj = fits.open(self.filename)
columns = [
fits.Column(name="CHANNEL", format="I", array=self.channel),
fits.Column(name=self.spec_type, format=self.spec_fits_format,
unit=self.spec_unit, array=self.spec_data),
]
if self.grouping is not None:
columns.append(fits.Column(name="GROUPING",
format="I", array=self.grouping))
if self.quality is not None:
columns.append(fits.Column(name="QUALITY",
format="I", array=self.quality))
if self.stat_err is not None:
columns.append(fits.Column(name="STAT_ERR", unit=self.spec_unit,
format=self.spec_fits_format,
array=self.stat_err))
ext_spec_cols = fits.ColDefs(columns)
ext_spec = fits.BinTableHDU.from_columns(ext_spec_cols,
header=self.header)
# Replace the original spectrum data
fitsobj["SPECTRUM"] = ext_spec
try:
fitsobj.writeto(outfile, overwrite=clobber, checksum=True)
except TypeError:
fitsobj.writeto(outfile, clobber=clobber, checksum=True)
# class Spectrum }}}
def wraptable2fits(cat_columns, extname):
tbhdu = fits_from_columns(pyfits.ColDefs(cat_columns))
hdu = pyfits.PrimaryHDU()
import datetime, time
now = datetime.datetime.fromtimestamp(time.time())
nowstr = now.isoformat()
nowstr = nowstr[:nowstr.rfind('.')]
hdu.header['DATE'] = nowstr
hdu.header['ANALYSIS'] = 'NWAY matching'
tbhdu.header['EXTNAME'] = extname
hdulist = pyfits.HDUList([hdu, tbhdu])
return hdulist
def array2fits(table, extname):
cat_columns = pyfits.ColDefs([pyfits.Column(name=n, format='E',array=table[n])
for n in table.dtype.names])
return wraptable2fits(cat_columns, extname)
def savepysyn(self,wave,flux,fname,units=None):
""" Cannot ever use the .writefits() method, because the array is
frequently just sampled at the synphot waveset; plus, writefits
is smart and does things like tapering."""
if units is None:
ytype='throughput'
units=' '
else:
ytype='flux'
col1=pyfits.Column(name='wavelength',format='D',array=wave)
col2=pyfits.Column(name=ytype,format='D',array=flux)
tbhdu=pyfits.BinTableHDU.from_columns(pyfits.ColDefs([col1,col2]))
tbhdu.header.update('tunit1','angstrom')
tbhdu.header.update('tunit2',units)
tbhdu.writeto(fname.replace('.fits','_pysyn.fits'))
def fits2ldac (header4ext2, data4ext3, fits_ldac_out, doSort=True):
"""This function converts the binary FITS table from Astrometry.net to
a binary FITS_LDAC table that can be read by PSFex. [header4ext2]
is what will be recorded as a single long string in the data part
of the 2nd extension of the output table [fits_ldac_out], and
[data4ext3] is the data part of an HDU that will define both the
header and data parts of extension 3 of [fits_ldac_out].
"""
# convert header to single (very) long string
ext2_str = header4ext2.tostring(endcard=False, padding=False)
# if the following line is not added, the very end of the data
# part of extension 2 is written to a fits table such that PSFex
# runs into a segmentation fault when attempting to read it (took
# me ages to find out!).
ext2_str += 'END END'
# read into string array
ext2_data = np.array([ext2_str])
# determine format string for header of extention 2
formatstr = str(len(ext2_str))+'A'
# create table 1
col1 = fits.Column(name='Field Header Card', array=ext2_data, format=formatstr)
cols = fits.ColDefs([col1])
ext2 = fits.BinTableHDU.from_columns(cols)
# make sure these keywords are in the header
ext2.header['EXTNAME'] = 'LDAC_IMHEAD'
ext2.header['TDIM1'] = '(80, {0})'.format(len(ext2_str)/80)
# simply create extension 3 from [data4ext3]
ext3 = fits.BinTableHDU(data4ext3)
# extname needs to be as follows
ext3.header['EXTNAME'] = 'LDAC_OBJECTS'
# sort output table by number column if needed
if doSort:
index_sort = np.argsort(ext3.data['NUMBER'])
ext3.data = ext3.data[index_sort]
# create primary HDU
prihdr = fits.Header()
prihdu = fits.PrimaryHDU(header=prihdr)
prihdu.header['EXPTIME'] = header4ext2['EXPTIME']
prihdu.header['FILTNAME'] = header4ext2['FILTNAME']
# prihdu.header['SEEING'] = header4ext2['SEEING'] #need to calculte and add
prihdu.header['BKGSIG'] = header4ext2['SEXBKDEV']
# write hdulist to output LDAC fits table
hdulist = fits.HDUList([prihdu, ext2, ext3])
hdulist.writeto(fits_ldac_out, clobber=True)
hdulist.close()
################################################################################