def compressed_fits_ext(fits_file):
'''
Check if a fits file is a compressed FITS file. Return the extension numbers
of the compressed image as a list if these exist, otherwise, return None.
'''
hdulist = pyfits.open(fits_file)
compressed_img_exts = []
for i, ext in enumerate(hdulist):
if isinstance(ext,pyfits.hdu.compressed.CompImageHDU):
compressed_img_exts.append(i)
hdulist.close()
if len(compressed_img_exts) < 1:
return None
else:
return compressed_img_exts
python类open()的实例源码
def get_header_keyword(fits_file,
keyword,
ext=0):
'''
Get the value of a header keyword in a fits file optionally using an
extension.
'''
hdulist = pyfits.open(fits_file)
if keyword in hdulist[ext].header:
val = hdulist[ext].header[keyword]
else:
val = None
hdulist.close()
return val
def get_header_keyword_list(fits_file,
keyword_list,
ext=0):
hdulist = pyfits.open(fits_file)
out_dict = {}
for keyword in keyword_list:
if keyword in hdulist[ext].header:
out_dict[keyword] = hdulist[ext].header[keyword]
else:
out_dict[keyword] = None
hdulist.close()
return out_dict
## IMAGE SCALING FUNCTIONS ##
def __init__(self, infile, fov, e=2.0,
k_perp_min=None, k_perp_max=None,
k_los_min=None, k_los_max=None):
self.infile = infile
with fits.open(infile) as f:
self.header = f[0].header
self.ps2d = f[0].data[0, :, :]
self.ps2d_err = f[0].data[1, :, :]
self.freqc = self.header["Freq_C"]
self.freqmin = self.header["Freq_Min"]
self.freqmax = self.header["Freq_Max"]
self.bandwidth = self.freqmax - self.freqmin # [MHz]
self.zc = self.header["Z_C"]
self.pixelsize = self.header["PixSize"]
self.unit = self.header["BUNIT"]
self.set(fov=fov, e=e, k_perp_min=k_perp_min, k_perp_max=k_perp_max,
k_los_min=k_los_min, k_los_max=k_los_max)
def write_gti(self, filename=None, header=True):
"""
Write generated GTIs to file or screen (default)
"""
if isinstance(filename, str):
outfile = open(filename, 'w')
else:
outfile = sys.stdout
#
if header:
outfile.write('# TSTART\tTSTOP\n')
outfile.write('\n'.join([ '%s\t%s' % (tstart, tstop) \
for tstart, tstop in zip(self.gti_start, self.gti_stop) ]))
#
if isinstance(filename, str):
outfile.close()
def open_image(infile):
"""
Open the FITS image and return its header and data, but requiring
the input image has only ONE frequency.
The input FITS image may have following dimensions:
* NAXIS=2: [Y, X]
* NAXIS=3: [FREQ=1, Y, X]
* NAXIS=4: [STOKES, FREQ=1, Y, X]
"""
with fits.open(infile) as f:
header = f[0].header
data = f[0].data
if ((data.ndim == 3 and data.shape[0] != 1) or
(data.ndim == 4 and data.shape[1] != 1)):
# NAXIS=3: [FREQ!=1, Y, X]
# NAXIS=4: [STOKES, FREQ!=1, Y, X]
raise ValueError("input file '{0}' has invalid dimensions: {1}".format(
infile, data.shape))
print("Read in FITS image from: %s" % infile)
return (header, data)
def __init__(self, filename, regid=None):
self.filename = filename
self.regid = regid
with fits.open(filename) as fitsobj:
# "MATRIX" extension
ext_matrix = fitsobj["MATRIX"]
self.hdr_matrix = ext_matrix.header
self.energ_lo = ext_matrix.data["ENERG_LO"] # [keV]
self.energ_hi = ext_matrix.data["ENERG_HI"] # [keV]
self.n_grp = ext_matrix.data["N_GRP"]
self.f_chan = ext_matrix.data["F_CHAN"]
self.n_chan = ext_matrix.data["N_CHAN"]
self.matrix = ext_matrix.data["MATRIX"]
# "EBOUNDS" extension
ext_ebounds = fitsobj["EBOUNDS"]
self.hdr_ebounds = ext_ebounds.header
self.channel = ext_ebounds.data["CHANNEL"]
self.e_min = ext_ebounds.data["E_MIN"] # [keV]
self.e_max = ext_ebounds.data["E_MAX"] # [keV]
def load_fits(self, file_url):
hdu = fits.open(f"{DATA_ROOT}//{file_url}.fits")
dat, hdr = hdu[1].data, hdu[0].header
z = hdu[2].data['Z'][0] # This is the redshift
hdu.close()
wav_rest= 10**(dat['loglam'])/(1+z) #convert to rest frame
# See https://en.wikipedia.org/wiki/Redshift
fwav = dat['flux'] # Get flux density, in this case erg/cm^2/s/Angstrom.
#xs = np.log(wav[idx])
#ys = np.log(flx[idx])
# Normalize the spectrum for plotting purposes.
def find_nearest(array, value):
"""Quick nearest-value finder."""
return int((np.abs(array - value)).argmin())
norm = fwav[find_nearest(wav_rest, 5100)]
fwav = fwav / norm
return wav_rest, fwav
def get_notwr_data(idx, spec_dir, filenames):
spec_path = spec_dir + filenames[idx]
SpecID = spec_path[6:21]
hdulist = fits.open(spec_path)
# Get wavelength data
wav_rest = 10 ** hdulist[1].data['loglam']
# Get flux density, in this case erg/cm^2/s/Angstrom.
fwav = hdulist[1].data['flux']
crop_range = [4686 - 250, 4686 + 250]
wav_rest, fwav = crop_data(wav_rest, fwav, crop_range)
hdulist.close()
return wav_rest, fwav, SpecID
def process_file(path, wl_min, wl_max, n_samples, check_he2=False):
hdulist = fits.open(path)
wls = 10**hdulist[1].data['loglam']
fxs = hdulist[1].data['flux']
z = hdulist[2].data['z']
wls = wls / (1 + z)
if wl_min < wls[0] or wl_max > wls[-1]:
return None
remove_slope(wls, fxs)
wls, fxs = gaussian_smooth(wls, fxs)
wls, fxs = crop_data(wls, fxs, wl_min, wl_max)
wls, fxs = standardize_domain(wls, fxs, wl_min, wl_max, n_samples)
if check_he2:
if is_he2(wls, fxs):
print('including ' + path)
return fxs
print('excluding ' + path)
return None
return fxs
def FileReader(file_list,param_list):
row_add = np.zeros(shape=(1,len(param_list)+1))
for file in file_list:
hdulist = fits.open(file,memmap=True)
data_in = hdulist[1].data
col_add = np.zeros(shape=(len(data_in),1))
print file
for param in param_list:
data_now = np.reshape(data_in[param],(len(data_in[param]),1))
col_add = np.append(col_add,data_now,axis=1)
row_add = np.append(row_add,col_add,axis=0)
del hdulist
row_add = np.delete(row_add,0,axis=0)
row_add = np.delete(row_add,0,axis=1)
return row_add
def write_outputs(self):
"""Write information of detected streaks."""
if not os.path.exists(self.output_path):
os.makedirs(self.output_path)
fp = open('%sstreaks.txt' % self.output_path, 'w')
fp.writelines('#ID x_center y_center area perimeter shape_factor ' +
'radius_deviation slope_angle intercept connectivity\n')
for n, edge in enumerate(self.streaks):
line = '%2d %7.2f %7.2f %6.1f %6.1f %6.3f %6.2f %5.2f %7.2f %2d\n' \
% \
(
edge['index'], edge['x_center'], edge['y_center'],
edge['area'], edge['perimeter'], edge['shape_factor'],
edge['radius_deviation'], edge['slope_angle'],
edge['intercept'], edge['connectivity']
)
fp.writelines(line)
fp.close()
def read_fits(filename='long.fits'):
"""
Read an sample fits file and return only image part.
Parameters
----------
filename : str
Fits filename.
Returns
-------
data : numpy.ndarray
Fits image data.
"""
module_path = dirname(__file__)
file_path = join(module_path, 'samples', filename)
hdulist = fits.open(file_path)
data = hdulist[0].data
hdulist.close()
return data
def loadFits(filename,noData=False):
"""
Load in a fits file written by ExoSOFT.
Return (header dict, data)
"""
if os.path.exists(filename):
f = pyfits.open(filename,'readonly')
head = f[0].header
if noData==False:
data = f[0].data
f.close()
else:
log.critical("fits file does not exist!!! filename =\n"+str(filename))
head=data=False
if noData:
return head
else:
return (head,data)
def read_keyword2(infile, keyword):
"""
Read the specified header keyword using ``astropy.io.fits``
and return a tuple of ``(value, comment)``.
NOTE
----
Header of all extensions (a.k.a. blocks) are combined to locate
the keyword.
"""
with fits.open(infile) as f:
h = fits.header.Header()
for hdu in f:
h.extend(hdu.header)
value = h[keyword]
comment = h.comments[keyword]
return (value, comment)
def load_hdulist_from_fitsfile(self, filename):
"""
The purpose of wrapping fits.open inside this routine is to put
all the warning suppressions, flags, etc in one place.
"""
with warnings.catch_warnings():
warnings.simplefilter('ignore')
max_n_tries = 5
pause_time_between_tries_sec = 1.
cur_try = 0
not_yet_successful = True
while (cur_try < max_n_tries) and not_yet_successful:
try:
hdulist = fits.open(filename, ignore_missing_end=True)
not_yet_successful = False
except: # I've only seen IOerror, but might as well catch for all errors and re-try
time.sleep(pause_time_between_tries_sec)
cur_try += 1
return hdulist
cumulative_distributions.py 文件源码
项目:DR1_analysis
作者: GBTAmmoniaSurvey
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def regrid_h2(projDir='/media/DATAPART/projects/GAS/testing/',
region_list = ['L1688','NGC1333','B18','OrionA'],
file_extension='DR1_rebase3',
herDir = '/media/DATAPART/projects/GAS/otherData/herschel_ayushi/',
herFile_list=['OphL1688','perseus','Tau_B18','orionA-N']):
for region_i in range(len(region_list)):
region = region_list[region_i]
herFilename = herFile_list[region_i]
herColFits = herDir+'{0}/Colden_temp/{1}_colden_masked.fits'.format(region,herFilename)
nh3ImFits = projDir + '{0}/{0}_NH3_11_{1}_mom0_QA_trim.fits'.format(region,file_extension)
h2_hdu = fits.open(herColFits)
nh3_hdr = fits.getheader(nh3ImFits)
new_h2 = FITS_tools.hcongrid.hcongrid(h2_hdu[0].data,h2_hdu[0].header,nh3_hdr)
new_h2_hdu = fits.PrimaryHDU(new_h2,nh3_hdr)
new_h2_hduList = fits.HDUList([new_h2_hdu])
new_h2_hduList.writeto('nh2_regridded/{0}_NH2_regrid.fits',clobber=True)
def _load_rss_from_file(self):
"""Initialises the RSS object from a file."""
try:
self.data = fits.open(self.filename)
self.mangaid = self.data[0].header['MANGAID'].strip()
self.plateifu = '{0}-{1}'.format(
self.data[0].header['PLATEID'], self.data[0].header['IFUDSGN'])
except Exception as ee:
raise MarvinError('Could not initialize via filename: {0}'.format(ee))
# Checks and populates release.
file_drpver = self.data[0].header['VERSDRP3']
file_drpver = 'v1_5_1' if file_drpver == 'v1_5_0' else file_drpver
file_ver = marvin.config.lookUpRelease(file_drpver)
assert file_ver is not None, 'cannot find file version.'
if file_ver != self._release:
warnings.warn('mismatch between file version={0} and object release={1}. '
'Setting object release to {0}'.format(file_ver, self._release),
MarvinUserWarning)
self._release = file_ver
self._drpver, self._dapver = marvin.config.lookUpVersions(release=self._release)
def get_bparam(ra, dec, psf_path):
"""
"""
#psflist = pyfits.open('mosaic_Week2_freqrange_psf.fits')
psflist = pyfits.open(psf_path)
w = pywcs.WCS(psflist[0].header, naxis=2)
pixcoords = w.wcs_world2pix([[ra, dec]], 0)[0]
#pixcoords = w.wcs_world2pix([[ra, dec]], 1)[0]
#pixcoords = [int(round(elem)) for elem in pixcoords]
pixcoords = [int(elem) for elem in pixcoords]
a = pixcoords[1]
b = pixcoords[0]
a_len = psflist[0].data[0].shape[0]
b_len = psflist[0].data[0].shape[1]
if (a >= a_len):
raise Exception("Cutout centre is out of the PSF map: {0} >= {1}".format(a, a_len))
a = a_len - (a_len - a + 1) # a = a - 1?
if (b >= b_len):
raise Exception("Cutout centre is out of the PSF map: {0} >= {1}".format(b, b_len))
b = b_len - (b_len - b + 1)
bmaj = psflist[0].data[0][a][b]
bmin = psflist[0].data[1][a][b]
bpa = psflist[0].data[2][a][b]
return (bmaj, bmin, bpa)
def load_image(path, height, width, mode='RGB'):
"""
Load an image from disk
Returns an np.ndarray (channels x width x height)
Arguments:
path -- path to an image on disk
width -- resize dimension
height -- resize dimension
Keyword arguments:
mode -- the PIL mode that the image should be converted to
(RGB for color or L for grayscale)
"""
image = PIL.Image.open(path)
image = image.convert(mode)
image = np.array(image)
# squash
image = scipy.misc.imresize(image, (height, width), 'bilinear')
return image
# Forward pass of input through the network
def read_labels(labels_file):
"""
Returns a list of strings
Arguments:
labels_file -- path to a .txt file
"""
if not labels_file:
print 'WARNING: No labels file provided. Results will be difficult to interpret.'
return None
labels = []
with open(labels_file) as infile:
for line in infile:
label = line.strip()
if label:
labels.append(label)
assert len(labels), 'No labels found'
return labels
# Decide class based on threshold
def fits2jpg(fname):
hdu_list = fits.open(fname)
image = hdu_list[0].data
image = np.squeeze(image)
img = np.copy(image)
idx = np.isnan(img)
img[idx] = 0
img_clip = np.flipud(img)
sigma = 3.0
# Estimate stats
mean, median, std = sigma_clipped_stats(img_clip, sigma=sigma, iters=10)
# Clip off n sigma points
img_clip = clip(img_clip,std*sigma)
if img_clip.shape[0] !=150 or img_clip.shape[1] !=150:
img_clip = resize(img_clip, (150,150))
#img_clip = rgb2gray(img_clip)
outfile = fname[0:-5] +'.png'
imsave(outfile, img_clip)
return img_clip,outfile
# Do the fusion classification
def __init__(self, filepath, verbose=False):
'''
Creates an exposure class which has many extensions.
Parameters
----------
filepath: str
Path to the fits file of a single exposure
verbose: bool
Print some info to visually inspect
'''
self.source_table = Table(names=['aperture_sum','xcenter','ycenter','ra','dec'])
# Open the file and print the info
self.hdulist = pf.open(filepath)
if verbose:
self.hdulist.info()
# Get the datetime of the exposure from the filename
self.date_str = os.path.basename(filepath).replace('_ffi-cal.fits','').replace('kplr','')
self.datetime = time.strptime(self.date_str, '%Y%j%H%M%S')
def report(self, pdf, reportPath):
'''@brief generate this test's page in the PDF report.
@param pdf pyfpdf-compatible PDF object.
@param reportPath Path of directory containing the pdf report'''
pdf.add_page()
pdf.set_font('Courier', '', 12)
pdf.cell(0, 6, "Channel Communications Test", 0, 1, 'L', 1)
pdf.cell(0, 6, "", 0, 1, 'L', 0)
pdf.columnTable([self.channels, self.vals], colHeaders = ["Channel", "Value"], fontSize = 8)
if dump:
testPath = reportPath + "/" + self.title
os.mkdir(testPath)
with open(testPath + "/channels.dat", "wb") as output:
pickle.dump(self.channels, output)
with open(testPath + "/vals.dat", "wb") as output:
pickle.dump(self.vals, output)
def report(self, pdf, reportPath):
'''@brief generate this test's page in the PDF report.
@param pdf pyfpdf-compatible PDF object.
@param reportPath Path of directory containing the pdf report'''
pdf.makeResidualPlotPage("Diverging SCKRails Test %i V" % int(self.startV),
"tempFigures/divergingSCKRails %i.jpg" % int(self.startV),
self.data,
self.residuals,
ROI = self.ROI,
pltRange = [-12, 12])
pdf.cell(epw, pdf.font_size, self.stats, align = 'C', ln = 1)
pdf.passFail(self.passed)
pdf.columnTable(self.data + self.residuals, ROI = self.ROI)
if dump:
testPath = reportPath + "/" + self.title
os.mkdir(testPath)
with open(testPath + "/data.dat", "wb") as output:
pickle.dump(self.data, output)
with open(testPath + "/residuals.dat", "wb") as output:
pickle.dump(self.residuals, output)
def report(self, pdf, reportPath):
'''@brief generate this test's page in the PDF report.
@param pdf pyfpdf-compatible PDF object.
@param reportPath Path of directory containing the pdf report'''
pdf.makeResidualPlotPage("Diverging RGRails Test %i V" % self.startV,
"tempFigures/divergingRGRails %i.jpg" % self.startV,
self.data,
self.residuals,
ROI = self.ROI,
pltRange = [-12, 12])
pdf.cell(epw, pdf.font_size, self.stats, align = 'C', ln = 1)
pdf.passFail(self.passed)
pdf.columnTable(self.data + self.residuals, ROI = self.ROI)
if dump:
testPath = reportPath + "/" + self.title
os.mkdir(testPath)
with open(testPath + "/data.dat", "wb") as output:
pickle.dump(self.data, output)
with open(testPath + "/residuals.dat", "wb") as output:
pickle.dump(self.residuals, output)
def report(self, pdf, reportPath):
'''@brief generate this test's page in the PDF report.
@param pdf pyfpdf-compatible PDF object.
@param reportPath Path of directory containing the pdf report'''
onePage = False
if onePage:
pdf.makePlotPage("Parameter Logging: " + name, name + ".jpg",
[(self.data[name], name) for name in self.names])
pdf.cell(0, 6, "Data saved to pickleable object in ParameterLogging.dat with key " + name, 0, 1, 'L')
else:
for name in self.names:
pdf.makePlotPage("Parameter Logging: " + name, name + ".jpg", [(self.data[name], name)])
pdf.cell(0, 6, "Data saved to pickleable object in ParameterLogging.dat with key " + name, 0, 1, 'L')
if dump:
testPath = reportPath + "/" + self.title
os.mkdir(testPath)
with open(testPath + "/data.dat", "wb") as output:
pickle.dump(self.data, output)
def report(self, pdf, reportPath):
'''@brief generate this test's page in the PDF report.
@param pdf pyfpdf-compatible PDF object.
@param reportPath Path of directory containing the pdf report'''
pdf.add_page()
pdf.set_font('Courier', '', 12)
pdf.cell(0, 6, "Channel Communications Test", 0, 1, 'L', 1)
pdf.cell(0, 6, "", 0, 1, 'L', 0)
pdf.columnTable([self.channels, self.vals], colHeaders = ["Channel", "Value"], fontSize = 8)
if dump:
testPath = reportPath + "/" + self.title
os.mkdir(testPath)
with open(testPath + "/channels.dat", "wb") as output:
pickle.dump(self.channels, output)
with open(testPath + "/vals.dat", "wb") as output:
pickle.dump(self.vals, output)
def report(self, pdf, reportPath):
'''@brief generate this test's page in the PDF report.
@param pdf pyfpdf-compatible PDF object.
@param reportPath Path of directory containing the pdf report'''
pdf.makeResidualPlotPage("Diverging SCKRails Test %i V" % int(self.startV),
"tempFigures/divergingSCKRails %i.jpg" % int(self.startV),
self.data,
self.residuals,
ROI = self.ROI,
pltRange = [-12, 12])
pdf.cell(epw, pdf.font_size, self.stats, align = 'C', ln = 1)
pdf.passFail(self.passed)
pdf.columnTable(self.data + self.residuals, ROI = self.ROI)
if dump:
testPath = reportPath + "/" + self.title
os.mkdir(testPath)
with open(testPath + "/data.dat", "wb") as output:
pickle.dump(self.data, output)
with open(testPath + "/residuals.dat", "wb") as output:
pickle.dump(self.residuals, output)
def report(self, pdf, reportPath):
'''@brief generate this test's page in the PDF report.
@param pdf pyfpdf-compatible PDF object.
@param reportPath Path of directory containing the pdf report'''
pdf.makeResidualPlotPage("Diverging RGRails Test %i V" % self.startV,
"tempFigures/divergingRGRails %i.jpg" % self.startV,
self.data,
self.residuals,
ROI = self.ROI,
pltRange = [-12, 12])
pdf.cell(epw, pdf.font_size, self.stats, align = 'C', ln = 1)
pdf.passFail(self.passed)
pdf.columnTable(self.data + self.residuals, ROI = self.ROI)
if dump:
testPath = reportPath + "/" + self.title
os.mkdir(testPath)
with open(testPath + "/data.dat", "wb") as output:
pickle.dump(self.data, output)
with open(testPath + "/residuals.dat", "wb") as output:
pickle.dump(self.residuals, output)