def integrate(self, wavelengths, densities):
    # define short names for the involved wavelength grids
    wa = wavelengths
    wb = self._Wavelengths
    # create a combined wavelength grid, restricted to the overlapping interval
    w1 = wa[(wa >= wb[0]) & (wa <= wb[-1])]
    w2 = wb[(wb >= wa[0]) & (wb <= wa[-1])]
    w = np.unique(np.hstack((w1, w2)))
    if len(w) < 2: return 0
    # log-log interpolate SED and transmission on the combined wavelength grid
    # (use scipy's interpolation function for the SED because np.interp does not support broadcasting)
    F = np.exp(interp1d(np.log(wa), _log(densities), copy=False, bounds_error=False,
                        fill_value=0.)(np.log(w)))
    T = np.exp(np.interp(np.log(w), np.log(wb), _log(self._Transmission), left=0., right=0.))
    # perform the integration
    if self._PhotonCounter: return np.trapz(x=w, y=w * F * T)
    else: return np.trapz(x=w, y=F * T)
## This private helper function returns the natural logarithm for positive values, and a large negative number
# (but not infinity) for zero or negative values.
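# The _log helper described above is not included in this snippet; a minimal
# sketch consistent with that description (the -750 cutoff is an assumption,
# chosen large enough in magnitude that np.exp() of it underflows to zero):
def _log(X):
    # natural log for positive values; a large negative number (not -inf)
    # for zero or negative values
    return np.where(X > 0, np.log(np.where(X > 0, X, 1.)), -750.)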
def calc_eff_area( self, v ) :
    # Note: "nvn" is the unit vector antiparallel to the inflow particle
    # bulk velocity, and "dir" is the cup look direction.
    # Normalize the particle velocity.
    vn = calc_arr_norm( v )
    nvn = tuple( [ -c for c in vn ] )
    # Calculate the particle inflow angle (in degrees) relative to
    # the cup normal (i.e., the cup pointing direction).  Note that
    # acos() returns radians, so convert with 180/pi (not pi/180).
    psi = acos( calc_arr_dot( self['dir'], nvn ) ) * 180. / pi
    if ( psi > 90. ) :
        return 0.
    # Return the effective collecting area corresponding to "psi".
    return interp( psi, self._spec._eff_deg, self._spec._eff_area )
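# The calc_arr_norm and calc_arr_dot helpers are not shown; minimal sketches,
# assuming they return a unit vector and a scalar dot product for tuple inputs:
from math import sqrt

def calc_arr_norm(v):
    mag = sqrt(sum(c**2 for c in v))
    return tuple(c / mag for c in v)

def calc_arr_dot(u, v):
    return sum(a * b for a, b in zip(u, v))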
#-----------------------------------------------------------------------
# DEFINE THE FUNCTION TO CALCULATE EXPECTED MAXWELLIAN CURRENT.
#-----------------------------------------------------------------------
def _set_interp_values(self):
    """
    Use iteration-based interpolation to set values of some
    schedule-based parameters.
    """
    # Compute temporal interpolation value.
    t = min((self.iteration_count + 1.0) /
            (self._hyperparams['iterations'] - 1), 1)
    # Perform iteration-based interpolation of entropy penalty.
    if isinstance(self._hyperparams['ent_reg_schedule'], (int, float)):
        self.policy_opt.set_ent_reg(self._hyperparams['ent_reg_schedule'])
    else:
        sch = self._hyperparams['ent_reg_schedule']
        self.policy_opt.set_ent_reg(
            np.exp(np.interp(t, np.linspace(0, 1, num=len(sch)),
                             np.log(sch)))
        )
    # Perform iteration-based interpolation of Lagrange multiplier.
    if isinstance(self._hyperparams['lg_step_schedule'], (int, float)):
        self._hyperparams['lg_step'] = self._hyperparams['lg_step_schedule']
    else:
        sch = self._hyperparams['lg_step_schedule']
        self._hyperparams['lg_step'] = np.exp(
            np.interp(t, np.linspace(0, 1, num=len(sch)), np.log(sch))
        )
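# A quick illustration, with a made-up schedule, of why the code above
# interpolates in log space: halfway between 1e-3 and 1e-4 it yields their
# geometric mean rather than their arithmetic mean.
import numpy as np
sch = [1e-2, 1e-3, 1e-4, 1e-5]
t = 0.5
value = np.exp(np.interp(t, np.linspace(0, 1, num=len(sch)), np.log(sch)))
print(value)  # ~3.16e-4, i.e. sqrt(1e-3 * 1e-4)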
def _update(self):
    if not self._sink:
        return False
    # Read and format data.
    data, timestamps = self._sink.retrieveData(length=self._frameSize)
    if not data:
        return
    sri = self._sink.sri
    data = self._formatData(data, sri)
    # If xdelta changes, update the X and Y ranges.
    if self._sink.sriChanged():
        # Update the X and Y ranges.
        x_min, x_max = self._getXRange(sri)
        y_min, y_max = self._getYRange(sri)
        self._image.set_extent((x_min, x_max, y_max, y_min))
        # Preserve the aspect ratio based on the image size.
        x_range = x_max - x_min
        y_range = y_max - y_min
        self._plot.set_aspect(x_range/y_range*self._aspect)
        self._xdelta = sri.xdelta
    # Resample data from frame size to image size.
    height, width = self._imageData.shape
    indices_out = numpy.linspace(0, len(data)-1, width)
    indices_in = numpy.arange(len(data))
    data = numpy.interp(indices_out, indices_in, data)
    # Store the new row and update the image data.
    self._imageData[self._row] = data
    self._image.set_array(self._imageData)
    # Advance the row pointer.
    self._row = (self._row + 1) % height
    return True
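# The resampling idiom above, extracted as a standalone sketch: linearly
# stretch or shrink a 1-D array to a given width with np.interp.
import numpy
def resample_row(data, width):
    indices_out = numpy.linspace(0, len(data) - 1, width)
    indices_in = numpy.arange(len(data))
    return numpy.interp(indices_out, indices_in, data)
print(resample_row(numpy.array([0., 1., 2., 3.]), 7))
# [0.  0.5 1.  1.5 2.  2.5 3. ]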
def _fourierTransform(self, x, y):
    ## Perform Fourier transform. If x values are not sampled uniformly,
    ## then use np.interp to resample before taking the fft.
    dx = np.diff(x)
    uniform = not np.any(np.abs(dx-dx[0]) > (abs(dx[0]) / 1000.))
    if not uniform:
        x2 = np.linspace(x[0], x[-1], len(x))
        y = np.interp(x2, x, y)
        x = x2
    f = np.fft.fft(y) / len(y)
    y = abs(f[1:len(f)//2])  # integer division so the slice index is an int
    dt = x[-1] - x[0]
    x = np.linspace(0, 0.5*len(x)/dt, len(y))
    return x, y
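# A sanity check of the resample-then-transform approach with synthetic data
# (a 5 Hz sine at non-uniform sample times; np.fft.rfft is used here instead
# of the full fft purely for brevity):
import numpy as np
rng = np.random.default_rng(0)
x = np.sort(rng.uniform(0.0, 1.0, 512))
y = np.sin(2 * np.pi * 5.0 * x)
x2 = np.linspace(x[0], x[-1], len(x))      # uniform grid
y2 = np.interp(x2, x, y)                   # resample, as above
spectrum = np.abs(np.fft.rfft(y2))
freqs = np.fft.rfftfreq(len(x2), d=(x2[-1] - x2[0]) / (len(x2) - 1))
print(freqs[np.argmax(spectrum[1:]) + 1])  # ~5 Hz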
def makedistplots(ppdf1, pamt1, bincrates):
    #### This is how we'll normalize to get changes per degree warming.
    dry = ppdf1[0]*100  # Change in dry days
    # Rain rates in mm/d for x-axis ticks and labeling.
    otn = np.linspace(1, 9, 9)
    xtickrates = np.append(0, otn*.1)
    xtickrates = np.append(xtickrates, otn)
    xtickrates = np.append(xtickrates, otn*10)
    xtickrates = np.append(xtickrates, otn*100)
    # Bin numbers associated with nice round rain rates.
    xticks = np.interp(xtickrates, bincrates, range(0, len(bincrates)))
    xticks, indices = np.unique(xticks, return_index=True)
    xtickrates = xtickrates[indices]
    ### Bin width - needed to normalize the rain amount distribution.
    db = (bincrates[2]-bincrates[1])/bincrates[1]
    ### Now we plot.
    plt.figure(figsize=(4, 6))
    plt.clf()
    ax = plt.subplot(211)
    plt.plot(range(0, len(pamt1)), pamt1/db, 'k')
    #plt.ylim((-.05,.15))
    plt.xlim((4, 130))
    #plt.setp(ax,xticks=xticks,xticklabels=['0','0.1','','','','','','','','','','1','','','','','','','','','10','','','','','','','','','100','','','','','','','','','1000'])
    plt.setp(ax, xticks=xticks, xticklabels=[''])
    #plt.xlabel('Rain rate (mm/d)')
    plt.title('Rain amount (mm/d)')
    ax = plt.subplot(212)
    plt.plot(range(0, len(ppdf1)), ppdf1*100, 'k')
    plt.plot((0, len(ppdf1)), (0, 0), '0.5')
    plt.xlim((4, 130))
    ### Annotate with the dry-day frequency.
    ymin, ymax = ax.get_ylim()
    t = plt.text(4, ymax*0.95, "{:.1f}".format(dry)+'%')
    plt.setp(t, va='top', ha='left')
    plt.setp(ax, xticks=xticks, xticklabels=['0','0.1','','','','','','','','','','1','','','','','','','','','10','','','','','','','','','100','','','','','','','','','1000'])
    plt.xlabel('Rain rate (mm/d)')
    plt.title('Rain frequency (%)')
    plt.savefig("rainmetricdemo.pdf")
    return
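# The xticks computation above uses np.interp as an inverse lookup from rain
# rate to (fractional) bin index. A standalone illustration with a made-up
# logarithmic bin grid:
import numpy as np
bincrates = np.logspace(-2, 3, 51)  # 0.01 ... 1000 mm/d
tickrates = np.array([0.1, 1.0, 10.0, 100.0])
xticks = np.interp(tickrates, bincrates, np.arange(len(bincrates)))
print(xticks)  # [10. 20. 30. 40.] for this grid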
### Call the function to make the rain distribution
def plotROC(auc_list, fpr_list, tpr_list):
    mean_tpr = 0.0
    mean_fpr = np.linspace(0, 1, 100)
    plt.figure(figsize=(5, 5))
    for i in range(len(fpr_list)):
        mean_tpr += np.interp(mean_fpr, fpr_list[i], tpr_list[i])
        mean_tpr[0] = 0.0
        plt.plot(fpr_list[i], tpr_list[i], lw=1,
                 label='ROC fold %d (area = %0.2f)' % (i, auc_list[i]))
    plt.plot([0, 1], [0, 1], '--', color=(0.6, 0.6, 0.6), label='Luck')
    mean_tpr /= len(fpr_list)
    mean_tpr[-1] = 1.0
    mean_auc = auc(mean_fpr, mean_tpr)
    plt.plot(mean_fpr, mean_tpr, 'k--',
             label='Mean ROC (area = %0.2f)' % mean_auc, lw=2)
    plt.xlim([-0.05, 1.05])
    plt.ylim([-0.05, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('')
    plt.legend(loc="lower right")
    plt.show()
    return mean_auc, mean_fpr, mean_tpr
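# A possible way to drive plotROC, assuming scikit-learn and synthetic data;
# the model and fold count are arbitrary choices for illustration:
import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import auc, roc_curve
from sklearn.model_selection import StratifiedKFold

X, y = make_classification(n_samples=500, random_state=0)
auc_list, fpr_list, tpr_list = [], [], []
for train, test in StratifiedKFold(n_splits=5).split(X, y):
    model = LogisticRegression(max_iter=1000).fit(X[train], y[train])
    fpr, tpr, _ = roc_curve(y[test], model.predict_proba(X[test])[:, 1])
    fpr_list.append(fpr)
    tpr_list.append(tpr)
    auc_list.append(auc(fpr, tpr))
mean_auc, mean_fpr, mean_tpr = plotROC(auc_list, fpr_list, tpr_list)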
def nan_interp(y, silent=False):
    """
    Return a new array with NaNs found in *y* filled with linear
    interpolants.
    """
    # Source: http://stackoverflow.com/questions/6518811/interpolate-nan-values-in-a-numpy-array
    nans, x = nan_helper(y)
    z = NP.array(y)
    if not silent and sum(nans) > 0:
        logger.warning('linear interpolation over {} NaN values'.format(sum(nans)))
    z[nans] = NP.interp(x(nans), x(~nans), z[~nans])
    return z
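# The nan_helper it relies on is the standard helper from the linked Stack
# Overflow answer:
def nan_helper(y):
    """Return a boolean NaN mask and a function mapping a mask to indices."""
    return NP.isnan(y), lambda m: m.nonzero()[0]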
def get_satellite_coords(self, times):
    """
    Calculate the coordinates of the satellite for given times
    using interpolation.

    Parameters :
        times: *np.ndarray* or *list of floats*
            Epochs for which satellite coordinates will be calculated.

    Returns :
        satellite_skycoord: *Astropy.coordinates.SkyCoord*
            *SkyCoord* for the satellite at epochs *times*.
    """
    if self._horizons is None:
        self._horizons = Horizons(self.ephemerides_file)
    # We should add a check that all values in times are within the range
    # covered by self._horizons.time.
    x = np.interp(
        times, self._horizons.time, self._horizons.xyz.x)
    y = np.interp(
        times, self._horizons.time, self._horizons.xyz.y)
    z = np.interp(
        times, self._horizons.time, self._horizons.xyz.z)
    self._satellite_skycoord = SkyCoord(
        x=x, y=y, z=z, representation='cartesian')
    self._satellite_skycoord.representation = 'spherical'
    return self._satellite_skycoord
def apply_latitude_adjustments(pixels):
    data, (bounds, crs), _ = pixels
    (_, height, width) = data.shape
    ys = np.interp(np.arange(height), [0, height - 1], [bounds[3], bounds[1]])
    xs = np.empty_like(ys)
    xs.fill(bounds[0])
    longitudes, latitudes = warp.transform(crs, WGS84_CRS, xs, ys)
    factors = 1 / np.cos(np.radians(latitudes))
    # convert to 2d array, rotate 270º, scale data
    return PixelCollection(data * np.rot90(np.atleast_2d(factors), 3),
                           pixels.bounds)
def nlfunc(r, sc, grid, gg, return_gradient=True):
    'returns xhat_nl = rhat_nl * interp( rhat_nl / sc, grid, gg) and optionally the gradient of xhat_nl wrt rhat_nl'
    g = r * np.interp(r/sc, grid, gg)
    if return_gradient:
        # I had some code that computed the gradient, but it was far more
        # complicated and no faster than just computing the empirical gradient.
        # Technically, this computes a subgradient.
        dr = sc * (grid[1]-grid[0]) * 1e-3
        dgdr = (nlfunc(r+.5*dr, sc, grid, gg, False) - nlfunc(r-.5*dr, sc, grid, gg, False)) / dr
        return (g, dgdr)
    else:
        return g
def nlfunc_(r_, sc_, grid, gg_, return_gradient=True):
    'returns xhat_nl = rhat_nl * interp( rhat_nl / sc,grid,gg) and optionally the gradient of xhat_nl wrt rhat_nl'
    g_ = r_ * interp1d_(r_/sc_, grid, gg_)
    if return_gradient:
        # I had some code that computed the gradient, but it was far more
        # complicated and no faster than just computing the empirical gradient.
        # Technically, this computes a subgradient.
        dr_ = sc_ * (grid[1]-grid[0]) * 1e-3
        dgdr_ = (nlfunc_(r_+.5*dr_, sc_, grid, gg_, False) - nlfunc_(r_-.5*dr_, sc_, grid, gg_, False)) / dr_
        return (g_, dgdr_)
    else:
        return g_
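# A quick check, with a made-up shrinkage curve, that the empirical central
# difference above tracks the analytic derivative d/dr[r*f(r/sc)] =
# f(r/sc) + (r/sc)*f'(r/sc) away from grid kinks:
import numpy as np
grid = np.linspace(0.0, 10.0, 101)
gg = np.tanh(grid)
sc = 1.0
r = np.array([0.5, 2.0, 5.0])
g, dgdr = nlfunc(r, sc, grid, gg, return_gradient=True)
f = np.interp(r / sc, grid, gg)
fprime = np.interp(r / sc, grid, np.gradient(gg, grid))
print(dgdr)                 # empirical
print(f + r * fprime / sc)  # analytic; should be close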
def interpolate_to_std_domain(wav_rest, fwav):
    stepsize = 1.03  # similar to the native step size
    data_range = [4686 - 150, 4686 + 150]
    steps = range(int((data_range[1] - data_range[0]) / stepsize) + 1)
    wav_rest_standard = [i * stepsize + data_range[0] for i in steps]
    fwav_interp = np.interp(wav_rest_standard, wav_rest, fwav)
    wav_rest = wav_rest_standard
    fwav = fwav_interp
    return wav_rest, fwav
def standardize_domain(wls, fxs, wl_min, wl_max, n_samples):
    new_wls = [lerp(wl_min, wl_max, i/(n_samples-1)) for i in range(n_samples)]
    new_fxs = np.interp(new_wls, wls, fxs)
    return new_wls, new_fxs
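# The lerp helper is not shown; presumably the conventional definition:
def lerp(a, b, t):
    """Linearly interpolate between a and b for t in [0, 1]."""
    return a + (b - a) * t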
def process_fits(file_path, wls_out):
    print('processing ' + file_path)
    hdulist = fits.open(file_path)
    wls = 10 ** hdulist[1].data['loglam']
    fxs = hdulist[1].data['flux']
    z = hdulist[2].data['z']
    # Shift wavelengths to the rest frame.
    wls = wls / (1 + z)
    if wls_out[0] < wls[0] or wls_out[-1] > wls[-1]:
        return None
    remove_slope(wls, fxs)
    wls, fxs = gaussian_smooth(wls, fxs)
    wls, fxs = crop_data(wls, fxs, wls_out)
    fxs_out = numpy.interp(wls_out, wls, fxs)
    return fxs_out
def _prepData(dataPath, numTimesteps, normalizeData=False):
    """
    Get and preprocess the ground truth drive speeds data.

    Args:
        dataPath: (str) Path to JSON of ground truth speeds.
        numTimesteps: (int) Number of timesteps to interpolate the data to.
        normalizeData: (bool) Normalize the data to [0,1].
    Returns:
        (list) Speeds, one for each timestep.
    """
    with open(dataPath, "rb") as infile:
        driveData = json.load(infile)
    # Prep data: make sure it's in order, and use relative position (b/c seconds
    # values may be incorrect).
    driveData.sort(key=lambda x: x[0])
    dataSpeeds = np.array([d[1] for d in driveData])
    dataTimes = np.array([d[0] for d in driveData])
    dataPositions = ((dataTimes - dataTimes.min()) /
                     (dataTimes.max() - dataTimes.min()))
    if normalizeData:
        dataSpeeds = normalize(dataSpeeds)
    # Linearly interpolate data to the number of video frames.
    return np.interp(np.arange(0.0, 1.0, 1.0 / numTimesteps),
                     dataPositions,
                     dataSpeeds)
def prepTruthData(dataPath, numFrames, normalizeData=False):
    """
    Get and preprocess the ground truth drive speeds data.

    Args:
        dataPath: (str) Path to JSON of ground truths.
        numFrames: (int) Number of timesteps to interpolate the data to.
        normalizeData: (bool) Normalize the data to [0,1].
    Returns:
        (list) Linearly interpolated truth values, one for each timestep.
    """
    with open(dataPath, "rb") as infile:
        driveData = json.load(infile)
    # Prep data: make sure it's in order, and use relative position (b/c seconds
    # values may be incorrect).
    driveData.sort(key=lambda x: x[0])
    times = np.zeros(len(driveData))
    speeds = np.zeros(len(driveData))
    for i, (time, speed) in enumerate(driveData):
        times[i] = time
        speeds[i] = speed
    positions = (times - times.min()) / (times.max() - times.min())
    if normalizeData:
        speeds = normalize(speeds)
    # Linearly interpolate the data to the number of video frames.
    return np.interp(np.arange(0.0, 1.0, 1.0/numFrames), positions, speeds)
def test_exceptions(self):
    assert_raises(ValueError, interp, 0, [], [])
    assert_raises(ValueError, interp, 0, [0], [1, 2])
    assert_raises(ValueError, interp, 0, [0, 1], [1, 2], period=0)
    assert_raises(ValueError, interp, 0, [], [], period=360)
    assert_raises(ValueError, interp, 0, [0], [1, 2], period=360)
def test_basic(self):
    x = np.linspace(0, 1, 5)
    y = np.linspace(0, 1, 5)
    x0 = np.linspace(0, 1, 50)
    assert_almost_equal(np.interp(x0, x, y), x0)