def resample(series, *, factor=10, size=None):
"""
Returns a new series re-sampled to a given number of points.
:param series:
:param factor: a number of points per unit time to scale the series to.
:param size: a number of points to scale the series to.
:return:
"""
series = series.dropna()
start, end = series.index[0], series.index[-1]
if size is None:
size = (end - start) * factor
index = numpy.linspace(start, end, size)
spline = InterpolatedUnivariateSpline(series.index, series.values)
return pandas.Series(index=index, data=spline(index))
python类InterpolatedUnivariateSpline()的实例源码
def test_call(self):
"""Test of the function call
"""
models = {'simp': SimpleModel([2, 1])}
data = self.simSimp(models)
self.assertEqual(len(data), 3)
self.assertIsInstance(data[0], np.ndarray)
self.assertEqual(len(data[1]), 1)
self.assertIsInstance(data[1][0], np.ndarray)
self.assertIsInstance(data[2], dict)
self.assertTrue('mean_fn' in data[2])
self.assertIsInstance(data[2]['mean_fn'], IUSpline)
xx = np.arange(10)
npt.assert_equal(data[0], xx)
npt.assert_equal(data[1][0], (2 * xx)**2 + 1 * xx)
def read_slope_from_file(self, filename=None):
if filename == None:
filename = '../MaunaUlu74/DEM_maunaulu74.txt'
distance = []
slope = []
# here read the slope file (.txt) where each line represent the distance from the vent (first column) and
# the corresponding slope in degree (second column) that is then converted in gradiant
f_slope = open(filename, "r")
f_slope.readline()
for line in f_slope:
split_line = line.strip('\n').split('\t')
distance.append(float(split_line[0]))
slope.append(math.radians(float(split_line[1])))
f_slope.close()
#slope = self.running_mean(slope, 15)
# build the spline to interpolate the distance (k=1 : it is a linear interpolation)
self._slope_spline = interpolate.InterpolatedUnivariateSpline(distance, slope, k=1.)
def __init__(self, scale_list, values, method='spline', extrapolate=False):
# error checking
if len(values.shape) != 1:
raise ValueError('This class only works for 1D data.')
elif len(scale_list) != 1:
raise ValueError('input and output dimension mismatch.')
if method == 'linear':
k = 1
elif method == 'spline':
k = 3
else:
raise ValueError('Unsuppoorted interpolation method: %s' % method)
offset, scale = scale_list[0]
num_pts = values.shape[0]
points = np.linspace(offset, (num_pts - 1) * scale + offset, num_pts) # type: np.multiarray.ndarray
DiffFunction.__init__(self, [(points[0], points[-1])], delta_list=None)
ext = 0 if extrapolate else 2
self.fun = interp.InterpolatedUnivariateSpline(points, values, k=k, ext=ext)
def smooth_log(self):
"""
This function ...
:return:
"""
centers = np.array(self.centers)
counts = np.array(self.counts)
not_zero = counts != 0
centers = centers[not_zero]
counts = counts[not_zero]
order = 2
s = InterpolatedUnivariateSpline(centers, np.log10(counts), k=order)
# Return the spline curve
return s
# -----------------------------------------------------------------
def smooth_log(self):
"""
This function ...
:return:
"""
centers = np.array(self.centers)
counts = np.array(self.counts)
not_zero = counts != 0
centers = centers[not_zero]
counts = counts[not_zero]
order = 2
s = InterpolatedUnivariateSpline(centers, np.log10(counts), k=order)
# Return the spline curve
return s
# -----------------------------------------------------------------
def set_table(self, n=100, dx=None):
r'''Method to set an interpolation table of liquids levels versus
volumes in the tank, for a fully defined tank. Normally run by the
h_from_V method, this may be run prior to its use with a custom
specification. Either the number of points on the table, or the
vertical distance between steps may be specified.
Parameters
----------
n : float, optional
Number of points in the interpolation table, [-]
dx : float, optional
Vertical distance between steps in the interpolation table, [m]
'''
if dx:
self.heights = np.linspace(0, self.h_max, int(self.h_max/dx)+1)
else:
self.heights = np.linspace(0, self.h_max, n)
self.volumes = [self.V_from_h(h) for h in self.heights]
self.interp_h_from_V = InterpolatedUnivariateSpline(self.volumes, self.heights, ext=3)
self.table = True
def segment_spline_smoothing(series, series_std_dev=None):
if series_std_dev is None:
series_std_dev = series
segments = segment_by_std_dev(series_std_dev)
points = segment_points(series, segments)
spline = InterpolatedUnivariateSpline(points.index, points.values, k=3)
return pandas.Series(data=spline(series.index), index=series.index)
def get_splines(self, x_data, y_data, var_data=0):
"""Overloads the base spline generateion which cannot deal with the
smooth experiment
"""
return IUSpline(x_data, y_data), None
def _on_call(self, models):
"""Dummy simulation
"""
sim_model = models
x_list = np.arange(self.get_option('nDOF'))
mean_fn = IUSpline(x_list, np.array(sim_model(x_list)))
return x_list, [np.array(sim_model(x_list))],\
{'mean_fn': mean_fn, 'tau': 0}
def _on_call(self, model1, model2):
"""Dummy simulation
"""
x_list = np.arange(self.get_option('nDOF'))
values = np.array(model1(x_list) + model2(x_list))
mean_fn = IUSpline(x_list, values)
return x_list, [values,],\
{'mean_fn': mean_fn, 'tau': 0}
def univariate_plot(lx=[],ly=[]):
unew = np.arange(lx[0], lx[-1]+1, 1)
f2 = InterpolatedUnivariateSpline(lx, ly)
return unew,f2(unew)
flowgo_crystallization_rate_model_melts.py 文件源码
项目:pyflowgo
作者: pyflowgo
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def read_crystal_from_melts(self, filename=None):
crystal_fraction = []
temperature = []
if filename == None:
filename = '../pyflowgo/MaunaUlu74/Results-melts_MU74.csv'
with open(filename) as csvfile:
f_crystal_fraction = csv.DictReader(csvfile, delimiter=',')
for row in f_crystal_fraction:
temperature.append(float(row['temperature'])+273.15)
crystal_fraction.append(float(row['fraq_microlite']))
self._crystal_spline = interpolate.InterpolatedUnivariateSpline(temperature, crystal_fraction, k=1.)
def noisefilter(t, signal, avgpts=30, smoothfac=1600):
smooth = rolling_mean(signal[::-1], avgpts)[::-1]
fspline = InterpolatedUnivariateSpline(t[:-avgpts], smooth[:-avgpts], k=4)
fspline.set_smoothing_factor(smoothfac)
return fspline(t)
def spline_fitting(x_data, y_data, order):
return InterpolatedUnivariateSpline(x_data, y_data, k=order)
def __init__(self, xvec, yvec, xtol, order=3, ext=3):
self._xvec = xvec
self._yvec = yvec
self._xtol = xtol
self._order = order
self._ext = ext
self._fun = interp.InterpolatedUnivariateSpline(xvec, yvec, k=order, ext=ext)
def ext(self):
"""interpolation extension mode. See documentation for InterpolatedUnivariateSpline."""
return self._ext
def smooth(self):
"""
This function ...
:return:
"""
order = 2
s = InterpolatedUnivariateSpline(self.centers, self.counts, k=order)
# Return the spline curve
return s
# -----------------------------------------------------------------
def smooth(self):
"""
This function ...
:return:
"""
order = 2
s = InterpolatedUnivariateSpline(self.centers, self.counts, k=order)
# Return the spline curve
return s
# -----------------------------------------------------------------
def get_interpolator(self, xcolumn, ycolumn, extrap='nearest'):
"""
Get an interpolator instance between the two columns
Parameters:
===========
- xcolumn: string
The name of the x column to interpolate between
- ycolumn: string
The name of the value you want to interpolate
- extrap: string
How to treat extrapolation. Options are:
1. 'nearest': Default behavior. It will return the nearest match
to the given 'x' value
2. 'extrapolate': Extrapolate the spline. This is probably only
safe for very small extrapolations
Returns:
========
A callable interpolator.
"""
# Make sure the column names are correct
assert xcolumn in self.mam_df.keys() and ycolumn in self.mam_df.keys()
# Sort the dataframe by the x column, and drop any duplicates or nans it might have
sorted_df = self.mam_df.sort_values(by=xcolumn).dropna(subset=[xcolumn, ycolumn], how='any').drop_duplicates(xcolumn)
# Make an interpolator
ext_value = {'nearest': 3, 'extrapolate': 0}
fcn = spline(sorted_df[xcolumn].values, sorted_df[ycolumn].values, ext=ext_value[extrap])
return fcn
def get_ccf_data(basedir, primary_name=None, secondary_name=None, vel_arr=np.arange(-900.0, 900.0, 0.1), type='bright'):
"""
Searches the given directory for CCF files, and classifies
by star, temperature, metallicity, and vsini
:param basedir: The directory to search for CCF files
:keyword primary_name: Optional keyword. If given, it will only get the requested primary star data
:keyword secondary_name: Same as primary_name, but only reads ccfs for the given secondary
:keyword vel_arr: The velocities to interpolate each ccf at
:return: pandas DataFrame
"""
if not basedir.endswith('/'):
basedir += '/'
all_files = ['{}{}'.format(basedir, f) for f in os.listdir(basedir) if type in f.lower()]
primary = []
secondary = []
vsini_values = []
temperature = []
gravity = []
metallicity = []
ccf = []
for fname in all_files:
star1, star2, vsini, temp, logg, metal = classify_filename(fname, type=type)
if primary_name is not None and star1.lower() != primary_name.lower():
continue
if secondary_name is not None and star2.lower() != secondary_name.lower():
continue
vel, corr = np.loadtxt(fname, unpack=True)
fcn = spline(vel, corr)
ccf.append(fcn(vel_arr))
primary.append(star1)
secondary.append(star2)
vsini_values.append(vsini)
temperature.append(temp)
gravity.append(logg)
metallicity.append(metal)
# Make a pandas dataframe with all this data
df = pd.DataFrame(data={'Primary': primary, 'Secondary': secondary, 'Temperature': temperature,
'vsini': vsini_values, 'logg': gravity, '[Fe/H]': metallicity, 'CCF': ccf})
return df
def __init__(self, filt, T_low=3500, T_high=12000, dT=10, feh=0.0):
"""
Initialize a CompanionFitter instance. It will pre-tabulate
synthetic photometry for main-sequence stars with Temperatures
ranging from T_low to T_high, in steps of dT K. All the
models will have [Fe/H] = feh. Finally, we will interpolate
the relationship between temperature and magnitude so that
additional photometry points are made quickly.
Parameters:
===========
- filt: A pysynphot bandpass encoding the filter information.
- T_low, T_high, dT: floats
Parameters describing the temperature grid
to interpolate
-feh: float
The metallicity [Fe/H] to use for the models
"""
# Use the Mamajek table to get main-sequence relationships
MT = Mamajek_Table.MamajekTable()
MT.mam_df['radius'] = 10**(0.5*MT.mam_df.logL - 2.0*MT.mam_df.logT + 2.0*3.762)
MT.mam_df['logg'] = 4.44 + np.log10(MT.mam_df.Msun) - 2.0*np.log10(MT.mam_df.radius)
teff2radius = MT.get_interpolator('Teff', 'radius')
teff2logg = MT.get_interpolator('Teff', 'logg')
# Pre-calculate the magnitude at each temperature
self.temperature = np.arange(T_low, T_high, dT)
self.magnitude = np.zeros(self.temperature.size)
for i, T in enumerate(self.temperature):
logging.info('i = {}/{}: T = {:.1f}'.format(i+1, self.temperature.size, T))
logg = teff2logg(T)
R = teff2radius(T)
spec = pysynphot.Icat('ck04models', T, feh, logg) * R**2
obs = pysynphot.Observation(spec, filt)
self.magnitude[i] = obs.effstim('abmag')
# Interpolate the T-mag curve
self.interpolator = spline(self.temperature, self.magnitude)
def __call__(self, T):
"""
Evaluate the spline at the given temperature, returning the interpolated magnitude
"""
return self.interpolator(T)
def undrift(locs, info, segmentation, display=True, segmentation_callback=None, rcc_callback=None):
bounds, segments = _render.segment(locs, info, segmentation,
{'blur_method': 'gaussian', 'min_blur_width': 1},
segmentation_callback)
shift_y, shift_x = _imageprocess.rcc(segments, 32, rcc_callback)
t = (bounds[1:] + bounds[:-1]) / 2
drift_x_pol = _interpolate.InterpolatedUnivariateSpline(t, shift_x, k=3)
drift_y_pol = _interpolate.InterpolatedUnivariateSpline(t, shift_y, k=3)
t_inter = _np.arange(info[0]['Frames'])
drift = (drift_x_pol(t_inter), drift_y_pol(t_inter))
drift = _np.rec.array(drift, dtype=[('x', 'f'), ('y', 'f')])
if display:
fig1 = _plt.figure(figsize=(17, 6))
_plt.suptitle('Estimated drift')
_plt.subplot(1, 2, 1)
_plt.plot(drift.x, label='x interpolated')
_plt.plot(drift.y, label='y interpolated')
t = (bounds[1:] + bounds[:-1]) / 2
_plt.plot(t, shift_x, 'o', color=list(_plt.rcParams['axes.prop_cycle'])[0]['color'], label='x')
_plt.plot(t, shift_y, 'o', color=list(_plt.rcParams['axes.prop_cycle'])[1]['color'], label='y')
_plt.legend(loc='best')
_plt.xlabel('Frame')
_plt.ylabel('Drift (pixel)')
_plt.subplot(1, 2, 2)
_plt.plot(drift.x, drift.y, color=list(_plt.rcParams['axes.prop_cycle'])[2]['color'])
_plt.plot(shift_x, shift_y, 'o', color=list(_plt.rcParams['axes.prop_cycle'])[2]['color'])
_plt.axis('equal')
_plt.xlabel('x')
_plt.ylabel('y')
fig1.show()
locs.x -= drift.x[locs.frame]
locs.y -= drift.y[locs.frame]
return drift, locs
def _make_redshift_spline(z_min, z_max):
"""Utility function for creating a spline for comoving distance to
redshift.
"""
redshift_array = np.linspace(
np.min((z_min - 1e-8, 0.0)), z_max + 1e-8, 1000)
comov_array = core_utils.WMAP5.comoving_distance(redshift_array)
comov_dist_to_redshift_spline = iu_spline(comov_array, redshift_array)
return comov_dist_to_redshift_spline
def fft(fEM, time, freq, ftarg):
"""Fourier Transform using the Fast Fourier Transform.
The function is called from one of the modelling routines in :mod:`model`.
Consult these modelling routines for a description of the input and output
parameters.
Returns
-------
tEM : array
Returns time-domain EM response of `fEM` for given `time`.
conv : bool
Only relevant for QWE/QUAD.
"""
# Get ftarg values
dfreq, nfreq, ntot, pts_per_dec = ftarg
# If pts_per_dec, we have first to interpolate fEM to required freqs
if pts_per_dec:
sfEMr = iuSpline(np.log10(freq), fEM.real)
sfEMi = iuSpline(np.log10(freq), fEM.imag)
freq = np.arange(1, nfreq+1)*dfreq
fEM = sfEMr(np.log10(freq)) + 1j*sfEMi(np.log10(freq))
# Pad the frequency result
fEM = np.pad(fEM, (0, ntot-nfreq), 'linear_ramp')
# Carry out FFT
ifftEM = fftpack.ifft(np.r_[fEM[1:], 0, fEM[::-1].conj()]).real
stEM = 2*ntot*fftpack.fftshift(ifftEM*dfreq, 0)
# Interpolate in time domain
dt = 1/(2*ntot*dfreq)
ifEM = iuSpline(np.linspace(-ntot, ntot-1, 2*ntot)*dt, stEM)
tEM = ifEM(time)/2*np.pi # (Multiplication of 2/pi in model.tem)
# Return the electromagnetic time domain field
# (Second argument is only for QWE)
return tEM, True
# 3. Utilities
def __call__(self, *args, **kwargs):
# TODO: Should be more defensive about this
if len(args) == 1:
# We have a dataframe
df = args[0]
else:
# We have parameters for a single invocation
df = pd.DataFrame(kwargs)
if self.key_columns:
sub_tables = df.groupby(self.key_columns)
else:
sub_tables = [(None, df)]
result = pd.DataFrame(index=df.index)
for key, sub_table in sub_tables:
if sub_table.empty:
continue
funcs = self.interpolations[key]
parameters = tuple(sub_table[k] for k in self.parameter_columns)
for value_column, func in funcs.items():
out = func(*parameters)
# This reshape is necessary because RectBivariateSpline and InterpolatedUnivariateSpline return results
# in slightly different shapes and we need them to be consistent
if out.shape:
result.loc[sub_table.index, value_column] = out.reshape((out.shape[0],))
else:
result.loc[sub_table.index, value_column] = out
if self.func:
return self.func(result)
if len(result.columns) == 1:
return result[result.columns[0]]
return result
def __get_1d_function(x, y, kind):
"""
Train 1-D interpolator
:param x: x-coordinates of data points
:param y: y-coordinates of data points
:param kind: 'linear' or 'spline' interpolation type
:return Interpolator callable object
"""
def fun_interp(t):
return np.interp(t, x, y)
# input consistency check
if len(x) != len(y):
logger.error("len(x) = {:d}, len(y) = {:d}. Both should be equal".format(len(x), len(y)))
raise Exception("1-D interpolation: X and Y should have the same shape")
# 1-element data set, return fixed value
if len(y) == 1:
# define constant
def fun_const(t):
"""
Helper function
:param t: array-like or scalar
:return: array of constant values if t is an array of constant scalar if t is a scalar
"""
try:
result = np.ones_like(t) * y[0] # t is an array
except TypeError:
result = y[0] # t is a scalar
return result
result = fun_const
# 2-element data set, use linear interpolation from numpy
elif len(y) == 2:
result = fun_interp
else:
if kind == 'spline':
# 3-rd degree spline interpolation, passing through all points
try:
from scipy.interpolate import InterpolatedUnivariateSpline
except ImportError as e:
logger.error("Please install scipy on your platform to be able to use spline-based interpolation")
raise e
k = 3
if len(y) == 3: # fall back to 2-nd degree spline if only 3 points are present
k = 2
result = InterpolatedUnivariateSpline(x, y, k=k)
elif kind == 'linear':
result = fun_interp
else:
raise ("Unsupported interpolation type {:s}.".format(kind))
return result
def get_rv(vel, corr, Npix=None, **fit_kws):
"""
Get the best radial velocity, with errors.
This will only work if the ccf was made with the maximum likelihood method!
Uses the formula given in Zucker (2003) MNRAS, 342, 4 for the rv error.
Parameters:
===========
- vel: numpy.ndarray
The velocities
- corr: numpy.ndarray
The ccf values. Should be the same size as vel
- Npix: integer
The number of pixels used in the CCF.
Returns:
========
-rv: float
The best radial velocity, in km/s
-rv_err: float
Uncertainty in the velocity, in km/s
-ccf: float
The CCF power at the maximum velocity.
"""
vel = np.atleast_1d(vel)
corr = np.atleast_1d(corr)
sorter = np.argsort(vel)
fcn = spline(vel[sorter], corr[sorter])
fcn_prime = fcn.derivative(1)
fcn_2prime = fcn.derivative(2)
guess = vel[np.argmax(corr)]
def errfcn(v):
ll = 1e6*fcn_prime(v)**2
return ll
out = minimize_scalar(errfcn, bounds=(guess-2, guess+2), method='bounded')
rv = out.x
if Npix is None:
Npix = vel.size
rv_var = -(Npix * fcn_2prime(rv) * (fcn(rv) / (1 - fcn(rv) ** 2))) ** (-1)
return rv, np.sqrt(rv_var), fcn(rv)
def reduce_resolution(wi, fi, sigma0=0.55, sigma_floor=0.2):
""" Adapt the resolution of the spectra to match the lick definitions
Lick definitions have different resolution elements as function
of wavelength. These definition are hard-coded in this function
Parameters
---------
wi: ndarray (n, )
wavelength definition
fi: ndarray (nspec, n) or (n, )
spectra to convert
sigma0: float
initial broadening in the spectra `fi`
sigma_floor: float
minimal dispersion to consider
Returns
-------
flux_red: ndarray (nspec, n) or (n, )
reduced spectra
"""
# all in AA
w_lick_res = (4000., 4400., 4900., 5400., 6000.)
lick_res = (11.5, 9.2, 8.4, 8.4, 9.8) # FWHM in AA
w = np.asarray(wi)
flux = np.atleast_2d(fi)
# Linear interpolation of lick_res over w
# numpy interp does constant instead of extrapolation
# res = np.interp(w, w_lick_res, lick_res)
# spline order: 1 linear, 2 quadratic, 3 cubic ...
from scipy.interpolate import InterpolatedUnivariateSpline
res = InterpolatedUnivariateSpline(w_lick_res, lick_res, k=1)(w)
# Compute width from fwhm
const = 2. * np.sqrt(2. * np.log(2)) # conversion fwhm --> sigma
lick_sigma = np.sqrt((res ** 2 - sigma0 ** 2)) / const
# Convolution by g=1/sqrt(2*pi*sigma^2) * exp(-r^2/(2*sigma^2))
flux_red = np.zeros(flux.shape, dtype=flux.dtype)
for i, sigma in enumerate(lick_sigma):
maxsigma = 3. * sigma
# sampling floor: min (0.2, sigma * 0.1)
delta = min(sigma_floor, sigma * 0.1)
delta_wj = np.arange(-maxsigma, + maxsigma, delta)
wj = delta_wj + w[i]
for k, fk in enumerate(flux):
fluxj = np.interp(wj, w, fk, left=0., right=0.)
flux_red[k, i] = np.sum(fluxj * delta * np.exp(-0.5 * (delta_wj / sigma) ** 2))
flux_red /= lick_sigma * const
return flux_red.reshape(np.shape(fi))