def stft(X, fftsize=128, step="half", mean_normalize=True, real=False,
compute_onesided=True):
"""
Compute STFT for 1D real valued input X
"""
if real:
local_fft = fftpack.rfft
cut = -1
else:
local_fft = fftpack.fft
cut = None
if compute_onesided:
cut = fftsize // 2
if mean_normalize:
X -= X.mean()
if step == "half":
X = halfoverlap(X, fftsize)
else:
X = overlap(X, fftsize, step)
size = fftsize
win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1))
X = X * win[None]
X = local_fft(X)[:, :cut]
return X
python类fft()的实例源码
def stft(X, fftsize=128, step="half", mean_normalize=True, real=False,
compute_onesided=True):
"""
Compute STFT for 1D real valued input X
"""
if real:
local_fft = fftpack.rfft
cut = -1
else:
local_fft = fftpack.fft
cut = None
if compute_onesided:
cut = fftsize // 2
if mean_normalize:
X -= X.mean()
if step == "half":
X = halfoverlap(X, fftsize)
else:
X = overlap(X, fftsize, step)
size = fftsize
win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1))
X = X * win[None]
X = local_fft(X)[:, :cut]
return X
def stft(X, fftsize=128, step="half", mean_normalize=True, real=False,
compute_onesided=True):
"""
Compute STFT for 1D real valued input X
"""
if real:
local_fft = fftpack.rfft
cut = -1
else:
local_fft = fftpack.fft
cut = None
if compute_onesided:
cut = fftsize // 2
if mean_normalize:
X -= X.mean()
if step == "half":
X = halfoverlap(X, fftsize)
else:
X = overlap(X, fftsize, step)
size = fftsize
win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1))
X = X * win[None]
X = local_fft(X)[:, :cut]
return X
def stft(X, fftsize=128, step="half", mean_normalize=True, real=False,
compute_onesided=True):
"""
Compute STFT for 1D real valued input X
"""
if real:
local_fft = fftpack.rfft
cut = -1
else:
local_fft = fftpack.fft
cut = None
if compute_onesided:
cut = fftsize // 2
if mean_normalize:
X -= X.mean()
if step == "half":
X = halfoverlap(X, fftsize)
else:
X = overlap(X, fftsize, step)
size = fftsize
win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1))
X = X * win[None]
X = local_fft(X)[:, :cut]
return X
def stft(X, fftsize=128, step="half", mean_normalize=True, real=False,
compute_onesided=True):
"""
Compute STFT for 1D real valued input X
"""
if real:
local_fft = fftpack.rfft
cut = -1
else:
local_fft = fftpack.fft
cut = None
if compute_onesided:
cut = fftsize // 2
if mean_normalize:
X -= X.mean()
if step == "half":
X = halfoverlap(X, fftsize)
else:
X = overlap(X, fftsize, step)
size = fftsize
win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1))
X = X * win[None]
X = local_fft(X)[:, :cut]
return X
def stft(X, fftsize=128, step="half", mean_normalize=True, real=False,
compute_onesided=True):
"""
Compute STFT for 1D real valued input X
"""
if real:
local_fft = fftpack.rfft
cut = -1
else:
local_fft = fftpack.fft
cut = None
if compute_onesided:
cut = fftsize // 2
if mean_normalize:
X -= X.mean()
if step == "half":
X = halfoverlap(X, fftsize)
else:
X = overlap(X, fftsize, step)
size = fftsize
win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1))
X = X * win[None]
X = local_fft(X)[:, :cut]
return X
def stft(X, fftsize=128, step="half", mean_normalize=True, real=False,
compute_onesided=True):
"""
Compute STFT for 1D real valued input X
"""
if real:
local_fft = fftpack.rfft
cut = -1
else:
local_fft = fftpack.fft
cut = None
if compute_onesided:
cut = fftsize // 2
if mean_normalize:
X -= X.mean()
if step == "half":
X = halfoverlap(X, fftsize)
else:
X = overlap(X, fftsize, step)
size = fftsize
win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1))
X = X * win[None]
X = local_fft(X)[:, :cut]
return X
def stft(X, fftsize=128, step="half", mean_normalize=True, real=False,
compute_onesided=True):
"""
Compute STFT for 1D real valued input X
"""
if real:
local_fft = fftpack.rfft
cut = -1
else:
local_fft = fftpack.fft
cut = None
if compute_onesided:
cut = fftsize // 2
if mean_normalize:
X -= X.mean()
if step == "half":
X = halfoverlap(X, fftsize)
else:
X = overlap(X, fftsize, step)
size = fftsize
win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1))
X = X * win[None]
X = local_fft(X)[:, :cut]
return X
def stft(X, fftsize=128, step="half", mean_normalize=True, real=False,
compute_onesided=True):
"""
Compute STFT for 1D real valued input X
"""
if real:
local_fft = fftpack.rfft
cut = -1
else:
local_fft = fftpack.fft
cut = None
if compute_onesided:
cut = fftsize // 2
if mean_normalize:
X -= X.mean()
if step == "half":
X = halfoverlap(X, fftsize)
else:
X = overlap(X, fftsize, step)
size = fftsize
win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1))
X = X * win[None]
X = local_fft(X)[:, :cut]
return X
def spectraltest(binin):
'''The focus of this test is the peak heights in the discrete Fast Fourier Transform. The purpose of this test is to detect periodic features (i.e., repetitive patterns that are near each other) in the tested sequence that would indicate a deviation from the assumption of randomness. '''
n = len(binin)
ss = [int(el) for el in binin]
sc = map(sumi, ss)
ft = sff.fft(sc)
af = abs(ft)[1:n/2+1:]
t = np.sqrt(np.log(1/0.05)*n)
n0 = 0.95*n/2
n1 = len(np.where(af<t)[0])
d = (n1 - n0)/np.sqrt(n*0.95*0.05/4)
pval = spc.erfc(abs(d)/np.sqrt(2))
return pval > 0.01
def DFT(x, w, N):
""" Discrete Fourier Transformation(Analysis) of a given real input signal
via an FFT implementation from scipy. Single channel is being supported.
Args:
x : (array) Real time domain input signal
w : (array) Desired windowing function
N : (int) FFT size
Returns:
magX : (2D ndarray) Magnitude Spectrum
phsX : (2D ndarray) Phase Spectrum
"""
# Half spectrum size containing DC component
hlfN = (N/2)+1
# Half window size. Two parameters to perform zero-phase windowing technique
hw1 = int(math.floor((w.size+1)/2))
hw2 = int(math.floor(w.size/2))
# Window the input signal
winx = x*w
# Initialize FFT buffer with zeros and perform zero-phase windowing
fftbuffer = np.zeros(N)
fftbuffer[:hw1] = winx[hw2:]
fftbuffer[-hw2:] = winx[:hw2]
# Compute DFT via scipy's FFT implementation
X = fft(fftbuffer)
# Acquire magnitude and phase spectrum
magX = (np.abs(X[:hlfN]))
phsX = (np.angle(X[:hlfN]))
return magX, phsX
def test_phase_randomize():
from brainiak.utils.utils import phase_randomize
import numpy as np
from scipy.fftpack import fft
import math
from scipy.stats import pearsonr
# Generate auto-correlated signals
nv = 2
T = 100
ns = 3
D = np.zeros((nv, T, ns))
for v in range(nv):
for s in range(ns):
D[v, :, s] = np.sin(np.linspace(0, math.pi * 5 * (v + 1), T)) + \
np.sin(np.linspace(0, math.pi * 6 * (s + 1), T))
freq = fft(D, axis=1)
D_pr = phase_randomize(D)
freq_pr = fft(D_pr, axis=1)
p_corr = pearsonr(np.angle(freq).flatten(), np.angle(freq_pr).flatten())[0]
assert np.isclose(abs(freq), abs(freq_pr)).all(), \
"Amplitude spectrum not preserved under phase randomization"
assert abs(p_corr) < 0.03, \
"Phases still correlated after randomization"
def fft_random(n):
for i in range(n):
x = random.normal(0, 0.1, 2000)
y = fft(x)
if(i%30 == 0):
process_percent = int(100 * float(i)/float(n))
current_task.update_state(state='PROGRESS',
meta={'process_percent': process_percent})
return random.random()
def test(tid, n):
for i in range(n):
x = random.normal(0, 0.1, 2000)
y = fft(x)
result = dict()
result['x'] = random.random()
result['y'] = random.randint(100)
return result
# @task_success.connect(sender='celeryapp.tasks.fft_random')
def DFT(x, w, N):
""" Discrete Fourier Transformation(Analysis) of a given real input signal
via an FFT implementation from scipy. Single channel is being supported.
Args:
x : (array) Real time domain input signal
w : (array) Desired windowing function
N : (int) FFT size
Returns:
magX : (2D ndarray) Magnitude Spectrum
phsX : (2D ndarray) Phase Spectrum
"""
# Half spectrum size containing DC component
hlfN = (N/2)+1
# Half window size. Two parameters to perform zero-phase windowing technique
hw1 = int(math.floor((w.size+1)/2))
hw2 = int(math.floor(w.size/2))
# Window the input signal
winx = x*w
# Initialize FFT buffer with zeros and perform zero-phase windowing
fftbuffer = np.zeros(N)
fftbuffer[:hw1] = winx[hw2:]
fftbuffer[-hw2:] = winx[:hw2]
# Compute DFT via scipy's FFT implementation
X = fft(fftbuffer)
# Acquire magnitude and phase spectrum
magX = (np.abs(X[:hlfN]))
phsX = (np.angle(X[:hlfN]))
return magX, phsX
def plotFFT(df):
# Number of samplepoints
N = len(df)
# sample spacing
T = BINSIZE*60.0
x = np.linspace(0.0, N*T, N)
y = df.values #np.sin(50.0 * 2.0*np.pi*x) + 0.5*np.sin(80.0 * 2.0*np.pi*x)
yf = fftpack.fft(y)
xf = np.linspace(0.0, 1.0/(2.0*T), N/2)
fig, ax = plt.subplots()
ax.plot(xf, 2.0/N * np.abs(yf[:N//2]))
plt.savefig(FIG_DIR+'fft.png', bbox_inches='tight')
def frequencies(self):
"""
:return: frequency array
"""
if self.baseband:
return np.fft.rfftfreq(self.data_length*self.PADDING_FACTOR,
self.sampling_time)
else:
return self.center + scipy.fftpack.fftshift( scipy.fftpack.fftfreq(
self.data_length*self.PADDING_FACTOR,
self.sampling_time)) #[self.useful_index()]
def _shift_fft(array, shift_value):
Ndim = array.ndim
dims = array.shape
dtype = array.dtype.kind
if (dtype != 'f'):
raise ValueError('Array must be float')
shifted = array
if (Ndim == 1):
Nx = dims[0]
x_ramp = np.arange(Nx, dtype=array.dtype) - Nx//2
tilt = (2*np.pi/Nx) * (shift_value[0]*x_ramp)
cplx_tilt = np.cos(tilt) + 1j*np.sin(tilt)
cplx_tilt = fft.fftshift(cplx_tilt)
narray = fft.fft(fft.ifft(array) * cplx_tilt)
shifted = narray.real
elif (Ndim == 2):
Nx = dims[0]
Ny = dims[1]
x_ramp = np.outer(np.full(Nx, 1.), np.arange(Ny, dtype=array.dtype)) - Nx//2
y_ramp = np.outer(np.arange(Nx, dtype=array.dtype), np.full(Ny, 1.)) - Ny//2
tilt = (2*np.pi/Nx) * (shift_value[0]*x_ramp+shift_value[1]*y_ramp)
cplx_tilt = np.cos(tilt) + 1j*np.sin(tilt)
cplx_tilt = fft.fftshift(cplx_tilt)
narray = fft.fft2(fft.ifft2(array) * cplx_tilt)
shifted = narray.real
else:
raise ValueError('This function can shift only 1D or 2D arrays')
return shifted
def harvest_get_f0_candidate_from_raw_event(boundary_f0,
fs, y_spectrum, y_length, temporal_positions, f0_floor,
f0_ceil):
filter_length_half = int(np.round(fs / boundary_f0 * 2))
band_pass_filter_base = harvest_nuttall(filter_length_half * 2 + 1)
shifter = np.cos(2 * np.pi * boundary_f0 * np.arange(-filter_length_half, filter_length_half + 1) / float(fs))
band_pass_filter = band_pass_filter_base * shifter
index_bias = filter_length_half
# possible numerical issues if 32 bit
spectrum_low_pass_filter = np.fft.fft(band_pass_filter.astype("float64"), len(y_spectrum))
filtered_signal = np.real(np.fft.ifft(spectrum_low_pass_filter * y_spectrum))
index_bias = filter_length_half + 1
filtered_signal = filtered_signal[index_bias + np.arange(y_length).astype("int32")]
negative_zero_cross = harvest_zero_crossing_engine(filtered_signal, fs)
positive_zero_cross = harvest_zero_crossing_engine(-filtered_signal, fs)
d_filtered_signal = filtered_signal[1:] - filtered_signal[:-1]
peak = harvest_zero_crossing_engine(d_filtered_signal, fs)
dip = harvest_zero_crossing_engine(-d_filtered_signal, fs)
f0_candidate = harvest_get_f0_candidate_contour(negative_zero_cross,
positive_zero_cross, peak, dip, temporal_positions)
f0_candidate[f0_candidate > (boundary_f0 * 1.1)] = 0.
f0_candidate[f0_candidate < (boundary_f0 * .9)] = 0.
f0_candidate[f0_candidate > f0_ceil] = 0.
f0_candidate[f0_candidate < f0_floor] = 0.
return f0_candidate
def harvest(x, fs):
f0_floor = 71
f0_ceil = 800
target_fs = 8000
channels_in_octave = 40.
basic_temporal_positions, temporal_positions = _world_get_temporal_positions(len(x), fs)
adjusted_f0_floor = f0_floor * 0.9
adjusted_f0_ceil = f0_ceil * 1.1
boundary_f0_list = np.arange(1, np.ceil(np.log2(adjusted_f0_ceil / adjusted_f0_floor) * channels_in_octave) + 1) / float(channels_in_octave)
boundary_f0_list = adjusted_f0_floor * 2.0 ** boundary_f0_list
y, actual_fs = harvest_get_downsampled_signal(x, fs, target_fs)
fft_size = 2. ** np.ceil(np.log2(len(y) + np.round(fs / f0_floor * 4) + 1))
y_spectrum = np.fft.fft(y, int(fft_size))
raw_f0_candidates = harvest_get_raw_f0_candidates(
len(basic_temporal_positions),
boundary_f0_list, len(y), basic_temporal_positions, actual_fs,
y_spectrum, f0_floor, f0_ceil)
f0_candidates, number_of_candidates = harvest_detect_official_f0_candidates(raw_f0_candidates)
f0_candidates = harvest_overlap_f0_candidates(f0_candidates, number_of_candidates)
f0_candidates, f0_scores = harvest_refine_candidates(y, actual_fs,
basic_temporal_positions, f0_candidates, f0_floor, f0_ceil)
f0_candidates, f0_scores = harvest_remove_unreliable_candidates(f0_candidates, f0_scores)
connected_f0, vuv = harvest_fix_f0_contour(f0_candidates, f0_scores)
smoothed_f0 = harvest_smooth_f0_contour(connected_f0)
idx = np.minimum(len(smoothed_f0) - 1, np.round(temporal_positions * 1000)).astype("int32")
f0 = smoothed_f0[idx]
vuv = vuv[idx]
f0_candidates = f0_candidates
return temporal_positions, f0, vuv, f0_candidates