def fit_rabi(xdata, ydata):
"""Analyze Rabi amplitude data to find pi-pulse amplitude and phase offset.
Arguments:
xdata: ndarray of calibration amplitudes. length should be even.
ydata: measurement amplitudes
Returns:
pi_amp: Fitted amplitude of pi pulsed
offset: Fitted mixer offset
fit_pts: Fitted points."""
#seed Rabi frequency from largest FFT component
N = len(ydata)
yfft = fft(ydata)
f_max_ind = np.argmax(np.abs(yfft[1:N//2]))
f_0 = 0.5 * max([1, f_max_ind]) / xdata[-1]
amp_0 = 0.5*(ydata.max() - ydata.min())
offset_0 = np.mean(ydata)
phase_0 = 0
if ydata[N//2 - 1] > offset_0:
amp_0 = -amp_0
popt, _ = curve_fit(rabi_model, xdata, ydata, [offset_0, amp_0, f_0, phase_0])
f_rabi = np.abs(popt[2])
pi_amp = 0.5/f_rabi
offset = popt[3]
return pi_amp, offset, popt
python类fft()的实例源码
def smooth_fft(dx, spec, sigma):
"""Basic math for FFT convolution with a gaussian kernel.
:param dx:
The wavelength or velocity spacing, same units as sigma
:param sigma:
The width of the gaussian kernel, same units as dx
:param spec:
The spectrum flux vector
"""
# The Fourier coordinate
ss = rfftfreq(len(spec), d=dx)
# Make the fourier space taper; just the analytical fft of a gaussian
taper = np.exp(-2 * (np.pi ** 2) * (sigma ** 2) * (ss ** 2))
ss[0] = 0.01 # hack
# Fourier transform the spectrum
spec_ff = np.fft.rfft(spec)
# Multiply in fourier space
ff_tapered = spec_ff * taper
# Fourier transform back
spec_conv = np.fft.irfft(ff_tapered)
return spec_conv
def _ncc_c(x, y):
"""
>>> _ncc_c([1,2,3,4], [1,2,3,4])
array([ 0.13333333, 0.36666667, 0.66666667, 1. , 0.66666667,
0.36666667, 0.13333333])
>>> _ncc_c([1,1,1], [1,1,1])
array([ 0.33333333, 0.66666667, 1. , 0.66666667, 0.33333333])
>>> _ncc_c([1,2,3], [-1,-1,-1])
array([-0.15430335, -0.46291005, -0.9258201 , -0.77151675, -0.46291005])
"""
den = np.array(norm(x) * norm(y))
den[den == 0] = np.Inf
x_len = len(x)
fft_size = 1<<(2*x_len-1).bit_length()
cc = ifft(fft(x, fft_size) * np.conj(fft(y, fft_size)))
cc = np.concatenate((cc[-(x_len-1):], cc[:x_len]))
return np.real(cc) / den
def cconv(a, b):
"""
Circular convolution of vectors
Computes the circular convolution of two vectors a and b via their
fast fourier transforms
a \ast b = \mathcal{F}^{-1}(\mathcal{F}(a) \odot \mathcal{F}(b))
Parameter
---------
a: real valued array (shape N)
b: real valued array (shape N)
Returns
-------
c: real valued array (shape N), representing the circular
convolution of a and b
"""
return ifft(fft(a) * fft(b)).real
def ccorr(a, b):
"""
Circular correlation of vectors
Computes the circular correlation of two vectors a and b via their
fast fourier transforms
a \ast b = \mathcal{F}^{-1}(\overline{\mathcal{F}(a)} \odot \mathcal{F}(b))
Parameter
---------
a: real valued array (shape N)
b: real valued array (shape N)
Returns
-------
c: real valued array (shape N), representing the circular
correlation of a and b
"""
return ifft(np.conj(fft(a)) * fft(b)).real
def kayufft_db(signal, rate):
""" Perform FFT for wood's longitudinal stress wave signal
Inputs:
Acoustic signal in time domain and the sampling rate
Returns:
Signal in ferquency domain with unit in dB
"""
signal_length = len(signal)
fft_result = fft(signal)
uniq_points = int(ceil((signal_length + 1) / 2.0)) #only takes one side of FFT result
fft_result = fft_result[0:uniq_points]
fft_result = abs(fft_result)
fft_result = fft_result / float(signal_length)
fft_result = fft_result**2
if signal_length % 2 > 0:
fft_result[1:len(fft_result)] = fft_result[1:len(fft_result)] * 2
else:
fft_result[1:len(fft_result) - 1] = fft_result[1:len(fft_result) - 1] * 2
fft_abscissa = arange(0, uniq_points, 1.0) * (rate / signal_length)
fft_ordinate = 10 * log10(fft_result)
return fft_ordinate, fft_abscissa
def smooth_fft_vsini(dv,spec,sigma):
# The Fourier coordinate
ss = rfftfreq(len(spec), d=dv)
# Make the fourier space taper
ss[0] = 0.01 #junk so we don't get a divide by zero error
ub = 2. * np.pi * sigma * ss
sb = j1(ub) / ub - 3 * np.cos(ub) / (2 * ub ** 2) + 3. * np.sin(ub) / (2 * ub ** 3)
#set zeroth frequency to 1 separately (DC term)
sb[0] = 1.
# Fourier transform the spectrum
FF = np.fft.rfft(spec)
# Multiply in fourier space
FF_tap = FF * sb
# Fourier transform back
spec_conv = np.fft.irfft(FF_tap)
return spec_conv
def test_random_x(self):
'''
Automatically generate a number of test cases for DFT() and compare results to numpy.fft.ftt
This will generate and test input lists of length 2 to max_N a total of 10 times each
'''
max_N=20
#for x of length 2 to max_N (inclusive)
for N in range(2,max_N+1):
#generate and test x ten times
for t in range(0,10):
#randomly generate x
x = []
for i in range(0,N):
x.append((random()-0.5)*2+(random()-0.5)*2j)
#test DFT by comparing to fft.fft out to 6 decimal places
testing.assert_array_almost_equal(DFT(x),fft.fft(x), err_msg='Your results do not agree with numpy.fft.fft',verbose=True)
#plot_rand(x)
def fourierExtrapolation(x, n_predict):
n = len(x)
n_harm = 10 # number of harmonics in model
t = np.arange(0, n)
p = np.polyfit(t, x, 1) # find linear trend in x
x_notrend = x - p[0] * t # detrended x
x_freqdom = fft.fft(x_notrend) # detrended x in frequency domain
f = fft.fftfreq(n) # frequencies
indexes = list(range(n))
# sort indexes by frequency, lower -> higher
indexes.sort(key = lambda i: np.absolute(f[i]))
t = np.arange(0, n + n_predict)
restored_sig = np.zeros(t.size)
for i in indexes[:1 + n_harm * 2]:
ampli = np.absolute(x_freqdom[i]) / n # amplitude
phase = np.angle(x_freqdom[i]) # phase
restored_sig += ampli * np.cos(2 * np.pi * f[i] * t + phase)
return restored_sig + p[0] * t
def fourierExtrapolation(x, n_predict):
n = len(x)
n_harm = 10 # number of harmonics in model
t = np.arange(0, n)
p = np.polyfit(t, x, 1) # find linear trend in x
x_notrend = x - p[0] * t # detrended x
x_freqdom = fft.fft(x_notrend) # detrended x in frequency domain
f = fft.fftfreq(n) # frequencies
indexes = list(range(n))
# sort indexes by frequency, lower -> higher
indexes.sort(key = lambda i: np.absolute(f[i]))
t = np.arange(0, n + n_predict)
restored_sig = np.zeros(t.size)
for i in indexes[:1 + n_harm * 2]:
ampli = np.absolute(x_freqdom[i]) / n # amplitude
phase = np.angle(x_freqdom[i]) # phase
restored_sig += ampli * np.cos(2 * np.pi * f[i] * t + phase)
return restored_sig + p[0] * t
def fourierExtrapolation(x, n_predict):
n = len(x)
n_harm = 10 # number of harmonics in model
t = np.arange(0, n)
p = np.polyfit(t, x, 1) # find linear trend in x
x_notrend = x - p[0] * t # detrended x
x_freqdom = fft.fft(x_notrend) # detrended x in frequency domain
f = fft.fftfreq(n) # frequencies
indexes = list(range(n))
# sort indexes by frequency, lower -> higher
indexes.sort(key = lambda i: np.absolute(f[i]))
t = np.arange(0, n + n_predict)
restored_sig = np.zeros(t.size)
for i in indexes[:1 + n_harm * 2]:
ampli = np.absolute(x_freqdom[i]) / n # amplitude
phase = np.angle(x_freqdom[i]) # phase
restored_sig += ampli * np.cos(2 * np.pi * f[i] * t + phase)
return restored_sig + p[0] * t
FFT_Stock_Prediction.py 文件源码
项目:Test-stock-prediction-algorithms
作者: timestocome
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def fourierEx(x, n_predict, harmonics):
n = len(x) # number of input samples
n_harmonics = harmonics # f, 2*f, 3*f, .... n_harmonics ( 1,2, )
t = np.arange(0, n) # place to store data
p = np.polyfit(t, x, 1) # find trend
x_no_trend = x - p[0] * t
x_frequency_domains = fft.fft(x_no_trend)
f = np.fft.fftfreq(n) # frequencies
indexes = list(range(n))
indexes.sort(key=lambda i: np.absolute(f[i]))
t = np.arange(0, n + n_predict)
restored_signal = np.zeros(t.size)
for i in indexes[:1 + n_harmonics * 2]:
amplitude = np.absolute(x_frequency_domains[i] / n)
phase = np.angle(x_frequency_domains[i])
restored_signal += amplitude * np.cos(2 * np.pi * f[i] * t + phase)
return restored_signal + p[0] * t
def peak_freq(signal,timebase,minfreq=0,maxfreq=1.e18):
"""
TODO: old code: needs review
this function only has a basic unittest to make sure it returns
the correct freq in a simple case.
"""
timebase = array(timebase)
sig_fft = fft.fft(signal)
sample_time = float(mean(timebase[1:]-timebase[:-1]))
#SRH modification, frequencies seemed a little bit off because of the -1 in the denominator
#Here we are trusting numpy....
#fft_freqs = (1./sample_time)*arange(len(sig_fft)).astype(float)/(len(sig_fft)-1)
fft_freqs = fft.fftfreq(len(sig_fft),d=sample_time)
# only show up to nyquist freq
new_len = len(sig_fft)/2
sig_fft = sig_fft[:new_len]
fft_freqs = fft_freqs[:new_len]
[minfreq_elmt,maxfreq_elmt] = searchsorted(fft_freqs,[minfreq,maxfreq])
sig_fft = sig_fft[minfreq_elmt:maxfreq_elmt]
fft_freqs = fft_freqs[minfreq_elmt:maxfreq_elmt]
peak_elmt = (argsort(abs(sig_fft)))[-1]
return [fft_freqs[peak_elmt], peak_elmt]
def kernel_fredriksen(n):
"""
Generates kernel for Hilbert transform using FFT.
Parameters
----------
n : int
Number of equidistant grid points.
Returns
-------
array
Kernel used when performing Hilbert transform using FFT.
"""
aux = np.zeros(n+1, dtype=doublenp)
for i in range(1,n+1):
aux[i] = i*log(i)
m = 2*n
ker = np.zeros(m, dtype=doublenp)
for i in range(1,n):
ker[i] = aux[i+1]-2*aux[i]+aux[i-1]
ker[m-i] = -ker[i]
return fft(ker)/pi
def hilbert_fredriksen(f, ker=None):
"""
Performs Hilbert transform of f.
Parameters
----------
f : array
Values of function on a equidistant grid.
ker : array
Kernel used when performing Hilbert transform using FFT.
Returns
-------
array
Hilbert transform of f.
"""
if ker is None:
ker = kernel_fredriksen(len(f))
n = len(f)
fpad = fft(np.concatenate( (f,np.zeros(len(ker)-n)) ))
r = ifft(fpad*ker)
return r[0:n]
def cross_correlation_using_fft(self, x, y):
f1 = fft(x)
f2 = fft(np.flipud(y))
cc = np.real(ifft(f1 * f2))
return fftshift(cc)
# clean up
# def close(self):
# close serial
# self.ser.flush()
# self.ser.close()
# Main
def hilbert(signal):
# construct the Hilbert transform of the signal via the FFT
# in essense, we just want to set negative frequency components to zero
spectrum = np.fft.fft(signal)
n = len(signal)
midpoint = int(np.ceil(n/2))
kernel = np.zeros(n)
kernel[0] = 1
if n%2 == 0:
kernel[midpoint] = 1
kernel[1:midpoint] = 2
return np.fft.ifft(kernel * spectrum)
def init_pyfftw(x, effort=effort[0], wis=False):
N = len(x)
a = pyfftw.n_byte_align_empty(int(N), n, 'complex128')
a[:] = x
if wis is not False:
pyfftw.import_wisdom(wis)
fft = pyfftw.builders.fft(a, threads=8)
ifft = pyfftw.builders.ifft(a, threads=8)
else:
fft = pyfftw.builders.fft(a, planner_effort=effort, threads=8)
ifft = pyfftw.builders.ifft(a, planner_effort=effort, threads=8)
return fft, ifft
def delayseq(x, delay_sec:float, fs:int):
"""
x: input 1-D signal
delay_sec: amount to shift signal [seconds]
fs: sampling frequency [Hz]
xs: time-shifted signal
"""
assert x.ndim == 1, 'only 1-D signals for now'
delay_samples = delay_sec*fs
delay_int = round(delay_samples)
nfft = nextpow2(x.size+delay_int)
fbins = 2*pi*ifftshift((arange(nfft)-nfft//2))/nfft
X = fft(x,nfft)
Xs = ifft(X*exp(-1j*delay_samples*fbins))
if isreal(x[0]):
Xs = Xs.real
xs = zeros_like(x)
xs[delay_int:] = Xs[delay_int:x.size]
return xs
def argmax_p(fft_ordinate, rate):
""" Find an argmax of fft result
"""
upper_thresh_freq = 4100
lower_thresh_freq = 800
upper_thresh = int(len(fft_ordinate) / (rate / 2) * upper_thresh_freq)
lower_thresh = int(len(fft_ordinate) / (rate / 2) * lower_thresh_freq)
argmax = fft_ordinate[lower_thresh:upper_thresh].argmax()
argmax += lower_thresh
return argmax
def max_p(fft_ordinate, rate):
""" Find an argmax of fft result
"""
upper_thresh_freq = 4100
lower_thresh_freq = 800
upper_thresh = int(len(fft_ordinate) / (rate / 2) * upper_thresh_freq)
lower_thresh = int(len(fft_ordinate) / (rate / 2) * lower_thresh_freq)
# return max(fft_ordinate)
return max(fft_ordinate[lower_thresh:upper_thresh])
def Fourier(x,fs,NFFT): # Approximate Fourier transform on the interval (-fs/2,fs/2)
k=np.arange(NFFT)
N=len(x)
T=N/fs
f=(-.5+k/float(NFFT))*fs
n=np.arange(N)
return f, T/N*np.exp(1j*pi*f*T)*fft(x*(-1.)**n,NFFT)
def find_frequency(self, v, si): # voltages, samplimg interval is seconds
from numpy import fft
NP = len(v)
v = v -v.mean() # remove DC component
frq = fft.fftfreq(NP, si)[:NP/2] # take only the +ive half of the frequncy array
amp = abs(fft.fft(v)[:NP/2])/NP # and the fft result
index = amp.argmax() # search for the tallest peak, the fundamental
return frq[index]
def amp_spectrum(self, v, si, nhar=8):
# voltages, samplimg interval is seconds, number of harmonics to retain
from numpy import fft
NP = len(v)
frq = fft.fftfreq(NP, si)[:NP/2] # take only the +ive half of the frequncy array
amp = abs(fft.fft(v)[:NP/2])/NP # and the fft result
index = amp.argmax() # search for the tallest peak, the fundamental
if index == 0: # DC component is dominating
index = amp[4:].argmax() # skip frequencies close to zero
return frq[:index*nhar], amp[:index*nhar] # restrict to 'nhar' harmonics
def fft(self,ya, si):
'''
Returns positive half of the Fourier transform of the signal ya.
Sampling interval 'si', in milliseconds
'''
ns = len(ya)
if ns %2 == 1: # odd values of np give exceptions
ns=ns-1 # make it even
ya=ya[:-1]
v = np.array(ya)
tr = abs(np.fft.fft(v))/ns
frq = np.fft.fftfreq(ns, si)
x = frq.reshape(2,ns/2)
y = tr.reshape(2,ns/2)
return x[0], y[0]
def cps(a,b):
return fft.fft(a)*conjugate(fft.fft(b))
def cps(a,b):
return fft.fft(a)*conjugate(fft.fft(b))
def find_frequency(self, v, si): # voltages, samplimg interval is seconds
from numpy import fft
NP = len(v)
v = v -v.mean() # remove DC component
frq = fft.fftfreq(NP, si)[:NP/2] # take only the +ive half of the frequncy array
amp = abs(fft.fft(v)[:NP/2])/NP # and the fft result
index = amp.argmax() # search for the tallest peak, the fundamental
return frq[index]
def amp_spectrum(self, v, si, nhar=8):
# voltages, samplimg interval is seconds, number of harmonics to retain
from numpy import fft
NP = len(v)
frq = fft.fftfreq(NP, si)[:NP/2] # take only the +ive half of the frequncy array
amp = abs(fft.fft(v)[:NP/2])/NP # and the fft result
index = amp.argmax() # search for the tallest peak, the fundamental
if index == 0: # DC component is dominating
index = amp[4:].argmax() # skip frequencies close to zero
return frq[:index*nhar], amp[:index*nhar] # restrict to 'nhar' harmonics
def fft(self,ya, si):
'''
Returns positive half of the Fourier transform of the signal ya.
Sampling interval 'si', in milliseconds
'''
ns = len(ya)
if ns %2 == 1: # odd values of np give exceptions
ns=ns-1 # make it even
ya=ya[:-1]
v = np.array(ya)
tr = abs(np.fft.fft(v))/ns
frq = np.fft.fftfreq(ns, si)
x = frq.reshape(2,ns/2)
y = tr.reshape(2,ns/2)
return x[0], y[0]