def integrate(self, wavelengths, densities):
# define short names for the involved wavelength grids
wa = wavelengths
wb = self._Wavelengths
# create a combined wavelength grid, restricted to the overlapping interval
w1 = wa[(wa >= wb[0]) & (wa <= wb[-1])]
w2 = wb[(wb >= wa[0]) & (wb <= wa[-1])]
w = np.unique(np.hstack((w1, w2)))
if len(w) < 2: return 0
# log-log interpolate SED and transmission on the combined wavelength grid
# (use scipy interpolation function for SED because np.interp does not support broadcasting)
F = np.exp(interp1d(np.log(wa), _log(densities), copy=False, bounds_error=False, fill_value=0.)(np.log(w)))
T = np.exp(np.interp(np.log(w), np.log(wb), _log(self._Transmission), left=0., right=0.))
# perform the integration
if self._PhotonCounter: return np.trapz(x=w, y=w * F * T)
else: return np.trapz(x=w, y=F * T)
## This private helper function returns the natural logarithm for positive values, and a large negative number
# (but not infinity) for zero or negative values.
python类interp1d()的实例源码
def __init__(self):
"""
This function ...
"""
# From Battisit et al. 2016
wl_B16 = np.arange(0.125, 0.832, 0.01)
x = 1. / wl_B16
Qfit_B16 = -2.488 + 1.803 * x - 0.261 * x ** 2 + 0.0145 * x ** 3
interpfunc = interpolate.interp1d(wl_B16, Qfit_B16, kind='linear')
Qfit_B16_V = interpfunc(0.55) # Interpolate to find attenuation at V band central wavelengths
n_atts_B16 = Qfit_B16 / Qfit_B16_V
wavelengths = wl_B16
attenuations = Qfit_B16 + 1. # TODO: understand this more ...
# Call the constructor of the base class
super(BattistiAttenuationCurve, self).__init__(wavelengths, attenuations)
# -----------------------------------------------------------------
def make_ecc_interpolant():
"""
Make interpolation function from eccentricity file to
determine number of harmonics to use for a given
eccentricity.
:returns: interpolant
"""
pth = resource_filename(Requirement.parse('libstempo'),
'libstempo/ecc_vs_nharm.txt')
fil = np.loadtxt(pth)
return interp1d(fil[:,0], fil[:,1])
# get interpolant for eccentric binaries
__init__.py 文件源码
项目:Physical_Simulation_Environment
作者: Telecominfraproject
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def interpolate_in_range(x, y, x_new, kind_interp):
""" Given some samples y of the function y(x), interpolate_in_range returns the interpolation of values y(x_new)
:param x: The points at which y(x) is evaluated. Array
:param y: The values of y(x). Array
:param x_new: The values at which y(x) has to be interpolated. Array
:param kind_interp: The interpolation method of the function scipy.interpolate.interp1d. String
:return: y_new: the new interpolates samples
"""
if x.size == 1:
y_new = y * np.ones(x_new.size)
elif x.size == 2:
x = np.append(x, x_new[-1])
y = np.append(y, y[-1])
func = interp.interp1d(x, y, kind=kind_interp, bounds_error=False)
y_new = func(x_new)
else:
func = interp.interp1d(x, y, kind=kind_interp, bounds_error=False)
y_new = func(x_new)
return y_new
def test_lund_rescale_mean_velocity_different_grid():
fileName = path.join(eddylicious.__path__[0], "..", "tests", "datasets",
"channel_flow_180", "dns.dat")
dns = np.genfromtxt(fileName)
eta = dns[:, 0]
yPlus = dns[:, 1]
u = dns[:, 1]
v = np.zeros(u.shape)
etaInfl = np.linspace(0, eta[-1], 119)
yPlusInfl = etaInfl*6.37309e-2/3.5e-4
uTest = interp1d(eta, u)(etaInfl)
w = blending_function(etaInfl)
uRescaled = lund_rescale_mean_velocity(eta, yPlus, u, v, etaInfl.size,
etaInfl, yPlusInfl, 1,
u[-1], u[-1], 1,
w)[0][:, 0]
for i in range(len(uTest)):
assert_almost_equal(uTest[i], uRescaled[i], decimal=3)
# Test recaling dns data onto itself, using a different grid with eta > 1
def linear(cls, time, y0, yf):
""" return linear function values that array size is same as time length
Args:
time (array_like) : time
y0 (float): initial value
yf (float): final value
Returns:
(N, ) ndarray
"""
x = np.array([time[0], time[-1]])
y = np.array([y0, yf])
f = interpolate.interp1d(x, y)
return f(time)
def integrate(self, wavelength_grid):
"""
Integrate the spectrum flux over the specified grid of wavelengths.
Parameters
----------
wavelength_grid : quantity_like
Returns
-------
integrated_flux : :class:`~astropy.units.Quantity`
"""
grid = u.Quantity(wavelength_grid)
grid = grid.to(self.wavelength.unit)
interpolator = interp1d(self.wavelength.value, self.flux.value,
kind='cubic')
new_flux = interpolator(grid.value)
return simps(new_flux, x=grid.value) * self.flux.unit * grid.unit
def get_results(self, time_steps, result_key="output", interpolation="nearest", as_eval_data=False):
"""
return results from internal storage for given time steps.
.. warning:: calling this method before a simulation was run will result in an error.
:param time_steps: time points where values are demanded
:param result_key: type of values to be returned
:param interpolation: interpolation method to use if demanded time-steps are not covered by the storage,
see :func:`scipy.interpolate.interp1d` for all possibilities
:param as_eval_data: return results as EvalData object for straightforward display
"""
func = interp1d(np.array(self._time_storage), np.array(self._value_storage[result_key]),
kind=interpolation, assume_sorted=True, axis=0)
values = func(time_steps)
if as_eval_data:
return EvalData([time_steps], values, name=".".join([self.name, result_key]))
return values
def __init__(self, survey, band):
self.survey = survey.strip()
self.band = band.strip()
try:
self.mstar_file = resource_filename(__name__,'data/mstar/mstar_%s_%s.fit' % (self.survey, self.band))
except:
raise IOError("Could not find mstar resource mstar_%s_%s.fit" % (self.survey, self.band))
try:
self._mstar_arr = fitsio.read(self.mstar_file,ext=1)
except:
raise IOError("Could not find mstar file mstar_%s_%s.fit" % (self.survey, self.band))
# Tom - why not use CubicSpline here? That's why it exists...
self._f = CubicSpline(self._mstar_arr['Z'],self._mstar_arr['MSTAR'])
#self._f = interpolate.interp1d(self._mstar_arr['Z'],self._mstar_arr['MSTAR'],kind='cubic')
def fit_transform(self, X, **kwargs):
"""Fit to data, then transform it.
Parameters
----------
X : array-like
Time series dataset to be resampled.
Returns
-------
numpy.ndarray
Resampled time series dataset.
"""
X_ = to_time_series_dataset(X)
n_ts, sz, d = X_.shape
equal_size = check_equal_size(X_)
X_out = numpy.empty((n_ts, self.sz_, d))
for i in range(X_.shape[0]):
xnew = numpy.linspace(0, 1, self.sz_)
if not equal_size:
sz = ts_size(X_[i])
for di in range(d):
f = interp1d(numpy.linspace(0, 1, sz), X_[i, :sz, di], kind="slinear")
X_out[i, :, di] = f(xnew)
return X_out
def get_xsfb(infile, m_in, unit):
if unit == 'pb': fac = 1000.
if unit == 'fb': fac = 1.
dirname = os.path.dirname(__file__)
infile_path = os.path.join(dirname, infile)
data = np.loadtxt(infile_path)
try:
mass_ar, xspb_ar = np.transpose(data)
except:
mass_ar, xspb_ar, dummy = np.transpose(data)
xspb_data = interpolate.interp1d(mass_ar, xspb_ar)
xspb = xspb_data(m_in)
xsfb = xspb * fac
return xsfb
def calculate_val(thresholds, embeddings1, embeddings2, actual_issame, far_target, nrof_folds=10):
assert(embeddings1.shape[0] == embeddings2.shape[0])
assert(embeddings1.shape[1] == embeddings2.shape[1])
nrof_pairs = min(len(actual_issame), embeddings1.shape[0])
nrof_thresholds = len(thresholds)
k_fold = KFold(n_splits=nrof_folds, shuffle=False)
val = np.zeros(nrof_folds)
far = np.zeros(nrof_folds)
diff = np.subtract(embeddings1, embeddings2)
dist = np.sum(np.square(diff),1)
indices = np.arange(nrof_pairs)
for fold_idx, (train_set, test_set) in enumerate(k_fold.split(indices)):
# Find the threshold that gives FAR = far_target
far_train = np.zeros(nrof_thresholds)
for threshold_idx, threshold in enumerate(thresholds):
_, far_train[threshold_idx] = calculate_val_far(threshold, dist[train_set], actual_issame[train_set])
if np.max(far_train)>=far_target:
f = interpolate.interp1d(far_train, thresholds, kind='slinear')
threshold = f(far_target)
else:
threshold = 0.0
val[fold_idx], far[fold_idx] = calculate_val_far(threshold, dist[test_set], actual_issame[test_set])
val_mean = np.mean(val)
far_mean = np.mean(far)
val_std = np.std(val)
return val_mean, val_std, far_mean
def py_sampleProfile(intensity, Rmin, dR, nxy, dxy, udat, vdat, dRA=0., dDec=0., PA=0, inc=0.):
"""
Python implementation of sampleProfile.
"""
inc_cos = np.cos(inc)
nrad = len(intensity)
gridrad = np.linspace(Rmin, Rmin + dR * (nrad - 1), nrad)
ncol, nrow = nxy, nxy
# create the mesh grid
x = (np.linspace(0.5, -0.5 + 1./float(ncol), ncol)) * dxy * ncol
y = (np.linspace(0.5, -0.5 + 1./float(nrow), nrow)) * dxy * nrow
# we shrink the x axis, since PA is the angle East of North of the
# the plane of the disk (orthogonal to the angular momentum axis)
# PA=0 is a disk with vertical orbital node (aligned along North-South)
x_axis, y_axis = np.meshgrid(x / inc_cos, y)
x_meshgrid = np.sqrt(x_axis ** 2. + y_axis ** 2.)
# convert to Jansky
sr_to_px = dxy**2.
intensity *= sr_to_px
f = interp1d(gridrad, intensity, kind='linear', fill_value=0.,
bounds_error=False, assume_sorted=True)
intensmap = f(x_meshgrid)
f_center = interp1d(gridrad, intensity, kind='linear', fill_value='extrapolate',
bounds_error=False, assume_sorted=True)
intensmap[int(nrow/2), int(ncol/2)] = f_center(0.)
vis = py_sampleImage(intensmap, dxy, udat, vdat, PA=PA, dRA=dRA, dDec=dDec)
return vis
def pwl(pairs, t):
"""
PWL describes a piecewise linear waveform
:param pairs: a 2D list. Each rom is a pair [time, value]
:param t: array with times where the function has to be evaluated
:return: the function values at times defined in t
"""
# convert pairs to np.array is necessary
if isinstance(pairs, list):
pairs = np.array(pairs)
# check pairs format
if pairs.ndim == 1:
pairs.shape = (-1,2)
# get pairs
x = []
y = []
for xk, yk in pairs:
x.append(xk)
y.append(yk)
# create linear interpolator
fun = interp1d(x, y, bounds_error=False, fill_value=(y[0], y[-1]), assume_sorted=True)
# interpolation
out = fun(t)
return out
def plot(x, y, x_label, y_label, save_file):
ticks = x
plt.xticks(range(len(ticks)), ticks, fontsize = 15)
plt.yticks(fontsize = 15)
new_x = interpolate.interp1d(ticks, range(len(ticks)))(ticks)
plt.plot(new_x, y, linestyle='-', alpha=1.0, markersize=12, marker='p', color='b')
plt.xlabel(x_label, fontsize=24)
plt.ylabel(y_label, fontsize=20)
plt.savefig(save_file)
plt.show()
def Dc_to_redshift(self, Dc):
"""
Calculate the redshift corresponding to the given comoving distance
(along LoS) by using interpolation.
"""
if not hasattr(self, "_Dc_interp"):
Dc_min, Dc_max = self.Dc_limit
dDc = self.Dc_cell
N = int((Dc_max - Dc_min) / dDc)
z_ = np.linspace(self.zmin, self.zmax, num=N)
Dc_ = self.cosmo.comoving_distance(z_).value # [Mpc]
self._Dc_interp = interpolate.interp1d(Dc_, z_, kind="linear")
return self._Dc_interp(Dc)
def make2d(w1d, x=None):
"""
Create 2D filter from the 1D one.
Parameters
----------
w1d : 1D `~numpy.ndarray`
The input 1D filter/window
x : 1D `~numpy.ndarray`, optional
The X-axis values of the input ``w1d`` filter
Returns
-------
w2d : 2D `~numpy.ndarray`
Created 2D filter/window from the input 1D one.
Credit
------
[1] MATLAB - creating 2D convolution filters
https://cn.mathworks.com/matlabcentral/newsreader/view_thread/23588
"""
if x is None:
L = len(w1d)
M = (L-1) / 2
x = np.linspace(-M, M, num=L)
xmax = np.max(x)
xsize = int(2 * xmax + 1)
xx = np.linspace(-xmax, xmax, num=xsize)
xg, yg = np.meshgrid(xx, xx)
r = np.sqrt(xg**2 + yg**2)
ridx = (r <= xmax)
w2d = np.zeros(shape=(xsize, xsize))
finterp = interpolate.interp1d(x=x, y=w1d, kind="linear")
w2d[ridx] = finterp(r[ridx])
return w2d
def spectrum_to_XYZ_emissive(spectrum_dict, cmf='1931_2deg'):
''' Converts a reflective or transmissive spectrum to XYZ coordinates.
Args:
spectrum_dict (`dict`): dictionary with wvl, values keys. Wvl in units of nm.
cmf (`str`): which color matching function to use, defaults to
CIE 1931 2 degree observer.
Returns:
`tuple` containing:
`float`: X
`float`: Y
`float`: Z
'''
wvl, values = spectrum_dict['wvl'], spectrum_dict['values']
cmf = get_cmf(cmf)
wvl_cmf = cmf['wvl']
try:
can_be_direct = np.allclose(wvl_cmf, wvl)
except ValueError as e:
can_be_direct = False
if not can_be_direct:
dat_interpf = interp1d(wvl, values, kind='linear', bounds_error=False, fill_value=0, assume_sorted=True)
values = dat_interpf(wvl_cmf)
dw = wvl_cmf[1] - wvl_cmf[0]
k = 100 / (values * cmf['Y']).sum(axis=0) / dw
X = k * (values * cmf['X']).sum(axis=0)
Y = k * (values * cmf['Y']).sum(axis=0)
Z = k * (values * cmf['Z']).sum(axis=0)
return X, Y, Z
def add_demonstration(self, demonstration):
interpolate = interp1d(np.linspace(0, 1, len(demonstration)), demonstration, kind='cubic')
stretched_demo = interpolate(self.x)
self.Y = np.vstack((self.Y, stretched_demo))
self.nrTraj = len(self.Y)
self.W = np.dot(np.linalg.inv(np.dot(self.Phi, self.Phi.T)), np.dot(self.Phi, self.Y.T)).T # weights for each trajectory
self.meanW = np.mean(self.W, 0) # mean of weights
w1 = np.array(map(lambda x: x - self.meanW.T, self.W))
self.sigmaW = np.dot(w1.T, w1)/self.nrTraj # covariance of weights
self.sigmaSignal = np.sum(np.sum((np.dot(self.W, self.Phi) - self.Y) ** 2)) / (self.nrTraj * self.nrSamples)
def resamp(cop_dat):
# resample data to even sample points using same average sample rate
# resample data
t = np.linspace(cop_dat[0,2], cop_dat[-1,2], num=nsamp(cop_dat), endpoint=True)
f_cor = interp1d(cop_dat[:,2], cop_dat[:,0], kind='linear')
cor = f_cor(t)
f_sag = interp1d(cop_dat[:,2], cop_dat[:,1], kind='linear')
sag = f_sag(t)
# convert all to nd arrays
cor = to_sing(cor)
sag = to_sing(sag)
t = to_sing(t)
return np.concatenate((cor,sag,t),axis=1)