def _get_legendre_deg_ctd(npts):
from scipy.interpolate import interp1d
degs = nparray([4,5,6,10,15])
pts = nparray([1e2,3e2,5e2,1e3,3e3])
fn = interp1d(pts, degs, kind='linear',
bounds_error=False,
fill_value=(min(degs), max(degs)))
legendredeg = int(npfloor(fn(npts)))
return legendredeg
#######################################
# UTILITY FUNCTION FOR ANY DETRENDING #
#######################################
python类interp1d()的实例源码
def plot_info_retrieval(precisions, save_file):
# markers = ["|", "D", "8", "v", "^", ">", "h", "H", "s", "*", "p", "d", "<"]
markers = ["D", "p", 's', "*", "d", "8", "^", "H", "v", ">", "<", "h", "|"]
ticks = zip(*zip(*precisions)[1][0])[0]
plt.xticks(range(len(ticks)), ticks)
new_x = interpolate.interp1d(ticks, range(len(ticks)))(ticks)
i = 0
for model_name, val in precisions:
fr, pr = zip(*val)
plt.plot(new_x, pr, linestyle='-', alpha=0.7, marker=markers[i],
markersize=8, label=model_name)
i += 1
# plt.legend(model_name)
plt.xlabel('Fraction of Retrieved Documents')
plt.ylabel('Precision')
legend = plt.legend(loc='upper right', shadow=True)
plt.savefig(save_file)
plt.show()
def plot_info_retrieval_by_length(precisions, save_file):
markers = ["o", "v", "8", "s", "p", "*", "h", "H", "^", "x", "D"]
ticks = zip(*zip(*precisions)[1][0])[0]
plt.xticks(range(len(ticks)), ticks)
new_x = interpolate.interp1d(ticks, range(len(ticks)))(ticks)
i = 0
for model_name, val in precisions:
fr, pr = zip(*val)
plt.plot(new_x, pr, linestyle='-', alpha=0.6, marker=markers[i],
markersize=6, label=model_name)
i += 1
# plt.legend(model_name)
plt.xlabel('Document Sorted by Length')
plt.ylabel('Precision (%)')
legend = plt.legend(loc='upper right', shadow=True)
plt.savefig(save_file)
plt.show()
def riskfree():
r = requests.get(TREASURY_URL)
soup = BeautifulSoup(r.text, 'html.parser')
table = soup.find("table", attrs={'class' : 't-chart'})
rows= table.find_all('tr')
lastrow = len(rows)-1
cells = rows[lastrow].find_all("td")
date = cells[0].get_text()
m1 = float(cells[1].get_text())
m3 = float(cells[2].get_text())
m6 = float(cells[3].get_text())
y1 = float(cells[4].get_text())
y2 = float(cells[5].get_text())
y3 = float(cells[6].get_text())
y5 = float(cells[7].get_text())
y7 = float(cells[8].get_text())
y10 = float(cells[9].get_text())
y20 = float(cells[10].get_text())
y30 = float(cells[11].get_text())
years = (0, 1/12, 3/12, 6/12, 12/12, 24/12, 36/12, 60/12, 84/12, 120/12, 240/12, 360/12)
rates = (OVERNIGHT_RATE, m1/100, m3/100, m6/100, y1/100, y2/100, y3/100, y5/100, y7/100, y10/100, y20/100, y30/100)
return interp1d(years, rates)
def reSample( df , dt = None , xAxis = None , n = None , kind = 'linear') :
""" re-sample the signal """
if type(df) == pd.Series : df = pd.DataFrame(df)
f = interp1d( df.index, np.transpose(df.values) , kind=kind, axis=-1, copy=True, bounds_error=True, assume_sorted=True)
if dt :
end = int(+(df.index[-1] - df.index[0] ) / dt) * dt + df.index[0]
xAxis = np.linspace( df.index[0] , end , 1+int(+(end - df.index[0] ) / dt) )
elif n :
xAxis = np.linspace( df.index[0] , df.index[-1] , n )
elif xAxis == None :
raise(Exception("reSample : either dt or xAxis should be provided" ))
#For rounding issue, ensure that xAxis is within ts.xAxis
#xAxis[ np.where( xAxis > np.max(df.index[:]) ) ] = df.index[ np.where( xAxis > np.max(df.index[:]) ) ]
return pd.DataFrame( data = np.transpose(f(xAxis)), index = xAxis , columns = map( lambda x : "reSample("+ x +")" , df.columns ) )
def wavelength_to_XYZ(wavelength, observer='1931_2deg'):
''' Uses tristimulus color matching functions to map a awvelength to XYZ
coordinates.
Args:
wavelength (`float`): wavelength in nm.
observer (`str`): CIE observer name, must be 1931_2deg.
Returns:
`numpy.ndarray`: array with last dimension corresponding to X, Y, Z.
'''
wavelength = np.asarray(wavelength, dtype=config.precision)
cmf = get_cmf(observer)
wvl, X, Y, Z = cmf['wvl'], cmf['X'], cmf['Y'], cmf['Z']
ia = {'bounds_error': False, 'fill_value': 0, 'assume_sorted': True}
f_X, f_Y, f_Z = interp1d(wvl, X, **ia), interp1d(wvl, Y, **ia), interp1d(wvl, Z, **ia)
x, y, z = f_X(wavelength), f_Y(wavelength), f_Z(wavelength)
shape = wavelength.shape
return np.stack((x, y, z), axis=len(shape))
def make_quantile_df(data, draw_quantiles):
"""
Return a dataframe with info needed to draw quantile segments
"""
dens = data['density'].cumsum() / data['density'].sum()
ecdf = interp1d(dens, data['y'], assume_sorted=True)
ys = ecdf(draw_quantiles)
# Get the violin bounds for the requested quantiles
violin_xminvs = interp1d(data['y'], data['xminv'])(ys)
violin_xmaxvs = interp1d(data['y'], data['xmaxv'])(ys)
data = pd.DataFrame({
'x': interleave(violin_xminvs, violin_xmaxvs),
'y': np.repeat(ys, 2),
'group': np.repeat(np.arange(1, len(ys)+1), 2)})
return data
def compute_precision_score_mapping(thresh, prec, score):
ind = np.argsort(thresh); # thresh, ind = torch.sort(thresh)
thresh = thresh[ind];
indexes = np.unique(thresh, return_index=True)[1]
indexes = np.sort(indexes);
thresh = thresh[indexes]
thresh = np.vstack((min(-1000, min(thresh) - 1), thresh[:, np.newaxis], max(1000, max(thresh) + 1)));
prec = prec[ind];
for i in xrange(1, len(prec)):
prec[i] = max(prec[i], prec[i - 1]);
prec = prec[indexes]
prec = np.vstack((prec[0], prec[:, np.newaxis], prec[-1]));
f = interp1d(thresh[:, 0], prec[:, 0])
val = f(score)
return val
def __init__(self, streamlines, streamlineData):
self.streamlineCoordinates = streamlines
self.streamlineData = streamlineData
# Get the streamline parameter in a single array.
self.parameterisedStreamline = self.streamlineCoordinates[:, 0, 3]
# Calculate the velocity along the streamline
streamlineVelocity = np.linalg.norm(self.streamlineData[:, 2:4], axis=1)
# Fit a cubic spline to the streamline velocity
# Need this to calculate velocity derivatives
self.parameterisedVelocity = interpolate.CubicSpline(self.parameterisedStreamline, streamlineVelocity, extrapolate=1)
# Calculate the first derivative
self.parameterisedvelocityPrime = self.parameterisedVelocity.derivative(nu=1)
# Calculate the second derivative
self.parameterisedvelocityDoublePrime = self.parameterisedVelocity.derivative(nu=2)
# Parameterise temperature, pressure and density along the streamline
self.parameterisedM = interpolate.interp1d(self.parameterisedStreamline, self.streamlineData[:, 0], kind='linear', fill_value='extrapolate')
self.parameterisedT = interpolate.interp1d(self.parameterisedStreamline, self.streamlineData[:, 1], kind='linear', fill_value='extrapolate')
self.parameterisedP = interpolate.interp1d(self.parameterisedStreamline, self.streamlineData[:, 5], kind='linear', fill_value='extrapolate')
self.parameterisedRho = interpolate.interp1d(self.parameterisedStreamline, self.streamlineData[:, 6], kind='linear', fill_value='extrapolate')
def test_megkde_2d_basic():
# Draw from normal, fit KDE, see if sampling from kde's pdf recovers norm
np.random.seed(1)
data = np.random.multivariate_normal([0, 1], [[1.0, 0.], [0., 0.75 ** 2]], size=10000)
xs, ys = np.linspace(-4, 4, 50), np.linspace(-4, 4, 50)
xx, yy = np.meshgrid(xs, ys, indexing='ij')
samps = np.vstack((xx.flatten(), yy.flatten())).T
zs = MegKDE(data).evaluate(samps).reshape(xx.shape)
zs_x = zs.sum(axis=1)
zs_y = zs.sum(axis=0)
cs_x = zs_x.cumsum()
cs_x /= cs_x[-1]
cs_x[0] = 0
cs_y = zs_y.cumsum()
cs_y /= cs_y[-1]
cs_y[0] = 0
samps_x = interp1d(cs_x, xs)(np.random.uniform(size=10000))
samps_y = interp1d(cs_y, ys)(np.random.uniform(size=10000))
mu_x, std_x = norm.fit(samps_x)
mu_y, std_y = norm.fit(samps_y)
assert np.isclose(mu_x, 0, atol=0.1)
assert np.isclose(std_x, 1.0, atol=0.1)
assert np.isclose(mu_y, 1, atol=0.1)
assert np.isclose(std_y, 0.75, atol=0.1)
def test_summary_max_shortest_2(self):
c = ChainConsumer()
c.add_chain(self.data_skew)
summary_area = 0.6827
c.configure(statistics="max_shortest", bins=1.0, summary_area=summary_area)
summary = c.analysis.get_summary()['0']
xs = np.linspace(-1, 5, 1000)
pdf = skewnorm.pdf(xs, 5, 1, 1.5)
cdf = skewnorm.cdf(xs, 5, 1, 1.5)
x2 = interp1d(cdf, xs, bounds_error=False, fill_value=np.inf)(cdf + summary_area)
dist = x2 - xs
ind = np.argmin(dist)
x0 = xs[ind]
x2 = x2[ind]
xmax = xs[pdf.argmax()]
assert np.isclose(xmax, summary[1], atol=0.05)
assert np.isclose(x0, summary[0], atol=0.05)
assert np.isclose(x2, summary[2], atol=0.05)
def test_summary_max_central_2(self):
c = ChainConsumer()
c.add_chain(self.data_skew)
summary_area = 0.6827
c.configure(statistics="max_central", bins=1.0, summary_area=summary_area)
summary = c.analysis.get_summary()['0']
xs = np.linspace(-1, 5, 1000)
pdf = skewnorm.pdf(xs, 5, 1, 1.5)
cdf = skewnorm.cdf(xs, 5, 1, 1.5)
xval = interp1d(cdf, xs)([0.5 - 0.5 * summary_area, 0.5 + 0.5 * summary_area])
xmax = xs[pdf.argmax()]
assert np.isclose(xmax, summary[1], atol=0.05)
assert np.isclose(xval[0], summary[0], atol=0.05)
assert np.isclose(xval[1], summary[2], atol=0.05)
def test_summary_max_central_3(self):
c = ChainConsumer()
c.add_chain(self.data_skew)
summary_area = 0.95
c.configure(statistics="max_central", bins=1.0, summary_area=summary_area)
summary = c.analysis.get_summary()['0']
xs = np.linspace(-1, 5, 1000)
pdf = skewnorm.pdf(xs, 5, 1, 1.5)
cdf = skewnorm.cdf(xs, 5, 1, 1.5)
xval = interp1d(cdf, xs)([0.5 - 0.5 * summary_area, 0.5 + 0.5 * summary_area])
xmax = xs[pdf.argmax()]
assert np.isclose(xmax, summary[1], atol=0.05)
assert np.isclose(xval[0], summary[0], atol=0.05)
assert np.isclose(xval[1], summary[2], atol=0.05)
def get_parameter_summary_max_shortest(self, chain, parameter):
xs, ys, cs = self._get_smoothed_histogram(chain, parameter)
desired_area = chain.config["summary_area"]
c_to_x = interp1d(cs, xs, bounds_error=False, fill_value=(-np.inf, np.inf))
# Get max likelihood x
max_index = ys.argmax()
x = xs[max_index]
# Pair each lower bound with an upper to get the right area
x2 = c_to_x(cs + desired_area)
dists = x2 - xs
mask = (xs > x) | (x2 < x) # Ensure max point is inside the area
dists[mask] = np.inf
ind = dists.argmin()
return [xs[ind], x, x2[ind]]
def preprocess_Dr(data, r, normalize=True):
x = data["x"] #[xx[0] for xx in data["x"]]
data, x = fields._sorted(data, x)
Dt = [d[2][2] for d in data["D"]]
Dn = [d[0][0] for d in data["D"]]
eps = 1e-2
x = [-1., -eps, r] + x + [1000.]
if normalize:
Dn = [d/Dn[-1] for d in Dn]
Dt = [d/Dt[-1] for d in Dt]
Dn = [0., 0., eps] + Dn + [1.]
Dt = [0., 0., eps] + Dt + [1.]
fn = interp1d(x, Dn)
ft = interp1d(x, Dt)
return fn, ft
def _reinterpolate(tr, wave, new_wave):
'''
Reinterpolate a transmission curve on a fixed, regular wavelength grid
Parameters
----------
tr : array_like
Transmission, normalized to 1
wave : array_like
Wavelengths vector, in nanometers
new_wave : array_like
New wavelengths vector, in nanometers
Returns
-------
tr_regular : array_like
The reinterpolated transmission
'''
interp_fun = interp1d(wave, tr, bounds_error=False, fill_value=np.nan)
return interp_fun(new_wave)
def interpolateSolution(tvecData,tvecScaled,yvec):
"""Interpolates scaled simulation vector onto data time vector.
Args:
tvecData (numpy.ndarray): Data time vector.
tvecScaled (numpy.ndarray): Scaled simulation time vector.
yvec (numpy.ndarray): Simulation values.
Returns:
numpy.ndarray: Scaled simulation vector.
"""
fscal=interpolate.interp1d(tvecScaled,yvec)
yvecScaled=fscal(tvecData)
return yvecScaled
def _get_interp1d(self,* , kind='linear', copy=True, bounds_error=False,
fill_value=np.nan, assume_sorted=None):
"""returns a scipy interp1d object"""
if assume_sorted is None:
assume_sorted = utils.is_sorted(self.time)
if self.n_signals > 1:
axis = 1
else:
axis = -1
f = interpolate.interp1d(x=self.time,
y=self._ydata_rowsig,
kind=kind,
axis=axis,
copy=copy,
bounds_error=bounds_error,
fill_value=fill_value,
assume_sorted=assume_sorted)
return f
def cheaptrick_get_power_spectrum(waveform, fs, fft_size, f0):
power_spectrum = np.abs(np.fft.fft(waveform, fft_size)) ** 2
frequency_axis = np.arange(fft_size) / float(fft_size) * float(fs)
ind = frequency_axis < (f0 + fs / fft_size)
low_frequency_axis = frequency_axis[ind]
low_frequency_replica = interp1d(f0 - low_frequency_axis,
power_spectrum[ind], kind="linear",
fill_value="extrapolate")(low_frequency_axis)
p1 = low_frequency_replica[(frequency_axis < f0)[:len(low_frequency_replica)]]
p2 = power_spectrum[(frequency_axis < f0)[:len(power_spectrum)]]
power_spectrum[frequency_axis < f0] = p1 + p2
lb1 = int(fft_size / 2) + 1
lb2 = 1
ub2 = int(fft_size / 2)
power_spectrum[lb1:] = power_spectrum[lb2:ub2][::-1]
return power_spectrum
def world_synthesis_time_base_generation(temporal_positions, f0, fs, vuv,
time_axis, default_f0):
f0_interpolated_raw = interp1d(temporal_positions, f0, kind="linear",
fill_value="extrapolate")(time_axis)
vuv_interpolated = interp1d(temporal_positions, vuv, kind="linear",
fill_value="extrapolate")(time_axis)
vuv_interpolated = vuv_interpolated > 0.5
f0_interpolated = f0_interpolated_raw * vuv_interpolated.astype("float32")
f0_interpolated[f0_interpolated == 0] = f0_interpolated[f0_interpolated == 0] + default_f0
total_phase = np.cumsum(2 * np.pi * f0_interpolated / float(fs))
core = np.mod(total_phase, 2 * np.pi)
core = np.abs(core[1:] - core[:-1])
# account for diff, avoid deprecation warning with [:-1]
pulse_locations = time_axis[:-1][core > (np.pi / 2.)]
pulse_locations_index = np.round(pulse_locations * fs).astype("int32")
return pulse_locations, pulse_locations_index, vuv_interpolated
def cheaptrick_get_power_spectrum(waveform, fs, fft_size, f0):
power_spectrum = np.abs(np.fft.fft(waveform, fft_size)) ** 2
frequency_axis = np.arange(fft_size) / float(fft_size) * float(fs)
ind = frequency_axis < (f0 + fs / fft_size)
low_frequency_axis = frequency_axis[ind]
low_frequency_replica = interp1d(f0 - low_frequency_axis,
power_spectrum[ind], kind="linear",
fill_value="extrapolate")(low_frequency_axis)
p1 = low_frequency_replica[(frequency_axis < f0)[:len(low_frequency_replica)]]
p2 = power_spectrum[(frequency_axis < f0)[:len(power_spectrum)]]
power_spectrum[frequency_axis < f0] = p1 + p2
lb1 = int(fft_size / 2) + 1
lb2 = 1
ub2 = int(fft_size / 2)
power_spectrum[lb1:] = power_spectrum[lb2:ub2][::-1]
return power_spectrum
def world_synthesis_time_base_generation(temporal_positions, f0, fs, vuv,
time_axis, default_f0):
f0_interpolated_raw = interp1d(temporal_positions, f0, kind="linear",
fill_value="extrapolate")(time_axis)
vuv_interpolated = interp1d(temporal_positions, vuv, kind="linear",
fill_value="extrapolate")(time_axis)
vuv_interpolated = vuv_interpolated > 0.5
f0_interpolated = f0_interpolated_raw * vuv_interpolated.astype("float32")
f0_interpolated[f0_interpolated == 0] = f0_interpolated[f0_interpolated == 0] + default_f0
total_phase = np.cumsum(2 * np.pi * f0_interpolated / float(fs))
core = np.mod(total_phase, 2 * np.pi)
core = np.abs(core[1:] - core[:-1])
# account for diff, avoid deprecation warning with [:-1]
pulse_locations = time_axis[:-1][core > (np.pi / 2.)]
pulse_locations_index = np.round(pulse_locations * fs).astype("int32")
return pulse_locations, pulse_locations_index, vuv_interpolated
def from_array(cls, tenors, forwards, t0=None, interp_mode=InterpMode.PiecewiseConst):
"""Create a Curve object: tenors (floats), fwds (floats), interp_mode"""
if t0 is None:
t0 = tenors[0]
assert len(tenors) == len(forwards)
if interp_mode == cls.InterpMode.PiecewiseConst:
interpfwd = interp1d(tenors, forwards, kind='zero', bounds_error=False, fill_value='extrapolate')
return cls(t0, lambda t: interpfwd(t))
elif interp_mode == cls.InterpMode.Linear:
interpfwd = interp1d(tenors, forwards, kind='linear', bounds_error=False, fill_value='extrapolate')
return cls(t0, lambda t: interpfwd(t))
elif interp_mode == cls.InterpMode.LinearLog:
linzero = interp1d(tenors, np.log(forwards), kind='linear', bounds_error=False, fill_value='extrapolate')
return cls(t0, lambda t: np.exp(linzero(t)))
else:
raise BaseException('invalid curve interpolation mode ...')
def ioneq(self):
"""
Ionization equilibrium data interpolated to the given temperature
Note
----
Will return NaN where interpolation is out of range of the data. For computing
ionization equilibrium outside of this temperature range, it is better to use
`fiasco.Element.equilibrium_ionization`
"""
f = interp1d(self._ioneq[self._dset_names['ioneq_filename']]['temperature'],
self._ioneq[self._dset_names['ioneq_filename']]['ionization_fraction'],
kind='linear', bounds_error=False, fill_value=np.nan)
ioneq = f(self.temperature)
isfinite = np.isfinite(ioneq)
ioneq[isfinite] = np.where(ioneq[isfinite] < 0., 0., ioneq[isfinite])
return u.Quantity(ioneq)
def _dere_cross_section(self, energy: u.erg):
"""
Calculate direct ionization cross-sections according to [1]_.
References
----------
.. [1] Dere, K. P., 2007, A&A, `466, 771 <http://adsabs.harvard.edu/abs/2007A%26A...466..771D>`_
"""
# Cross-sections from diparams file
cross_section_total = np.zeros(energy.shape)
for ip,bt_c,bt_e,bt_cross_section in zip(self._diparams['ip'], self._diparams['bt_c'], self._diparams['bt_e'],
self._diparams['bt_cross_section']):
U = energy/(ip.to(u.erg))
scaled_energy = 1. - np.log(bt_c)/np.log(U - 1. + bt_c)
f_interp = interp1d(bt_e.value, bt_cross_section.value, kind='cubic', fill_value='extrapolate')
scaled_cross_section = f_interp(scaled_energy.value)*bt_cross_section.unit
# Only nonzero at energies above the ionization potential
scaled_cross_section *= (U.value > 1.)
cross_section = scaled_cross_section*(np.log(U) + 1.)/U/(ip**2)
if not hasattr(cross_section_total, 'unit'):
cross_section_total = cross_section_total*cross_section.unit
cross_section_total += cross_section
return cross_section_total
def __init__(self, hmat, m, n, tper, k, out0):
hmat = np.asarray(hmat)
if hmat.shape != (2 * m + 1, n + 1):
raise ValueError('hmat shape = %s not compatible with M=%d, N=%d' %
(hmat.shape, m, n))
# use symmetry to fill in negative input frequency data.
fullh = np.empty((2 * m + 1, 2 * n + 1), dtype=complex)
fullh[:, n:] = hmat / (k * tper)
fullh[:, :n] = np.fliplr(np.flipud(fullh[:, n + 1:])).conj()
self.hmat = fullh
wc = 2.0 * np.pi / tper
self.m_col = np.arange(-m, m + 1) * (1.0j * wc)
self.n_col = np.arange(-n, n + 1) * (1.0j * wc / k)
self.m_col = self.m_col.reshape((-1, 1))
self.n_col = self.n_col.reshape((-1, 1))
self.tper = tper
self.k = k
self.outfun = interp.interp1d(out0[:, 0], out0[:, 1], bounds_error=True,
assume_sorted=True)
def convolve(self, wavelengths, densities):
# define short names for the involved wavelength grids
wa = wavelengths
wb = self._Wavelengths
# create a combined wavelength grid, restricted to the overlapping interval
w1 = wa[ (wa>=wb[0]) & (wa<=wb[-1]) ]
w2 = wb[ (wb>=wa[0]) & (wb<=wa[-1]) ]
w = np.unique(np.hstack((w1,w2)))
if len(w) < 2: return 0
# log-log interpolate SED and transmission on the combined wavelength grid
# (use scipy interpolation function for SED because np.interp does not support broadcasting)
F = np.exp(interp1d(np.log(wa), _log(densities), copy=False, bounds_error=False, fill_value=0.)(np.log(w)))
T = np.exp(np.interp(np.log(w), np.log(wb), _log(self._Transmission), left=0., right=0.))
# perform the integration
if self._PhotonCounter:
return np.trapz(x=w, y=w*F*T) / self._IntegratedTransmission
else:
return np.trapz(x=w, y=F*T) / self._IntegratedTransmission
## This function calculates and returns the integrated value for a given spectral energy distribution over the
# filter's wavelength range,
def integrate(self, wavelengths, densities):
# define short names for the involved wavelength grids
wa = wavelengths
wb = self._Wavelengths
# create a combined wavelength grid, restricted to the overlapping interval
w1 = wa[(wa >= wb[0]) & (wa <= wb[-1])]
w2 = wb[(wb >= wa[0]) & (wb <= wa[-1])]
w = np.unique(np.hstack((w1, w2)))
if len(w) < 2: return 0
# log-log interpolate SED and transmission on the combined wavelength grid
# (use scipy interpolation function for SED because np.interp does not support broadcasting)
F = np.exp(interp1d(np.log(wa), _log(densities), copy=False, bounds_error=False, fill_value=0.)(np.log(w)))
T = np.exp(np.interp(np.log(w), np.log(wb), _log(self._Transmission), left=0., right=0.))
# perform the integration
if self._PhotonCounter: return np.trapz(x=w, y=w * F * T)
else: return np.trapz(x=w, y=F * T)
## This private helper function returns the natural logarithm for positive values, and a large negative number
# (but not infinity) for zero or negative values.
def findPercentile(parameter, probability, percentile):
npoints = 10000
interpfunc = interpolate.interp1d(parameter,probability, kind='linear')
parRange = np.linspace(min(parameter),max(parameter),npoints)
interProb = interpfunc(parRange)
cumInteg = np.zeros(npoints-1)
for i in range(1,npoints-1):
cumInteg[i] = cumInteg[i-1] + (0.5*(interProb[i+1] + interProb[i]) * (parRange[i+1] - parRange[i]))
cumInteg = cumInteg / cumInteg[-1]
idx = (np.abs(cumInteg-percentile/100.)).argmin()
return parRange[idx]
def convolveFilter(flux,wl,filter):
input = np.loadtxt(filter)
response_wl = input[:,0] * 1.e-4
response_trans = input[:,1]
minwl = response_wl[0]
maxwl = response_wl[len(response_wl)-1]
intwl = np.copy(wl)
intwl[intwl > maxwl] = -1
intwl[intwl < minwl] = -1
wlrange = intwl > 0
intwl = intwl[wlrange]
transmission = np.zeros(len(flux))
interpfunc = interpolate.interp1d(response_wl,response_trans, kind='linear')
transmission[wlrange] = interpfunc(intwl)
tot_trans = integrate.simps(transmission,wl)
tot_flux = integrate.simps(transmission*flux,wl)
return tot_flux/tot_trans