def __init__(self, N, L, comm, precision,
communication="Alltoall",
padsize=1.5,
threads=1,
planner_effort=defaultdict(lambda: "FFTW_MEASURE")):
R2C.__init__(self, N, L, comm, precision,
communication=communication,
padsize=padsize, threads=threads,
planner_effort=planner_effort)
# Reuse all shapes from r2c transform R2C simply by resizing the final complex z-dimension:
self.Nf = N[2]
self.Nfp = int(self.padsize*self.N[2]) # Independent complex wavenumbers in z-direction for padded array
# Rename since there's no real space
self.original_shape_padded = self.real_shape_padded
self.original_shape = self.real_shape
self.transformed_shape = self.complex_shape
self.original_local_slice = self.real_local_slice
self.transformed_local_slice = self.complex_local_slice
self.ks = (fftfreq(N[2])*N[2]).astype(int)
python类fftfreq()的实例源码
def __init__(self, N, L, comm, precision, padsize=1.5, threads=1,
planner_effort=defaultdict(lambda : "FFTW_MEASURE")):
self.N = N # The global size of the problem
self.L = L
assert len(L) == 2
assert len(N) == 2
self.comm = comm
self.float, self.complex, self.mpitype = float, complex, mpitype = datatypes(precision)
self.num_processes = comm.Get_size()
self.rank = comm.Get_rank()
self.padsize = padsize
self.threads = threads
self.planner_effort = planner_effort
# Each cpu gets ownership of Np indices
self.Np = N // self.num_processes
self.Nf = N[1]//2+1
self.Npf = self.Np[1]//2+1 if self.rank+1 == self.num_processes else self.Np[1]//2
self.Nfp = int(padsize*self.N[1]/2+1)
self.ks = (fftfreq(N[0])*N[0]).astype(int)
self.dealias = zeros(0)
self.work_arrays = work_arrays()
def fourierExtrapolation(x, n_predict):
n = len(x)
n_harm = 10 # number of harmonics in model
t = np.arange(0, n)
p = np.polyfit(t, x, 1) # find linear trend in x
x_notrend = x - p[0] * t # detrended x
x_freqdom = fft.fft(x_notrend) # detrended x in frequency domain
f = fft.fftfreq(n) # frequencies
indexes = list(range(n))
# sort indexes by frequency, lower -> higher
indexes.sort(key = lambda i: np.absolute(f[i]))
t = np.arange(0, n + n_predict)
restored_sig = np.zeros(t.size)
for i in indexes[:1 + n_harm * 2]:
ampli = np.absolute(x_freqdom[i]) / n # amplitude
phase = np.angle(x_freqdom[i]) # phase
restored_sig += ampli * np.cos(2 * np.pi * f[i] * t + phase)
return restored_sig + p[0] * t
def fourierExtrapolation(x, n_predict):
n = len(x)
n_harm = 10 # number of harmonics in model
t = np.arange(0, n)
p = np.polyfit(t, x, 1) # find linear trend in x
x_notrend = x - p[0] * t # detrended x
x_freqdom = fft.fft(x_notrend) # detrended x in frequency domain
f = fft.fftfreq(n) # frequencies
indexes = list(range(n))
# sort indexes by frequency, lower -> higher
indexes.sort(key = lambda i: np.absolute(f[i]))
t = np.arange(0, n + n_predict)
restored_sig = np.zeros(t.size)
for i in indexes[:1 + n_harm * 2]:
ampli = np.absolute(x_freqdom[i]) / n # amplitude
phase = np.angle(x_freqdom[i]) # phase
restored_sig += ampli * np.cos(2 * np.pi * f[i] * t + phase)
return restored_sig + p[0] * t
def fourierExtrapolation(x, n_predict):
n = len(x)
n_harm = 10 # number of harmonics in model
t = np.arange(0, n)
p = np.polyfit(t, x, 1) # find linear trend in x
x_notrend = x - p[0] * t # detrended x
x_freqdom = fft.fft(x_notrend) # detrended x in frequency domain
f = fft.fftfreq(n) # frequencies
indexes = list(range(n))
# sort indexes by frequency, lower -> higher
indexes.sort(key = lambda i: np.absolute(f[i]))
t = np.arange(0, n + n_predict)
restored_sig = np.zeros(t.size)
for i in indexes[:1 + n_harm * 2]:
ampli = np.absolute(x_freqdom[i]) / n # amplitude
phase = np.angle(x_freqdom[i]) # phase
restored_sig += ampli * np.cos(2 * np.pi * f[i] * t + phase)
return restored_sig + p[0] * t
def peak_freq(signal,timebase,minfreq=0,maxfreq=1.e18):
"""
TODO: old code: needs review
this function only has a basic unittest to make sure it returns
the correct freq in a simple case.
"""
timebase = array(timebase)
sig_fft = fft.fft(signal)
sample_time = float(mean(timebase[1:]-timebase[:-1]))
#SRH modification, frequencies seemed a little bit off because of the -1 in the denominator
#Here we are trusting numpy....
#fft_freqs = (1./sample_time)*arange(len(sig_fft)).astype(float)/(len(sig_fft)-1)
fft_freqs = fft.fftfreq(len(sig_fft),d=sample_time)
# only show up to nyquist freq
new_len = len(sig_fft)/2
sig_fft = sig_fft[:new_len]
fft_freqs = fft_freqs[:new_len]
[minfreq_elmt,maxfreq_elmt] = searchsorted(fft_freqs,[minfreq,maxfreq])
sig_fft = sig_fft[minfreq_elmt:maxfreq_elmt]
fft_freqs = fft_freqs[minfreq_elmt:maxfreq_elmt]
peak_elmt = (argsort(abs(sig_fft)))[-1]
return [fft_freqs[peak_elmt], peak_elmt]
def complex_local_wavenumbers(self):
"""Returns local wavenumbers of complex space"""
return (fftfreq(self.N[0], 1./self.N[0]),
fftfreq(self.N[1], 1./self.N[1])[self.complex_local_slice()[1]],
rfftfreq(self.N[2], 1./self.N[2]))
def get_local_wavenumbermesh(self, scaled=False, broadcast=False, eliminate_highest_freq=False):
"""Returns (scaled) local decomposed wavenumbermesh
If scaled is True, then the wavenumbermesh is scaled with physical mesh
size. This takes care of mapping the physical domain to a computational
cube of size (2pi)**3.
If eliminate_highest_freq is True, then the Nyquist frequency is set to zero.
"""
kx, ky, kz = self.complex_local_wavenumbers()
if eliminate_highest_freq:
ky = fftfreq(self.N[1], 1./self.N[1])
for i, k in enumerate((kx, ky, kz)):
if self.N[i] % 2 == 0:
k[self.N[i]//2] = 0
ky = ky[self.complex_local_slice()[1]]
Ks = np.meshgrid(kx, ky, kz, indexing='ij', sparse=True)
if scaled:
Lp = 2*np.pi/self.L
for i in range(3):
Ks[i] *= Lp[i]
K = Ks
if broadcast is True:
K = [np.broadcast_to(k, self.complex_shape()) for k in Ks]
return K
def transformed_local_wavenumbers(self):
return (fftfreq(self.N[0], 1./self.N[0]),
fftfreq(self.N[1], 1./self.N[1])[self.transformed_local_slice()[1]],
fftfreq(self.N[2], 1./self.N[2]))
def complex_local_wavenumbers(self):
s = self.complex_local_slice()
return (fftfreq(self.N[0], 1./self.N[0]).astype(int)[s[0]],
fftfreq(self.N[1], 1./self.N[1]).astype(int),
rfftfreq(self.N[2], 1./self.N[2]).astype(int)[s[2]])
def get_local_wavenumbermesh(self, scaled=False, broadcast=False,
eliminate_highest_freq=False):
"""Returns (scaled) local decomposed wavenumbermesh
If scaled is True, then the wavenumbermesh is scaled with physical mesh
size. This takes care of mapping the physical domain to a computational
cube of size (2pi)**3
"""
s = self.complex_local_slice()
kx = fftfreq(self.N[0], 1./self.N[0]).astype(int)
ky = fftfreq(self.N[1], 1./self.N[1]).astype(int)
kz = rfftfreq(self.N[2], 1./self.N[2]).astype(int)
if eliminate_highest_freq:
for i, k in enumerate((kx, ky, kz)):
if self.N[i] % 2 == 0:
k[self.N[i]//2] = 0
kx = kx[s[0]]
kz = kz[s[2]]
Ks = np.meshgrid(kx, ky, kz, indexing='ij', sparse=True)
if scaled is True:
Lp = 2*np.pi/self.L
for i in range(3):
Ks[i] = (Ks[i]*Lp[i]).astype(self.float)
K = Ks
if broadcast is True:
K = [np.broadcast_to(k, self.complex_shape()) for k in Ks]
return K
def get_local_wavenumbermesh(self):
xyrank = self.comm0.Get_rank() # Local rank in xz-plane
yzrank = self.comm1.Get_rank() # Local rank in yz-plane
# Set wavenumbers in grid
kx = fftfreq(self.N[0], 1./self.N[0]).astype(int)
ky = fftfreq(self.N[1], 1./self.N[1]).astype(int)
kz = fftfreq(self.N[2], 1./self.N[2]).astype(int)
k2 = slice(xyrank*self.N1[1], (xyrank+1)*self.N1[1], 1)
k1 = slice(yzrank*self.N2[2]//2, (yzrank+1)*self.N2[2]//2, 1)
K = np.array(np.meshgrid(kx, ky[k2], kz[k1], indexing='ij'), dtype=self.float)
return K
def test_definition(self):
x = [0, 1, 2, 3, 4, -4, -3, -2, -1]
assert_array_almost_equal(9*fft.fftfreq(9), x)
assert_array_almost_equal(9*pi*fft.fftfreq(9, pi), x)
x = [0, 1, 2, 3, 4, -5, -4, -3, -2, -1]
assert_array_almost_equal(10*fft.fftfreq(10), x)
assert_array_almost_equal(10*pi*fft.fftfreq(10, pi), x)
def test_definition(self):
x = [0, 1, 2, 3, 4, -4, -3, -2, -1]
assert_array_almost_equal(9*fft.fftfreq(9), x)
assert_array_almost_equal(9*pi*fft.fftfreq(9, pi), x)
x = [0, 1, 2, 3, 4, -5, -4, -3, -2, -1]
assert_array_almost_equal(10*fft.fftfreq(10), x)
assert_array_almost_equal(10*pi*fft.fftfreq(10, pi), x)
test_helper.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def test_definition(self):
x = [0, 1, 2, 3, 4, -4, -3, -2, -1]
assert_array_almost_equal(9*fft.fftfreq(9), x)
assert_array_almost_equal(9*pi*fft.fftfreq(9, pi), x)
x = [0, 1, 2, 3, 4, -5, -4, -3, -2, -1]
assert_array_almost_equal(10*fft.fftfreq(10), x)
assert_array_almost_equal(10*pi*fft.fftfreq(10, pi), x)
def test_definition(self):
x = [0, 1, 2, 3, 4, -4, -3, -2, -1]
assert_array_almost_equal(9*fft.fftfreq(9), x)
assert_array_almost_equal(9*pi*fft.fftfreq(9, pi), x)
x = [0, 1, 2, 3, 4, -5, -4, -3, -2, -1]
assert_array_almost_equal(10*fft.fftfreq(10), x)
assert_array_almost_equal(10*pi*fft.fftfreq(10, pi), x)
def test_definition(self):
x = [0, 1, 2, 3, 4, -4, -3, -2, -1]
assert_array_almost_equal(9*fft.fftfreq(9), x)
assert_array_almost_equal(9*pi*fft.fftfreq(9, pi), x)
x = [0, 1, 2, 3, 4, -5, -4, -3, -2, -1]
assert_array_almost_equal(10*fft.fftfreq(10), x)
assert_array_almost_equal(10*pi*fft.fftfreq(10, pi), x)
def test_definition(self):
x = [0, 1, 2, 3, 4, -4, -3, -2, -1]
assert_array_almost_equal(9*fft.fftfreq(9), x)
assert_array_almost_equal(9*pi*fft.fftfreq(9, pi), x)
x = [0, 1, 2, 3, 4, -5, -4, -3, -2, -1]
assert_array_almost_equal(10*fft.fftfreq(10), x)
assert_array_almost_equal(10*pi*fft.fftfreq(10, pi), x)
def find_frequency(self, v, si): # voltages, samplimg interval is seconds
from numpy import fft
NP = len(v)
v = v -v.mean() # remove DC component
frq = fft.fftfreq(NP, si)[:NP/2] # take only the +ive half of the frequncy array
amp = abs(fft.fft(v)[:NP/2])/NP # and the fft result
index = amp.argmax() # search for the tallest peak, the fundamental
return frq[index]
def amp_spectrum(self, v, si, nhar=8):
# voltages, samplimg interval is seconds, number of harmonics to retain
from numpy import fft
NP = len(v)
frq = fft.fftfreq(NP, si)[:NP/2] # take only the +ive half of the frequncy array
amp = abs(fft.fft(v)[:NP/2])/NP # and the fft result
index = amp.argmax() # search for the tallest peak, the fundamental
if index == 0: # DC component is dominating
index = amp[4:].argmax() # skip frequencies close to zero
return frq[:index*nhar], amp[:index*nhar] # restrict to 'nhar' harmonics
def fft(self,ya, si):
'''
Returns positive half of the Fourier transform of the signal ya.
Sampling interval 'si', in milliseconds
'''
ns = len(ya)
if ns %2 == 1: # odd values of np give exceptions
ns=ns-1 # make it even
ya=ya[:-1]
v = np.array(ya)
tr = abs(np.fft.fft(v))/ns
frq = np.fft.fftfreq(ns, si)
x = frq.reshape(2,ns/2)
y = tr.reshape(2,ns/2)
return x[0], y[0]
def find_frequency(self, v, si): # voltages, samplimg interval is seconds
from numpy import fft
NP = len(v)
v = v -v.mean() # remove DC component
frq = fft.fftfreq(NP, si)[:NP/2] # take only the +ive half of the frequncy array
amp = abs(fft.fft(v)[:NP/2])/NP # and the fft result
index = amp.argmax() # search for the tallest peak, the fundamental
return frq[index]
def amp_spectrum(self, v, si, nhar=8):
# voltages, samplimg interval is seconds, number of harmonics to retain
from numpy import fft
NP = len(v)
frq = fft.fftfreq(NP, si)[:NP/2] # take only the +ive half of the frequncy array
amp = abs(fft.fft(v)[:NP/2])/NP # and the fft result
index = amp.argmax() # search for the tallest peak, the fundamental
if index == 0: # DC component is dominating
index = amp[4:].argmax() # skip frequencies close to zero
return frq[:index*nhar], amp[:index*nhar] # restrict to 'nhar' harmonics
def fft(self,ya, si):
'''
Returns positive half of the Fourier transform of the signal ya.
Sampling interval 'si', in milliseconds
'''
ns = len(ya)
if ns %2 == 1: # odd values of np give exceptions
ns=ns-1 # make it even
ya=ya[:-1]
v = np.array(ya)
tr = abs(np.fft.fft(v))/ns
frq = np.fft.fftfreq(ns, si)
x = frq.reshape(2,ns/2)
y = tr.reshape(2,ns/2)
return x[0], y[0]
def test_definition(self):
x = [0, 1, 2, 3, 4, -4, -3, -2, -1]
assert_array_almost_equal(9*fft.fftfreq(9), x)
assert_array_almost_equal(9*pi*fft.fftfreq(9, pi), x)
x = [0, 1, 2, 3, 4, -5, -4, -3, -2, -1]
assert_array_almost_equal(10*fft.fftfreq(10), x)
assert_array_almost_equal(10*pi*fft.fftfreq(10, pi), x)