def recordContinuously(self):
    '''@brief Continuously record the requested parameters while self.recording is set to True.'''
    count = 0
    while self.recording:
        # If a functional test is attached, track its progress and stop
        # recording once it reports completion.
        if self.fnTest is not None:
            prog = self.fnTest.progress
            if prog >= 100:
                self.stopTest()
            self.status = -prog if prog > 0 else "Working..."
        else:
            self.status = "Working..."
        count += 1
        # Query each requested channel through the Jython interface and
        # append the reading to the corresponding data series.
        for (subsystem, value), name in zip(self.valuesToRead, self.names):
            command = '{}.synchCommandLine(1000,"readChannelValue {}").getResult()'.format(subsystem, value)
            result = jy2.get(command)
            self.data[name].append(result)
        # Every self.backup iterations (if backups are enabled), reset the
        # counter and checkpoint the accumulated data to disk.
        if self.backup > 0 and count == self.backup:
            count = 0
            with open("ParameterLogging.dat", "wb") as backupFile:
                pickle.dump(self.data, backupFile)
        time.sleep(self.delay)
def report(self, pdf, reportPath):
    '''@brief Generate this test's page in the PDF report.
    @param pdf pyfpdf-compatible PDF object.
    @param reportPath Path of the directory containing the PDF report'''
    onePage = False
    if onePage:
        # Single-page mode: plot every logged parameter on one page.
        # (The original code referenced an undefined `name` here; a generic
        # title and file name are assumed instead.)
        pdf.makePlotPage("Parameter Logging", "ParameterLogging.jpg",
                         [(self.data[name], name) for name in self.names])
        pdf.cell(0, 6, "Data saved to pickleable object in ParameterLogging.dat", 0, 1, 'L')
    else:
        # One page per logged parameter.
        for name in self.names:
            pdf.makePlotPage("Parameter Logging: " + name, name + ".jpg", [(self.data[name], name)])
            pdf.cell(0, 6, "Data saved to pickleable object in ParameterLogging.dat with key " + name, 0, 1, 'L')
    # `dump` is assumed to be a module-level flag controlling raw-data export.
    if dump:
        testPath = reportPath + "/" + self.title
        os.mkdir(testPath)
        with open(testPath + "/data.dat", "wb") as output:
            pickle.dump(self.data, output)
def report(self, pdf, reportPath):
    '''@brief Generate this test's page in the PDF report.
    @param pdf pyfpdf-compatible PDF object.
    @param reportPath Path of the directory containing the PDF report'''
    pdf.makeResidualPlotPage("Diverging RGRails Test %i V" % self.startV,
                             "tempFigures/divergingRGRails %i.jpg" % self.startV,
                             self.data,
                             self.residuals,
                             ROI = self.ROI,
                             pltRange = [-12, 12])
    # `epw` is assumed to be the effective page width defined elsewhere in the module.
    pdf.cell(epw, pdf.font_size, self.stats, align = 'C', ln = 1)
    pdf.passFail(self.passed)
    pdf.columnTable(self.data + self.residuals, ROI = self.ROI)
    # `dump` is assumed to be a module-level flag controlling raw-data export.
    if dump:
        testPath = reportPath + "/" + self.title
        os.mkdir(testPath)
        with open(testPath + "/data.dat", "wb") as output:
            pickle.dump(self.data, output)
        with open(testPath + "/residuals.dat", "wb") as output:
            pickle.dump(self.residuals, output)
def buildtmc(tmcname):
    '''Update the 'filename' column of a TMC table: each entry is passed
    through the module-level helper fincre(), and entries whose updated file
    exists on disk are replaced. The modified table is written to
    <tmcname>_new.fits, and skipped entries are logged to buildtmc.log.'''
    from pyraf import iraf
    from iraf import stsdas, hst_calib, synphot
    out = open('buildtmc.log', 'w')
    f = pyfits.open(tmcname)
    flist = f[1].data.field('filename')
    iraf.set(crrefer='./')  # work locally
    for k in range(len(flist)):
        # Resolve the IRAF path and strip any extension specifier ("[...]").
        oldname = iraf.osfn(flist[k]).split('[')[0]
        newname = fincre(oldname)
        if os.path.exists(newname):
            flist[k] = fincre(flist[k])
        else:
            out.write("%s: no change necessary\n" % oldname)
    f.writeto(tmcname.replace(".fits", "_new.fits"))
    out.close()
def _readFITS(self, filename, fluxname):
fs = pyfits.open(filename)
# pyfits cannot close the file on .close() if there are still
# references to mmapped data
self._wavetable = fs[1].data.field('wavelength').copy()
if fluxname is None:
fluxname = 'flux'
self._fluxtable = fs[1].data.field(fluxname).copy()
self.waveunits = units.Units(fs[1].header['tunit1'].lower())
self.fluxunits = units.Units(fs[1].header['tunit2'].lower())
# Retain the header information as a convenience for the user.
# If duplicate keywords exist, the value in the extension
# header will override that in the primary.
self.fheader = dict(fs[0].header)
self.fheader.update(dict(fs[1].header))
fs.close()
def __init__(self, CFile=None):
# None is common for various errors.
# the default value of None is not useful; pyfits.open(None) does not work.
if CFile is None :
raise TypeError('initializing CompTable with CFile=None; possible bad/missing CDBS')
cp = pyfits.open(CFile)
self.compnames = cp[1].data.field('compname')
self.filenames = cp[1].data.field('filename')
# Is this necessary?
compdict = {}
for i in range(len(self.compnames)):
compdict[self.compnames[i]] = self.filenames[i]
cp.close()
self.name=CFile
def get_mini_image(self, center, halfsize=15):
    """
    :param center: tuple of (x, y) coordinates, in pixels
    :param halfsize: half the side length of the square cut out around center
    :return: 2D nested list containing the cut-out of the white image
    """
    side = 2 * halfsize + 1
    image = [[0 for x in range(side)] for y in range(side)]
    data_white = fits.open(self.filename_white)[1].data
    center_x = center[0]
    center_y = center[1]
    # Fill the (side x side) window around the requested center pixel.
    for i in xrange(center_x - halfsize - 1, center_x + halfsize):
        for j in xrange(center_y - halfsize - 1, center_y + halfsize):
            i2 = i - (center_x - halfsize)
            j2 = j - (center_y - halfsize)
            image[j2][i2] = data_white[j - 1][i - 1]
    return image
def get_new_2dmask(self, region_string):
    """Creates a 2D mask for the white image that masks out spaxels lying
    outside the region defined by region_string"""
    from pyregion.region_to_filter import as_region_filter
    im_aux = np.ones_like(self.white_data)
    hdu_aux = fits.open(self.filename_white)[1]
    hdu_aux.data = im_aux
    hdulist = self.hdulist_white
    r = pyregion.parse(region_string).as_imagecoord(hdulist[1].header)
    shape = hdu_aux.data.shape
    region_filter = as_region_filter(r, origin=0)
    mask_new = region_filter.mask(shape)
    mask_new_inverse = np.where(~mask_new, True, False)
    self.draw_pyregion(region_string)
    return mask_new_inverse
def get_template(redmonster_file, n_template):  # n_template can be 1, 2, 3, 4, or 5
hdulist = fits.open(redmonster_file)
templates = hdulist[2]
table = hdulist[1].data
Z = 'Z' + str(n_template)
Z_ERR = 'Z_ERR' + str(n_template)
MINRCHI2 = 'MINRCHI2' + str(n_template)
CLASS = 'CLASS' + str(n_template)
z_template = table[Z][0]
z_err_template = table[Z_ERR][0]
class_template = table[CLASS][0]
minchi_template = table[MINRCHI2][0]
index_template = n_template - 1
template = hdulist[2].data[0][index_template]
n = len(template)
COEFF0 = hdulist[0].header['COEFF0']
COEFF1 = hdulist[0].header['COEFF1']
NAXIS1 = n
WAVE_END = COEFF0 + (NAXIS1 - 1) * COEFF1
wave_log = np.linspace(COEFF0, WAVE_END, n)
wave = 10 ** wave_log
return wave, template, z_template, class_template, minchi_template, z_err_template
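# Hedged usage sketch for get_template(): the redmonster output path below is a
# hypothetical placeholder; n_template selects which of the (up to five) stored
# template fits to return.
wave, template, z, cls, minchi2, z_err = get_template(
    'spec-redmonster-example.fits', n_template=1)  # hypothetical file name
print(cls, z, '+/-', z_err)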
def BuildRegion(sL,Sources):
myreg=open(sL.regFile,'w')#note that this will overwrite previous region files of the same name
myreg.write('# Region File format: DS9 version ?')#I don't actually know, but I think it's one of the later ones, need to verify
myreg.write('\n# Created by make3FGLxml.py')
myreg.write('\nglobal font="roman 10 normal" move =0')
for k in Sources.keys():
src=Sources[k]
#get color based on if the source is free or not
color=('green' if src['free'] else 'magenta')
if src['E']:#if the source is extended, always have the point be a "big" box
myreg.write('\nJ2000;point(%.3f,%.3f) # point = box 18 color = %s text={%s}'%(src['ra'],src['dec'],color,k))
else:#if the source is a point source, choose the point type based on spectral model
ptype=('cross' if src['stype']=='PLSuperExpCutoff' else 'diamond' if src['stype']=='LogParabola' else 'circle')
myreg.write('\nJ2000;point(%.3f,%.3f) # point = %s 15 color = %s text={%s}'%(src['ra'],src['dec'],ptype,color,k))
myreg.close()
return
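# Hedged sketch of the inputs BuildRegion() expects, inferred from the code
# above: `sL` only needs a `regFile` attribute, and each entry of `Sources`
# needs 'ra', 'dec', 'free', 'E' (extended flag), and 'stype' keys. The source
# names and values below are made-up placeholders.
from types import SimpleNamespace

example_sources = {
    '3FGL J0000.0+0000': {'ra': 0.0, 'dec': 0.0, 'free': True,
                          'E': False, 'stype': 'PowerLaw'},
    '3FGL J0001.0+0001': {'ra': 0.25, 'dec': 0.1, 'free': False,
                          'E': True, 'stype': 'LogParabola'},
}
BuildRegion(SimpleNamespace(regFile='example_ROI.reg'), example_sources)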
def writeConfig(conf):
    '''
    Write config to disk
    @param conf: configparser.ConfigParser object
    '''
    config_location = os.path.join(os.path.expanduser('~'), '.skdaccess.conf')
    with open(config_location, "w") as config_handle:
        conf.write(config_handle)
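# Hedged usage sketch for writeConfig(): build a ConfigParser, add a section,
# and persist it. The section and option names are made-up placeholders; note
# that this writes to the real ~/.skdaccess.conf.
from configparser import ConfigParser

conf = ConfigParser()
conf['example'] = {'username': 'anonymous'}  # hypothetical section/values
writeConfig(conf)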
def read_fits(fits_file,ext=0):
'''
Shortcut function to get the header and data from a fits file and a given
extension.
'''
hdulist = pyfits.open(fits_file)
img_header = hdulist[ext].header
img_data = hdulist[ext].data
hdulist.close()
return img_data, img_header
def read_fits_header(fits_file, ext=0):
'''
Shortcut function to just read the header of the FITS file and return it.
'''
hdulist = pyfits.open(fits_file)
img_header = hdulist[ext].header
hdulist.close()
return img_header
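# Hedged usage sketch for read_fits()/read_fits_header(): 'example.fits' is a
# hypothetical file path; ext selects which HDU to read.
data, header = read_fits('example.fits', ext=0)
print(header.get('NAXIS'), data.shape)
hdr_only = read_fits_header('example.fits')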
def kepler_lcdict_to_pkl(lcdict,
outfile=None):
'''This simply writes the lcdict to a pickle.
'''
if not outfile:
outfile = '%s-keplc.pkl' % lcdict['objectid'].replace(' ','-')
# we're using pickle.HIGHEST_PROTOCOL here, this will make Py3 pickles
# unreadable for Python 2.7
with open(outfile,'wb') as outfd:
pickle.dump(lcdict, outfd, protocol=pickle.HIGHEST_PROTOCOL)
return os.path.abspath(outfile)
def read_kepler_pklc(picklefile):
    '''This turns the pickled lightcurve back into an lcdict.
    '''
    try:
        with open(picklefile, 'rb') as infd:
            lcdict = pickle.load(infd)
    except UnicodeDecodeError:
        # Pickles written by Python 2 may need latin1 decoding under Python 3.
        with open(picklefile, 'rb') as infd:
            lcdict = pickle.load(infd, encoding='latin1')
        LOGWARNING('pickle %s was probably from Python 2 '
                   'and failed to load without using "latin1" encoding. '
                   'This is probably a numpy issue: '
                   'http://stackoverflow.com/q/11305790' % picklefile)
    return lcdict
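# Hedged round-trip sketch for the two pickle helpers above, using a minimal
# made-up lcdict; only the 'objectid' key is required by kepler_lcdict_to_pkl
# for naming the output file.
lcdict = {'objectid': 'KIC 1234567', 'time': [0.0, 1.0], 'sap_flux': [1.0, 0.99]}
pklpath = kepler_lcdict_to_pkl(lcdict)
restored = read_kepler_pklc(pklpath)
assert restored['objectid'] == lcdict['objectid']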
##########################
## KEPLER LC PROCESSING ##
##########################
def __init__(self, lcfile):
    '''Load a FITS light-curve file and cache the TIME/RATE/ERROR columns and
    timing keywords from its first extension.'''
    f = fits.open(lcfile)
    self.lc_data = f[1].data
    self.lc_header = f[1].header
    self.time = self.lc_data['TIME']
    self.rate = self.lc_data['RATE']
    self.rate_err = self.lc_data['ERROR']
    self.TSTART = self.lc_header['TSTART']
    self.TSTOP = self.lc_header['TSTOP']
    self.TIMEDEL = self.lc_header['TIMEDEL']
    self.TIMEPIXR = self.lc_header['TIMEPIXR']
    f.close()
def open_image(infile):
"""
Open the slice image and return its header and 2D image data.
NOTE
----
The input slice image may have following dimensions:
* NAXIS=2: [Y, X]
* NAXIS=3: [FREQ=1, Y, X]
* NAXIS=4: [STOKES=1, FREQ=1, Y, X]
NOTE
----
Only open slice image that has only ONE frequency and ONE Stokes
parameter.
Returns
-------
header : `~astropy.io.fits.Header`
image : 2D `~numpy.ndarray`
The 2D [Y, X] image part of the slice image.
"""
with fits.open(infile) as f:
header = f[0].header
data = f[0].data
if data.ndim == 2:
# NAXIS=2: [Y, X]
image = data
elif data.ndim == 3 and data.shape[0] == 1:
# NAXIS=3: [FREQ=1, Y, X]
image = data[0, :, :]
elif data.ndim == 4 and data.shape[0] == 1 and data.shape[1] == 1:
# NAXIS=4: [STOKES=1, FREQ=1, Y, X]
image = data[0, 0, :, :]
else:
raise ValueError("Slice '{0}' has invalid dimensions: {1}".format(
infile, data.shape))
return (header, image)
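# Hedged usage sketch for open_image(): writes a tiny synthetic slice with
# NAXIS=4 [STOKES=1, FREQ=1, Y, X] to a temporary file and reads it back, so
# only astropy and numpy are needed.
import tempfile
import numpy as np
from astropy.io import fits

cube = np.arange(12.0).reshape((1, 1, 3, 4))      # [STOKES, FREQ, Y, X]
tmp = tempfile.NamedTemporaryFile(suffix='.fits', delete=False)
tmp.close()
fits.PrimaryHDU(data=cube).writeto(tmp.name, overwrite=True)
header, image = open_image(tmp.name)
print(image.shape)                                # -> (3, 4)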