def imshow_h(self):
    """Show magnitude and angle of ``self.Gbp`` and ``self.Gfp`` in a 2x2 grid.

    Top row: ``np.abs`` and ``np.angle`` of ``self.Gbp``.
    Bottom row: the same two views of ``self.Gfp``.
    A new 10x10 matplotlib figure is created on every call; nothing is
    returned.
    """
    Gbp, Gfp = self.Gbp, self.Gfp
    # (transform, field, title) for subplot positions 1..4, row-major.
    # The bottom-row titles previously read "Gbf" although they show Gfp,
    # and "Magnitude" was misspelled.
    panels = (
        (np.abs, Gbp, 'Gbp - Magnitude'),
        (np.angle, Gbp, 'Gbp - Angle'),
        (np.abs, Gfp, 'Gfp - Magnitude'),
        (np.angle, Gfp, 'Gfp - Angle'),
    )
    plt.figure(figsize=[10, 10])
    for position, (transform, field, caption) in enumerate(panels, start=1):
        plt.subplot(2, 2, position)
        plt.imshow(transform(field), cmap='Greys_r', interpolation='none')
        plt.title(caption)
python类angle()的实例源码
def run_complex(self, Original):
    """
    Diffract the original image, reconstruct it, and display the results.

    The image is passed through ``self.diffract_full`` and the output of
    that through ``self.reconstruct``.  Four panels are then drawn side by
    side: the original, the modulus of the diffraction, the angle of the
    diffraction, and the modulus of the reconstruction.  Nothing is
    returned.
    """
    Input = self.diffract_full(Original)
    Recon = self.reconstruct(Input)

    # (source array, transform applied just before drawing, panel title)
    panels = (
        (Original, None, 'Original'),
        (Input, np.abs, '|Diffraction|'),
        (Input, np.angle, 'Angle(Diffraction)'),
        (Recon, np.abs, 'Modulus: |Recon|'),
    )

    figure(figsize=(11, 3))
    for slot, (field, transform, caption) in enumerate(panels, start=1):
        subplot(1, 4, slot)
        picture = field if transform is None else transform(field)
        imshow(picture, 'Greys_r')
        title(caption)
def fourierExtrapolation(x, n_predict, n_harm=10):
    """Extrapolate a 1-D series with a linear trend plus a few Fourier harmonics.

    The slope of a first-order ``np.polyfit`` is removed from ``x``, the
    remainder is transformed with ``fft.fft``, and the ``1 + 2 * n_harm``
    components with the smallest absolute frequency are re-synthesised as
    cosines over ``len(x) + n_predict`` samples.  The slope term is added
    back before returning.  Only the slope is subtracted; the intercept
    stays in the zero-frequency component.

    Args:
        x: 1-D sequence of real samples.
        n_predict: number of samples to generate past the end of ``x``.
        n_harm: number of harmonics in the model.  Defaults to 10, the
            value that used to be hard-coded.

    Returns:
        numpy.ndarray of length ``len(x) + n_predict``.

    Raises:
        ValueError: if ``n_harm`` is negative.
    """
    if n_harm < 0:
        raise ValueError('n_harm must be non-negative')

    n = len(x)
    t = np.arange(0, n)
    slope = np.polyfit(t, x, 1)[0]           # linear trend in x
    x_freqdom = fft.fft(x - slope * t)       # detrended x in frequency domain
    f = fft.fftfreq(n)                       # frequencies

    # Sort indexes by |frequency|, lower -> higher.  sorted() is stable, so
    # ties keep their original order.
    indexes = sorted(range(n), key=lambda i: np.absolute(f[i]))

    t = np.arange(0, n + n_predict)
    restored_sig = np.zeros(t.size)
    for i in indexes[:1 + n_harm * 2]:
        ampli = np.absolute(x_freqdom[i]) / n    # amplitude
        phase = np.angle(x_freqdom[i])           # phase
        restored_sig += ampli * np.cos(2 * np.pi * f[i] * t + phase)
    return restored_sig + slope * t
def fourierExtrapolation(x, n_predict, n_harm=10):
    """Forecast ``n_predict`` samples past ``x`` from its lowest harmonics.

    Steps performed:
      1. fit a first-order polynomial to ``x`` and subtract its slope term;
      2. take ``fft.fft`` of the remainder;
      3. keep the ``1 + 2 * n_harm`` bins with the smallest absolute
         frequency (as given by ``fft.fftfreq``);
      4. sum the corresponding cosines over ``len(x) + n_predict`` samples
         and add the slope term back.

    Args:
        x: 1-D sequence of real samples.
        n_predict: how many samples to extrapolate.
        n_harm: number of harmonics in the model; defaults to 10, which was
            previously a fixed constant inside the function.

    Returns:
        numpy.ndarray with ``len(x) + n_predict`` entries.

    Raises:
        ValueError: if ``n_harm`` is negative.
    """
    if n_harm < 0:
        raise ValueError('n_harm must be non-negative')

    n = len(x)
    t_fit = np.arange(0, n)
    slope = np.polyfit(t_fit, x, 1)[0]
    x_freqdom = fft.fft(x - slope * t_fit)
    f = fft.fftfreq(n)

    # Lowest |frequency| first; the stable sort keeps tied bins in index order.
    keep = sorted(range(n), key=lambda i: np.absolute(f[i]))[:1 + n_harm * 2]

    ampli = np.absolute(x_freqdom[keep]) / n
    phase = np.angle(x_freqdom[keep])
    t_full = np.arange(0, n + n_predict)
    # One row per kept bin, one column per output sample.
    harmonics = ampli[:, None] * np.cos(
        2 * np.pi * f[keep][:, None] * t_full + phase[:, None]
    )
    return harmonics.sum(axis=0) + slope * t_full
def fourierExtrapolation(x, n_predict, n_harm=10):
    """Rebuild and extend ``x`` from a linear slope and low-frequency cosines.

    ``x`` is detrended by the slope of ``np.polyfit(t, x, 1)``, transformed
    with ``fft.fft``, and resynthesised over ``len(x) + n_predict`` samples
    from the ``1 + 2 * n_harm`` bins closest to zero frequency.  The slope
    term is re-applied to the result.

    Args:
        x: 1-D sequence of real samples.
        n_predict: number of future samples to append.
        n_harm: number of harmonics in the model (default 10; formerly a
            constant in the function body).

    Returns:
        numpy.ndarray of ``len(x) + n_predict`` samples.

    Raises:
        ValueError: if ``n_harm`` is negative.
    """
    if n_harm < 0:
        raise ValueError('n_harm must be non-negative')

    n = len(x)
    t_in = np.arange(0, n)
    trend = np.polyfit(t_in, x, 1)
    spectrum = fft.fft(x - trend[0] * t_in)
    f = fft.fftfreq(n)

    order = list(range(n))
    # In-place stable sort: lower |frequency| first.
    order.sort(key=lambda k: np.absolute(f[k]))
    selected = order[:1 + n_harm * 2]

    t_out = np.arange(0, n + n_predict)
    restored_sig = np.zeros(t_out.size)
    for k in selected:
        component = spectrum[k]
        restored_sig += (np.absolute(component) / n) * np.cos(
            2 * np.pi * f[k] * t_out + np.angle(component)
        )
    return restored_sig + trend[0] * t_out
def griffinlim(spectrogram, n_iter=50, window='hann', n_fft=2048, win_length=2048, hop_length=-1, verbose=False):
    """Estimate a time-domain signal from a spectrogram by iterating phases.

    Starting from random unit-modulus phases, each iteration inverts
    ``|spectrogram| * angles`` with ``librosa.istft``, re-analyses the result
    with ``librosa.stft`` and keeps only the angle of the re-analysis as the
    next phase estimate.

    Args:
        spectrogram: array whose absolute value is used as the fixed
            magnitude.  Presumably laid out the way ``librosa.stft`` returns
            it -- confirm against the caller.
        n_iter: number of phase-update iterations.
        window: window specification forwarded to librosa.
        n_fft: FFT size forwarded to ``librosa.stft``.
        win_length: window length forwarded to librosa.
        hop_length: hop size; ``-1`` selects ``n_fft // 4``.
        verbose: when true, show a tqdm progress bar with the Frobenius norm
            of the magnitude mismatch.

    Returns:
        The output of ``librosa.istft`` for the final phase estimate.
    """
    if hop_length == -1:
        hop_length = n_fft // 4

    # ``np.complex`` was only an alias of the builtin ``complex`` and was
    # removed in NumPy 1.24; ``np.complex128`` is the same dtype.  The
    # magnitude never changes, so it is computed once outside the loop.
    magnitude = np.abs(spectrogram).astype(np.complex128)
    angles = np.exp(2j * np.pi * np.random.rand(*spectrogram.shape))

    progress = tqdm(range(n_iter), ncols=100, mininterval=2.0, disable=not verbose)
    for _ in progress:
        inverse = librosa.istft(magnitude * angles, hop_length=hop_length,
                                win_length=win_length, window=window)
        rebuilt = librosa.stft(inverse, n_fft=n_fft, hop_length=hop_length,
                               win_length=win_length, window=window)
        angles = np.exp(1j * np.angle(rebuilt))

        if verbose:
            diff = np.abs(spectrogram) - np.abs(rebuilt)
            progress.set_postfix(loss=np.linalg.norm(diff, 'fro'))

    return librosa.istft(magnitude * angles, hop_length=hop_length,
                         win_length=win_length, window=window)
FFT_Stock_Prediction.py 文件源码
项目:Test-stock-prediction-algorithms
作者: timestocome
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def fourierEx(x, n_predict, harmonics):
    """Extend ``x`` by ``n_predict`` samples using its lowest Fourier components.

    The slope of a first-order polynomial fit is subtracted from ``x``, the
    remainder is transformed with ``fft.fft``, and the ``1 + 2 * harmonics``
    bins with the smallest absolute frequency are summed back as cosines
    over ``len(x) + n_predict`` samples.  The slope term is then re-added.

    Args:
        x: 1-D sequence of real samples.
        n_predict: number of samples to generate beyond the input.
        harmonics: number of harmonics to keep.

    Returns:
        numpy.ndarray of length ``len(x) + n_predict``.
    """
    sample_count = len(x)
    sample_axis = np.arange(0, sample_count)

    # Only the slope of the fit is removed; the intercept stays in the
    # zero-frequency bin.
    trend = np.polyfit(sample_axis, x, 1)
    spectrum = fft.fft(x - trend[0] * sample_axis)
    f = np.fft.fftfreq(sample_count)

    # Bin indexes ordered by |frequency|, lowest first (stable ordering).
    by_frequency = sorted(range(sample_count), key=lambda k: np.absolute(f[k]))

    full_axis = np.arange(0, sample_count + n_predict)
    restored_signal = np.zeros(full_axis.size)
    for k in by_frequency[:1 + harmonics * 2]:
        amplitude = np.absolute(spectrum[k] / sample_count)
        phase = np.angle(spectrum[k])
        restored_signal += amplitude * np.cos(2 * np.pi * f[k] * full_axis + phase)

    return restored_signal + trend[0] * full_axis
def processData(self, data):
    """Flatten spectral spikes at ``f0`` and its harmonics in ``data``.

    The sample interval is taken from the first two entries of
    ``data.xvals('Time')``.  For every harmonic of the ``f0`` control a small
    range of FFT bins is overwritten with the mean magnitude of the two bins
    bordering that range, keeping each bin's original phase, and the
    mirrored bin is set to the complex conjugate.

    Args:
        data: object providing ``xvals('Time')``, ``asarray()`` and
            ``infoCopy()`` (presumably a MetaArray -- confirm with caller).

    Returns:
        ``metaarray.MetaArray`` built from the real part of the inverse FFT
        and a copy of the input's info.
    """
    times = data.xvals('Time')
    dt = times[1] - times[0]

    data1 = data.asarray()
    ft = np.fft.fft(data1)

    ## frequency spacing of the fft bins
    df = 1.0 / (len(data1) * dt)

    ## flatten spikes at f0 and harmonics
    f0 = self.ctrls['f0'].value()
    samples = self.ctrls['samples'].value()
    # range() rather than xrange(): xrange does not exist on Python 3.
    for i in range(1, self.ctrls['harmonics'].value() + 2):
        f = f0 * i  # target frequency

        ## determine index range to check for this frequency
        ind1 = int(np.floor(f / df))
        ind2 = int(np.ceil(f / df)) + (samples - 1)
        if ind1 > len(ft) / 2.:
            break
        # Replacement magnitude: mean of the bins just outside the range.
        mag = (abs(ft[ind1 - 1]) + abs(ft[ind2 + 1])) * 0.5
        for j in range(ind1, ind2 + 1):
            ## Must preserve the phase of each point, otherwise any
            ## transients in the trace might lead to large artifacts.
            phase = np.angle(ft[j])
            re = mag * np.cos(phase)
            im = mag * np.sin(phase)
            ft[j] = re + im * 1j
            ft[len(ft) - j] = re - im * 1j  # mirrored bin gets the conjugate

    data2 = np.fft.ifft(ft).real

    ma = metaarray.MetaArray(data2, info=data.infoCopy())
    return ma
def removePeriodic(data, f0=60.0, dt=None, harmonics=10, samples=4):
    """Flatten spectral spikes at ``f0`` and its harmonics.

    For each multiple of ``f0`` a small range of FFT bins is overwritten
    with the mean magnitude of the two bins bordering that range, keeping
    each bin's original phase.  The mirrored bin is set to the complex
    conjugate, and the real part of the inverse FFT is returned.

    Args:
        data: plain array, or an object whose ``implements('MetaArray')``
            is true (then ``asarray()``, ``xvals('Time')`` and
            ``infoCopy()`` are used).
        f0: fundamental frequency to suppress.
        dt: sample interval.  Required for plain arrays; for MetaArray
            input it defaults to the spacing of the first two 'Time' values.
        harmonics: multiples ``1 .. harmonics + 1`` of ``f0`` are processed,
            stopping early once the start bin passes half the spectrum.
        samples: number of extra bins (minus one) included above each
            target bin.

    Returns:
        A ``metaarray.MetaArray`` when the input was one, otherwise a
        numpy array.

    Raises:
        Exception: if ``data`` is a plain array and ``dt`` is None.
    """
    is_meta = hasattr(data, 'implements') and data.implements('MetaArray')
    if is_meta:
        data1 = data.asarray()
        if dt is None:
            times = data.xvals('Time')
            dt = times[1] - times[0]
    else:
        data1 = data
        if dt is None:
            raise Exception('Must specify dt for this data')

    ft = np.fft.fft(data1)

    ## frequency spacing of the fft bins
    df = 1.0 / (len(data1) * dt)

    ## flatten spikes at f0 and harmonics
    # range() rather than xrange(): xrange does not exist on Python 3.
    for i in range(1, harmonics + 2):
        f = f0 * i  # target frequency

        ## determine index range to check for this frequency
        ind1 = int(np.floor(f / df))
        ind2 = int(np.ceil(f / df)) + (samples - 1)
        if ind1 > len(ft) / 2.:
            break
        # Replacement magnitude: mean of the bins just outside the range.
        mag = (abs(ft[ind1 - 1]) + abs(ft[ind2 + 1])) * 0.5
        for j in range(ind1, ind2 + 1):
            ## Must preserve the phase of each point, otherwise any
            ## transients in the trace might lead to large artifacts.
            phase = np.angle(ft[j])
            re = mag * np.cos(phase)
            im = mag * np.sin(phase)
            ft[j] = re + im * 1j
            ft[len(ft) - j] = re - im * 1j  # mirrored bin gets the conjugate

    data2 = np.fft.ifft(ft).real

    if is_meta:
        return metaarray.MetaArray(data2, info=data.infoCopy())
    return data2
def processData(self, data):
    """Suppress periodic interference at ``f0`` and its harmonics.

    Reads the fundamental, the harmonic count and the bin-range width from
    ``self.ctrls`` (entries ``'f0'``, ``'harmonics'``, ``'samples'``, each
    queried through ``.value()``).  In the FFT of ``data`` every affected
    bin is rescaled to the mean magnitude of the two bins bordering its
    range while its phase is kept; the mirrored bin receives the complex
    conjugate.

    Args:
        data: object providing ``xvals('Time')``, ``asarray()`` and
            ``infoCopy()`` (presumably a MetaArray -- confirm with caller).

    Returns:
        ``metaarray.MetaArray`` wrapping the real part of the inverse FFT
        with a copy of the input's info.
    """
    times = data.xvals('Time')
    dt = times[1] - times[0]

    data1 = data.asarray()
    ft = np.fft.fft(data1)

    ## frequency spacing of the fft bins
    df = 1.0 / (len(data1) * dt)

    ## flatten spikes at f0 and harmonics
    f0 = self.ctrls['f0'].value()
    samples = self.ctrls['samples'].value()
    # xrange() is Python 2 only; range() works on both major versions.
    for i in range(1, self.ctrls['harmonics'].value() + 2):
        f = f0 * i  # target frequency

        ## determine index range to check for this frequency
        ind1 = int(np.floor(f / df))
        ind2 = int(np.ceil(f / df)) + (samples - 1)
        if ind1 > len(ft) / 2.:
            break
        # Replacement magnitude: mean of the bins just outside the range.
        mag = (abs(ft[ind1 - 1]) + abs(ft[ind2 + 1])) * 0.5
        for j in range(ind1, ind2 + 1):
            ## Must preserve the phase of each point, otherwise any
            ## transients in the trace might lead to large artifacts.
            phase = np.angle(ft[j])
            re = mag * np.cos(phase)
            im = mag * np.sin(phase)
            ft[j] = re + im * 1j
            ft[len(ft) - j] = re - im * 1j  # mirrored bin gets the conjugate

    data2 = np.fft.ifft(ft).real

    ma = metaarray.MetaArray(data2, info=data.infoCopy())
    return ma
def removePeriodic(data, f0=60.0, dt=None, harmonics=10, samples=4):
    """Suppress periodic interference at ``f0`` and its harmonics.

    Works on the FFT of ``data``: around every multiple of ``f0`` a short
    run of bins is rescaled to the mean magnitude of the two bins bordering
    the run, with each bin's phase left unchanged.  The mirrored bin is
    written as the complex conjugate before the inverse FFT is taken.

    Args:
        data: plain array, or an object whose ``implements('MetaArray')``
            is true (then ``asarray()``, ``xvals('Time')`` and
            ``infoCopy()`` are used).
        f0: fundamental frequency to suppress.
        dt: sample interval.  Required for plain arrays; for MetaArray
            input it defaults to the spacing of the first two 'Time' values.
        harmonics: multiples ``1 .. harmonics + 1`` of ``f0`` are processed,
            stopping early once the start bin passes half the spectrum.
        samples: number of extra bins (minus one) included above each
            target bin.

    Returns:
        A ``metaarray.MetaArray`` when the input was one, otherwise a
        numpy array holding the real part of the inverse FFT.

    Raises:
        Exception: if ``data`` is a plain array and ``dt`` is None.
    """
    is_meta = hasattr(data, 'implements') and data.implements('MetaArray')
    if is_meta:
        data1 = data.asarray()
        if dt is None:
            times = data.xvals('Time')
            dt = times[1] - times[0]
    else:
        data1 = data
        if dt is None:
            raise Exception('Must specify dt for this data')

    ft = np.fft.fft(data1)

    ## frequency spacing of the fft bins
    df = 1.0 / (len(data1) * dt)

    ## flatten spikes at f0 and harmonics
    # xrange() is Python 2 only; range() works on both major versions.
    for i in range(1, harmonics + 2):
        f = f0 * i  # target frequency

        ## determine index range to check for this frequency
        ind1 = int(np.floor(f / df))
        ind2 = int(np.ceil(f / df)) + (samples - 1)
        if ind1 > len(ft) / 2.:
            break
        # Replacement magnitude: mean of the bins just outside the range.
        mag = (abs(ft[ind1 - 1]) + abs(ft[ind2 + 1])) * 0.5
        for j in range(ind1, ind2 + 1):
            ## Must preserve the phase of each point, otherwise any
            ## transients in the trace might lead to large artifacts.
            phase = np.angle(ft[j])
            re = mag * np.cos(phase)
            im = mag * np.sin(phase)
            ft[j] = re + im * 1j
            ft[len(ft) - j] = re - im * 1j  # mirrored bin gets the conjugate

    data2 = np.fft.ifft(ft).real

    if is_meta:
        return metaarray.MetaArray(data2, info=data.infoCopy())
    return data2
def getMag(_fft):
    """Return the magnitude of ``_fft`` scaled so that its maximum is 1.

    ``_fft`` must support ``abs()`` and the result must provide ``.max()``
    (e.g. a numpy array).
    """
    magnitude = abs(_fft)
    peak = magnitude.max()
    return magnitude / peak
def resample_2d_complex(array, sample_pts, query_pts):
    ''' Resamples a 2D complex array by interpolating the magnitude and phase
    independently and merging the results into a complex value.

    Args:
        array (numpy.ndarray): complex 2D array.
        sample_pts (tuple): pair of 1D numpy.ndarray objects with the sample
            locations, passed unchanged to RegularGridInterpolator as the
            grid for the array's two axes.
        query_pts (tuple): pair of 1D arrays to interpolate onto; they are
            expanded with np.meshgrid, and the second meshgrid output is
            used as the first interpolation coordinate.

    Returns:
        numpy.ndarray: array resampled onto the meshgrid of query_pts using
        interpolate.RegularGridInterpolator with its default settings.

    '''
    grid_x, grid_y = np.meshgrid(*query_pts)
    targets = (grid_y, grid_x)

    magnitude = abs(array)
    angle = np.angle(array)

    interpolators = [
        interpolate.RegularGridInterpolator(sample_pts, component)
        for component in (magnitude, angle)
    ]
    new_magnitude, new_angle = [func(targets) for func in interpolators]

    return new_magnitude * exp(1j * new_angle)
def phase(self):
    """
    Return the phase spectrum, i.e. ``np.angle`` applied to this object.
    """
    angles = np.angle(self)
    return angles
def phase(z):
    """Return the angle of ``z`` in radians, as computed by ``np.angle``."""
    return np.angle(z)
def DFT(x, w, N):
    """ Discrete Fourier Transformation(Analysis) of a given real input signal
    via the ``fft`` function in scope. Single channel is being supported.

    Args:
        x : (array) Real time domain input signal, same length as ``w``
        w : (array) Desired windowing function
        N : (int) FFT size
    Returns:
        magX : (ndarray) Magnitude spectrum, ``N // 2 + 1`` bins
        phsX : (ndarray) Phase spectrum, ``N // 2 + 1`` bins
    """
    # Half spectrum size containing DC component.  Floor division keeps this
    # an int so it can be used as a slice bound on Python 3.
    hlfN = N // 2 + 1

    # Half window sizes. Two parameters to perform zero-phase windowing technique
    hw1 = (w.size + 1) // 2
    hw2 = w.size // 2

    # Window the input signal
    winx = x * w

    # Initialize FFT buffer with zeros and perform zero-phase windowing:
    # the second half of the frame goes to the start of the buffer and the
    # first half wraps around to its end.
    fftbuffer = np.zeros(N)
    fftbuffer[:hw1] = winx[hw2:]
    if hw2:
        # With a one-sample window hw2 is 0 and ``fftbuffer[-0:]`` would
        # address the whole buffer, so there is nothing to wrap around.
        fftbuffer[-hw2:] = winx[:hw2]

    # Compute DFT
    X = fft(fftbuffer)

    # Acquire magnitude and phase spectrum
    magX = np.abs(X[:hlfN])
    phsX = np.angle(X[:hlfN])

    return magX, phsX
def test_simple(self):
x = np.array([1+0j, 1+2j])
y_r = np.log(np.abs(x)) + 1j * np.angle(x)
y = np.log(x)
for i in range(len(x)):
assert_almost_equal(y[i], y_r[i])
def test_basic_ufuncs(self):
    """Compare ``np`` functions with the same-named bare functions in scope.

    The fixture tuple ``self.d`` supplies plain arrays (``x``, ``y``, ``z``)
    and their counterparts (``xm``, ``ym``, ``zm``).  Each assertion applies
    an ``np`` function to the plain operands and the bare-name function to
    the counterparts, then checks the two results with ``assert_equal``.
    NOTE(review): the bare names are presumably the masked-array versions
    of the ufuncs -- confirm against the module imports.
    """
    # Only six of the ten fixture entries are used here.
    x, y, _, _, _, xm, ym, z, zm, _ = self.d

    # Unary functions applied directly to x / xm.
    for plain, other in ((np.cos, cos), (np.cosh, cosh),
                         (np.sin, sin), (np.sinh, sinh),
                         (np.tan, tan), (np.tanh, tanh)):
        assert_equal(plain(x), other(xm))

    # Here the plain side is evaluated on abs(x).
    for plain, other in ((np.sqrt, sqrt), (np.log, log), (np.log10, log10)):
        assert_equal(plain(abs(x)), other(xm))

    assert_equal(np.exp(x), exp(xm))

    # Inverse trigonometric functions use the z / zm pair.
    for plain, other in ((np.arcsin, arcsin), (np.arccos, arccos),
                         (np.arctan, arctan)):
        assert_equal(plain(z), other(zm))

    assert_equal(np.arctan2(x, y), arctan2(xm, ym))
    assert_equal(np.absolute(x), absolute(xm))

    # Angle of the complex combination, in radians and in degrees.
    assert_equal(np.angle(x + 1j*y), angle(xm + 1j*ym))
    assert_equal(np.angle(x + 1j*y, deg=True), angle(xm + 1j*ym, deg=True))

    # Binary comparisons.
    for plain, other in ((np.equal, equal), (np.not_equal, not_equal),
                         (np.less, less), (np.greater, greater),
                         (np.less_equal, less_equal),
                         (np.greater_equal, greater_equal)):
        assert_equal(plain(x, y), other(xm, ym))

    assert_equal(np.conjugate(x), conjugate(xm))
def synthesis_speech(noisy_speech, ideal_mask, win_type, win_len, shift_len, syn_method='A&R'):
    """Apply a spectral mask frame by frame and overlap-add the result.

    Each frame of ``noisy_speech`` is windowed and transformed with
    ``np.fft.fft``.  The magnitude of the lower ``win_len // 2 + 1`` bins is
    multiplied by the matching column of ``ideal_mask`` and recombined with
    the noisy phase.  The upper bins are filled by conjugate symmetry, the
    frame is transformed back, and the frames are overlap-added.  The sum
    is divided by the accumulated window weight, floored at ``1e-2``.

    Args:
        noisy_speech: 1-D array of time-domain samples.
        ideal_mask: array indexed as ``ideal_mask[:, i]`` per frame; each
            column must have ``win_len // 2 + 1`` entries.
        win_type: ``'hanning'``, ``'hamming'`` or ``'rectangle'``.
        win_len: frame length in samples (even or odd).
        shift_len: hop between consecutive frames in samples.
        syn_method: ``'A&R'`` / ``'ALLEN & RABINER'`` accumulates the window
            as weight; ``'G&L'`` / ``'GRIFFIN & LIM'`` windows each
            resynthesised frame again and accumulates the squared window.

    Returns:
        float array of length ``(frames - 1) * shift_len + win_len`` where
        ``frames = (len(noisy_speech) - win_len) // shift_len``.

    Raises:
        ValueError: for an unknown ``win_type`` or ``syn_method``.
            Previously an unknown window type failed with an unbound-local
            error and an unknown method silently produced all zeros.
    """
    window_makers = {
        'hanning': np.hanning,
        'hamming': np.hamming,
        'rectangle': np.ones,
    }
    if win_type not in window_makers:
        raise ValueError('unknown win_type: %r' % (win_type,))
    window = window_makers[win_type](win_len)

    if syn_method in ('A&R', 'ALLEN & RABINER'):
        rewindow = False
    elif syn_method in ('G&L', 'GRIFFIN & LIM'):
        rewindow = True
    else:
        raise ValueError('unknown syn_method: %r' % (syn_method,))
    # Weight accumulated per frame for the final normalisation.
    frame_weight = np.power(window, 2.) if rewindow else window

    samples = noisy_speech.shape[0]
    frames = (samples - win_len) // shift_len
    half = win_len // 2 + 1
    out_len = (frames - 1) * shift_len + win_len

    to_ifft = np.zeros(win_len, dtype=np.complex64)
    clean_speech = np.zeros(out_len, dtype=np.float32)
    window_sum = np.zeros(out_len, dtype=np.float32)

    for i in range(frames):
        start = i * shift_len
        stop = start + win_len
        windowed_frame = np.multiply(noisy_speech[start:stop], window)
        stft = np.fft.fft(windowed_frame, win_len)
        masked_abs = np.abs(stft[:half]) * ideal_mask[:, i]
        to_ifft[:half] = masked_abs * np.exp(1j * np.angle(stft[:half]))
        # Mirror bins (win_len - 1) // 2 .. 1 into the upper half.  This has
        # the right length for odd win_len too; win_len // 2 - 1 is one
        # element short there.
        to_ifft[half:] = np.conj(to_ifft[(win_len - 1) // 2:0:-1])
        speech_seg = np.real(np.fft.ifft(to_ifft, win_len))
        if rewindow:
            speech_seg = np.multiply(speech_seg, window)
        clean_speech[start:stop] += speech_seg
        window_sum[start:stop] += frame_weight

    # Avoid dividing by (near) zero where the accumulated weight vanishes.
    window_sum = np.where(window_sum < 1e-2, 1e-2, window_sum)
    return clean_speech / window_sum
def synthesis_speech(ns, mk, win_type, win_len, shift_len, syn_method='A&R'):
    """Mask the short-time spectrum of ``ns`` and resynthesise by overlap-add.

    Each frame of ``ns`` is windowed and transformed with ``np.fft.fft``.
    The magnitude of the lower ``win_len // 2 + 1`` bins is multiplied by
    the matching column of ``mk`` and recombined with the original phase.
    The upper bins are filled by conjugate symmetry, the frame is
    transformed back, and the frames are overlap-added.  The sum is divided
    by the accumulated window weight, floored at ``1e-2``.

    Args:
        ns: 1-D array of time-domain samples.
        mk: mask indexed as ``mk[:, i]`` per frame; each column must have
            ``win_len // 2 + 1`` entries.
        win_type: ``'hanning'``, ``'hamming'`` or ``'rectangle'``.
        win_len: frame length in samples (even or odd).
        shift_len: hop between consecutive frames in samples.
        syn_method: ``'A&R'`` / ``'ALLEN & RABINER'`` accumulates the window
            as weight; ``'G&L'`` / ``'GRIFFIN & LIM'`` windows each
            resynthesised frame again and accumulates the squared window.

    Returns:
        float array of length ``(frames - 1) * shift_len + win_len`` where
        ``frames = (len(ns) - win_len) // shift_len``.

    Raises:
        ValueError: for an unknown ``win_type`` or ``syn_method``.
            Previously an unknown window type failed with an unbound-local
            error and an unknown method silently produced all zeros.
    """
    window_makers = {
        'hanning': np.hanning,
        'hamming': np.hamming,
        'rectangle': np.ones,
    }
    if win_type not in window_makers:
        raise ValueError('unknown win_type: %r' % (win_type,))
    window = window_makers[win_type](win_len)

    if syn_method in ('A&R', 'ALLEN & RABINER'):
        rewindow = False
    elif syn_method in ('G&L', 'GRIFFIN & LIM'):
        rewindow = True
    else:
        raise ValueError('unknown syn_method: %r' % (syn_method,))
    # Weight accumulated per frame for the final normalisation.
    frame_weight = np.power(window, 2.) if rewindow else window

    samples = ns.shape[0]
    frames = (samples - win_len) // shift_len
    half = win_len // 2 + 1
    out_len = (frames - 1) * shift_len + win_len

    to_ifft = np.zeros(win_len, dtype=np.complex64)
    clean_speech = np.zeros(out_len, dtype=np.float32)
    window_sum = np.zeros(out_len, dtype=np.float32)

    for i in range(frames):
        start = i * shift_len
        stop = start + win_len
        windowed_frame = np.multiply(ns[start:stop], window)
        stft = np.fft.fft(windowed_frame, win_len)
        masked_abs = np.abs(stft[:half]) * mk[:, i]
        to_ifft[:half] = masked_abs * np.exp(1j * np.angle(stft[:half]))
        # Mirror bins (win_len - 1) // 2 .. 1 into the upper half.  This has
        # the right length for odd win_len too; win_len // 2 - 1 is one
        # element short there.
        to_ifft[half:] = np.conj(to_ifft[(win_len - 1) // 2:0:-1])
        # The inverse transform length must be win_len.  It used to be the
        # literal 320, which only matched 320-sample frames.
        speech_seg = np.real(np.fft.ifft(to_ifft, win_len))
        if rewindow:
            speech_seg = np.multiply(speech_seg, window)
        clean_speech[start:stop] += speech_seg
        window_sum[start:stop] += frame_weight

    # Avoid dividing by (near) zero where the accumulated weight vanishes.
    window_sum = np.where(window_sum < 1e-2, 1e-2, window_sum)
    return clean_speech / window_sum