def get_local_wavenumbermesh(self, scaled=True, broadcast=False,
eliminate_highest_freq=False):
kx = fftfreq(self.N[0], 1./self.N[0])
ky = rfftfreq(self.N[1], 1./self.N[1])
if eliminate_highest_freq:
for i, k in enumerate((kx, ky)):
if self.N[i] % 2 == 0:
k[self.N[i]//2] = 0
Ks = np.meshgrid(kx, ky[self.rank*self.Np[1]//2:(self.rank*self.Np[1]//2+self.Npf)], indexing='ij', sparse=True)
if scaled is True:
Lp = 2*np.pi/self.L
Ks[0] *= Lp[0]
Ks[1] *= Lp[1]
K = Ks
if broadcast is True:
K = [np.broadcast_to(k, self.complex_shape()) for k in Ks]
return K
python类rfftfreq()的实例源码
def smooth_fft(dx, spec, sigma):
"""Basic math for FFT convolution with a gaussian kernel.
:param dx:
The wavelength or velocity spacing, same units as sigma
:param sigma:
The width of the gaussian kernel, same units as dx
:param spec:
The spectrum flux vector
"""
# The Fourier coordinate
ss = rfftfreq(len(spec), d=dx)
# Make the fourier space taper; just the analytical fft of a gaussian
taper = np.exp(-2 * (np.pi ** 2) * (sigma ** 2) * (ss ** 2))
ss[0] = 0.01 # hack
# Fourier transform the spectrum
spec_ff = np.fft.rfft(spec)
# Multiply in fourier space
ff_tapered = spec_ff * taper
# Fourier transform back
spec_conv = np.fft.irfft(ff_tapered)
return spec_conv
def smooth_fft_vsini(dv,spec,sigma):
# The Fourier coordinate
ss = rfftfreq(len(spec), d=dv)
# Make the fourier space taper
ss[0] = 0.01 #junk so we don't get a divide by zero error
ub = 2. * np.pi * sigma * ss
sb = j1(ub) / ub - 3 * np.cos(ub) / (2 * ub ** 2) + 3. * np.sin(ub) / (2 * ub ** 3)
#set zeroth frequency to 1 separately (DC term)
sb[0] = 1.
# Fourier transform the spectrum
FF = np.fft.rfft(spec)
# Multiply in fourier space
FF_tap = FF * sb
# Fourier transform back
spec_conv = np.fft.irfft(FF_tap)
return spec_conv
def complex_local_wavenumbers(self):
s = self.complex_local_slice()
return (fftfreq(self.N[0], 1./self.N[0]).astype(int)[s[0]],
fftfreq(self.N[1], 1./self.N[1]).astype(int),
rfftfreq(self.N[2], 1./self.N[2]).astype(int)[s[2]])
def get_local_wavenumbermesh(self, scaled=False, broadcast=False,
eliminate_highest_freq=False):
"""Returns (scaled) local decomposed wavenumbermesh
If scaled is True, then the wavenumbermesh is scaled with physical mesh
size. This takes care of mapping the physical domain to a computational
cube of size (2pi)**3
"""
s = self.complex_local_slice()
kx = fftfreq(self.N[0], 1./self.N[0]).astype(int)
ky = fftfreq(self.N[1], 1./self.N[1]).astype(int)
kz = rfftfreq(self.N[2], 1./self.N[2]).astype(int)
if eliminate_highest_freq:
for i, k in enumerate((kx, ky, kz)):
if self.N[i] % 2 == 0:
k[self.N[i]//2] = 0
kx = kx[s[0]]
kz = kz[s[2]]
Ks = np.meshgrid(kx, ky, kz, indexing='ij', sparse=True)
if scaled is True:
Lp = 2*np.pi/self.L
for i in range(3):
Ks[i] = (Ks[i]*Lp[i]).astype(self.float)
K = Ks
if broadcast is True:
K = [np.broadcast_to(k, self.complex_shape()) for k in Ks]
return K
def test_definition(self):
x = [0, 1, 2, 3, 4]
assert_array_almost_equal(9*fft.rfftfreq(9), x)
assert_array_almost_equal(9*pi*fft.rfftfreq(9, pi), x)
x = [0, 1, 2, 3, 4, 5]
assert_array_almost_equal(10*fft.rfftfreq(10), x)
assert_array_almost_equal(10*pi*fft.rfftfreq(10, pi), x)
def fft(self):
self.spectre = fft.rfft(self.wave)
self.frequencies = fft.rfftfreq(len(self.wave), 1. / self.framerate)
# handle_line, = plt.plot(self.frequences, abs(self.spectre), label=self.start_time)
# plt.legend(handles=[handle_line])
# plt.show()
def test_definition(self):
x = [0, 1, 2, 3, 4]
assert_array_almost_equal(9*fft.rfftfreq(9), x)
assert_array_almost_equal(9*pi*fft.rfftfreq(9, pi), x)
x = [0, 1, 2, 3, 4, 5]
assert_array_almost_equal(10*fft.rfftfreq(10), x)
assert_array_almost_equal(10*pi*fft.rfftfreq(10, pi), x)
def make_residual_func(samples, indices, **params):
'closure for residual func'
fft_size = 2
while fft_size < indices[-1]:
fft_size *= 2
freqs = rfftfreq(fft_size, 5)
ind_from = int(round(1/(params['t_max']*freqs[1])))
ind_to = ind_from+params['n_harm']
def make_series(x):
'Calculates time series from parameterized spectrum'
nonlocal freqs, ind_from, ind_to, params
spectrum = zeros_like(freqs, 'complex')
sign_x0 = 0 if x[0] == 0.5 else abs(x[0]-0.5)/(x[0]-0.5)
spectrum[0] = rect(sign_x0*exp(params['scale'][0]*abs(x[0]-0.5)), 0)
for i in range(ind_from, ind_to):
spectrum[i] = rect(
exp(params['scale'][1]*x[1]+params['slope']*log(freqs[i])),
params['scale'][2]*x[2+i-ind_from]
)
return irfft(spectrum)
def residual_func(x):
'calculates sum of squared residuals'
nonlocal samples, indices
series = make_series(x)
sum_err = 0
for position, ind in enumerate(indices):
sum_err += (series[ind]-samples[position])**2
return sum_err
return make_series, residual_func
test_helper.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_definition(self):
x = [0, 1, 2, 3, 4]
assert_array_almost_equal(9*fft.rfftfreq(9), x)
assert_array_almost_equal(9*pi*fft.rfftfreq(9, pi), x)
x = [0, 1, 2, 3, 4, 5]
assert_array_almost_equal(10*fft.rfftfreq(10), x)
assert_array_almost_equal(10*pi*fft.rfftfreq(10, pi), x)
def test_definition(self):
x = [0, 1, 2, 3, 4]
assert_array_almost_equal(9*fft.rfftfreq(9), x)
assert_array_almost_equal(9*pi*fft.rfftfreq(9, pi), x)
x = [0, 1, 2, 3, 4, 5]
assert_array_almost_equal(10*fft.rfftfreq(10), x)
assert_array_almost_equal(10*pi*fft.rfftfreq(10, pi), x)
def test_definition(self):
x = [0, 1, 2, 3, 4]
assert_array_almost_equal(9*fft.rfftfreq(9), x)
assert_array_almost_equal(9*pi*fft.rfftfreq(9, pi), x)
x = [0, 1, 2, 3, 4, 5]
assert_array_almost_equal(10*fft.rfftfreq(10), x)
assert_array_almost_equal(10*pi*fft.rfftfreq(10, pi), x)
def test_definition(self):
x = [0, 1, 2, 3, 4]
assert_array_almost_equal(9*fft.rfftfreq(9), x)
assert_array_almost_equal(9*pi*fft.rfftfreq(9, pi), x)
x = [0, 1, 2, 3, 4, 5]
assert_array_almost_equal(10*fft.rfftfreq(10), x)
assert_array_almost_equal(10*pi*fft.rfftfreq(10, pi), x)
def test_definition(self):
x = [0, 1, 2, 3, 4]
assert_array_almost_equal(9*fft.rfftfreq(9), x)
assert_array_almost_equal(9*pi*fft.rfftfreq(9, pi), x)
x = [0, 1, 2, 3, 4, 5]
assert_array_almost_equal(10*fft.rfftfreq(10), x)
assert_array_almost_equal(10*pi*fft.rfftfreq(10, pi), x)
def interp_helper(all_data, trend_data, time_from):
'performs lf spline + hf fft interpolation of radial distance'
all_times, all_values = zip(*all_data)
trend_times, trend_values = zip(*trend_data)
split_time = int(time_to_index(time_from, all_times[0]))
trend_indices = array([time_to_index(item, all_times[0]) for item in trend_times])
spline = splrep(trend_indices, array(trend_values))
all_indices = array([time_to_index(item, all_times[0]) for item in all_times])
trend = splev(all_indices, spline)
detrended = array(all_values) - trend
trend_add = splev(arange(split_time, all_indices[-1]+1), spline)
dense_samples = detrended[:split_time]
sparse_samples = detrended[split_time:]
sparse_indices = (all_indices[split_time:]-split_time).astype(int)
amp = log(absolute(rfft(dense_samples)))
dense_freq = rfftfreq(dense_samples.size, 5)
periods = (3000.0, 300.0)
ind_from = int(round(1/(periods[0]*dense_freq[1])))
ind_to = int(round(1/(periods[1]*dense_freq[1])))
slope, _ = polyfit(log(dense_freq[ind_from:ind_to]), amp[ind_from:ind_to], 1)
params = {
't_max': periods[0],
'slope': slope,
'n_harm': 9,
'scale': [20, 4, 2*pi]
}
series_func, residual_func = make_residual_func(sparse_samples, sparse_indices, **params)
x0 = array([0.5]*(params["n_harm"]+2))
bounds = [(0, 1)]*(params["n_harm"]+2)
result = minimize(residual_func, x0, method="L-BFGS-B", bounds=bounds, options={'eps':1e-2})
interp_values = [trend + high_freq for trend, high_freq in
zip(trend_add, series_func(result.x)[:sparse_indices[-1]+1])]
#make_qc_plot(arange(sparse_indices[-1]+1), interp_values,
# sparse_indices, array(all_values[split_time:]))
interp_times = [index_to_time(ind, time_from) for ind in range(sparse_indices[-1]+1)]
return list(zip(interp_times, interp_values))