def __init__(self, win_len=0.025, win_step=0.01,
num_filt=40, nfft=512, low_freq=20, high_freq=7800,
pre_emph=0.97, win_fun=signal.hamming, **kwargs):
super(FBank, self).__init__(**kwargs)
if high_freq > self.fs / 2:
raise ValueError("high_freq must be less than or equal to fs/2")
self.win_len = win_len
self.win_step = win_step
self.num_filt = num_filt
self.nfft = nfft
self.low_freq = low_freq
self.high_freq = high_freq or self.fs / 2
self.pre_emph = pre_emph
self.win_fun = win_fun
self._filterbanks = self._get_filterbanks()
self._num_feats = self.num_filt
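The helper _get_filterbanks is not shown on this page. Purely as an illustration, a minimal sketch of a standard triangular mel filterbank, assuming the num_filt, nfft, low_freq and high_freq parameters above and a sampling rate fs, might look like the following (hypothetical helper, not necessarily what FBank._get_filterbanks does):

import numpy as np

def mel_filterbanks_sketch(num_filt=40, nfft=512, fs=16000, low_freq=20, high_freq=7800):
    # hypothetical stand-in for FBank._get_filterbanks
    hz2mel = lambda f: 2595.0 * np.log10(1.0 + f / 700.0)
    mel2hz = lambda m: 700.0 * (10.0 ** (m / 2595.0) - 1.0)
    # num_filt triangular filters need num_filt + 2 edge points on the mel scale
    mel_points = np.linspace(hz2mel(low_freq), hz2mel(high_freq), num_filt + 2)
    bins = np.floor((nfft + 1) * mel2hz(mel_points) / fs).astype(int)
    fbank = np.zeros((num_filt, nfft // 2 + 1))
    for m in range(1, num_filt + 1):
        left, center, right = bins[m - 1], bins[m], bins[m + 1]
        for k in range(left, center):
            fbank[m - 1, k] = (k - left) / max(center - left, 1)
        for k in range(center, right):
            fbank[m - 1, k] = (right - k) / max(right - center, 1)
    return fbank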
Python hamming() usage examples
def single_spectrogram(inseq,fs,wlen,h,imag=False):
"""
imag: Return Imaginary Data of the STFT on True
"""
NFFT = int(2**(np.ceil(np.log2(wlen))))
K = np.sum(hamming(wlen, False))/wlen
raw_data = inseq.astype('float32')
raw_data = raw_data/np.amax(np.absolute(raw_data))
stft_data,_,_ = STFT(raw_data,wlen,h,NFFT,fs)
s = np.absolute(stft_data)/wlen/K
# double all bins except DC (and Nyquist for even NFFT) to get a single-sided spectrum
if np.fmod(NFFT,2):
s[1:,:] *=2
else:
s[1:-1,:] *=2
real_data = np.transpose(20*np.log10(s + 10**-6)).astype(np.float32)
if imag:
imag_data = np.angle(stft_data).astype(np.float32)
return real_data,imag_data
return real_data
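A minimal usage sketch, assuming a mono recording in test.wav (hypothetical file) and that hamming and the STFT helper further down this page are in scope; wlen and h are example values only:

import numpy as np
from scipy.io import wavfile

fs, samples = wavfile.read('test.wav')
spec_db = single_spectrogram(samples, fs, wlen=512, h=128)
print(spec_db.shape)                 # (num_frames, num_unique_freq_bins)

# with imag=True the phase matrix is returned as well
spec_db, phase = single_spectrogram(samples, fs, 512, 128, imag=True)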
def GLA(wsz, hop, N=4096):
""" LSEE-MSTFT algorithm for computing the synthesis window used in
inverse STFT method below.
Args:
wsz : (int) Synthesis window size
hop : (int) Hop size
N : (int) DFT Size
Returns :
synw: (array) Synthesised windowing function
References :
[1] Daniel W. Griffin and Jae S. Lim, ``Signal estimation from modified short-time
Fourier transform,'' IEEE Transactions on Acoustics, Speech and Signal Processing,
vol. 32, no. 2, pp. 236-243, Apr 1984.
"""
synw = hamming(wsz)/np.sqrt(N)
synwProd = synw ** 2.
synwProd.shape = (wsz, 1)
redundancy = wsz // hop
env = np.zeros((wsz, 1))
for k in range(-redundancy, redundancy + 1):
envInd = (hop*k)
winInd = np.arange(1, wsz+1)
envInd += winInd
valid = np.where((envInd > 0) & (envInd <= wsz))
envInd = envInd[valid] - 1
winInd = winInd[valid] - 1
env[envInd] += synwProd[winInd]
synw = synw/env[:, 0]
return synw
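A quick sketch of computing the synthesis window, assuming a 75% overlap; hamming must be imported from scipy.signal for GLA to run:

import numpy as np
from scipy.signal import hamming

wsz, hop = 1024, 256                 # example sizes, 75% overlap
syn_win = GLA(wsz, hop, N=4096)
print(syn_win.shape)                 # (1024,)
# this window would weight each inverse-FFT frame before overlap-add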
def amplitude_stream(data, sr, fftn, step, lowcut, highcut):
'''returns an iterator with the start time in seconds
and threshold value for each chunk'''
from scipy.signal import hamming
all_fft_freqs = np.fft.fftfreq(fftn, sr** -1)
fft_freqs = (all_fft_freqs >= lowcut) & (all_fft_freqs <= highcut)
window = hamming(fftn)
data = data.ravel()
for i in range(0, len(data) - fftn, step):
x = data[i:i + fftn] * window
fft = np.fft.fft(x, fftn)[fft_freqs]
time = (i + fftn / 2) / sr
yield time, np.mean(np.log(np.abs(fft)))
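A small sketch of consuming the stream on a synthetic tone; fftn, step and the band edges are example values only:

import numpy as np

sr = 16000
t = np.arange(sr) / sr                       # one second of signal
tone = np.sin(2 * np.pi * 440.0 * t)         # 440 Hz test tone

for time, level in amplitude_stream(tone, sr, fftn=1024, step=256,
                                    lowcut=300.0, highcut=3000.0):
    print('%.3f s  %.2f' % (time, level))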
def __init__(self):
self.outputNames=("OUTPUT",) # one output terminal named "output"
self.outputTypes=("prim_int",) # the type is primitive int.
self.data_array = []
self.prefix = './temp'
self.wlen = 512
self.K = np.sum(hamming(self.wlen, False))/self.wlen
def wav_to_image(filename, wlen, mindata, maxdata, save=False, name_save=None, ):
h = wlen // 4  # hop size (integer)
K = np.sum(hamming(wlen, False))/wlen
nfft = int(2**(np.ceil(np.log2(wlen))))
Fs, data_seq = wavfile.read(filename)
raw_data = data_seq.astype('float32')
max_dt = np.amax(np.absolute(raw_data))
raw_data = raw_data/max_dt
stft_data,_,_ = STFT(raw_data,wlen,h,nfft,Fs)
s = abs(stft_data)/wlen/K
# double all bins except DC (and Nyquist for even nfft) to get a single-sided spectrum
if np.fmod(nfft,2):
s[1:,:] *=2
else:
s[1:-1,:] *=2
data_temp = 20*np.log10(s + 10**-6)
outdata = data_temp.transpose()
"""Scaling"""
mindata = np.amin(outdata, axis=0, keepdims = True)
maxdata = np.amax(outdata, axis=0, keepdims = True)
outdata -=mindata
outdata /=(maxdata-mindata)
outdata *=0.8
outdata +=0.1
figmin = np.zeros((5,outdata.shape[1]))
figmax = np.ones((5,outdata.shape[1]))
outdata = np.concatenate((outdata,figmin,figmax), axis=0)
dpi = 96
a = float(outdata.shape[0])/dpi
b = float(outdata.shape[1])/dpi
f = plt.figure(figsize=(b,a), dpi=dpi)
f.figimage(outdata)
if save:
f.savefig(name_save, dpi=f.dpi)
return f
def ISTFT(data, h, nfft, fs):
# function: [x, t] = istft(stft, h, nfft, fs)
# stft - STFT matrix (only unique points, time across columns, freq across rows)
# h - hop size
# nfft - number of FFT points
# fs - sampling frequency, Hz
# x - signal in the time domain
# t - time vector, s
# estimate the length of the signal
coln = data.shape[1]
xlen = nfft + (coln-1)*h
x = np.zeros((xlen,))
# form a periodic hamming window
win = hamming(nfft, False)
# perform IFFT and weighted-OLA
if np.fmod(nfft,2):
lst_idx = -1
else:
lst_idx = -2
for b in range(0, h*coln, h):
# extract FFT points (column b//h of the STFT matrix)
X = data[:, b//h]
# rebuild the full conjugate-symmetric spectrum from the unique points
X = np.concatenate((X, np.conjugate(X[lst_idx:0:-1])))
# IFFT
xprim = np.real(np.fft.ifft(X))
# weighted overlap-add
x[b:b+nfft] = x[b:b+nfft] + np.transpose(xprim*win)
W0 = np.sum(win*win)
x *= h/W0
# calculate the time vector (one entry per output sample)
actxlen = x.shape[0]
t = np.arange(0, actxlen, dtype=np.float32)/fs
return x, t
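A round-trip sketch combining the STFT defined further down this page with this ISTFT, assuming hamming is imported from scipy.signal; hop is a quarter of the window, and the comparison skips the signal edges, where overlap-add reconstruction is incomplete:

import numpy as np

fs = 16000
wlen, hop, nfft = 512, 128, 512
x = np.sin(2 * np.pi * 220.0 * np.arange(2 * fs) / fs)

spec, f, t = STFT(x, wlen, hop, nfft, fs)
x_rec, t_rec = ISTFT(spec, hop, nfft, fs)

n = min(len(x), len(x_rec))
err = np.max(np.abs(x[wlen:n - wlen] - x_rec[wlen:n - wlen]))
print(err)    # small in the interior if the window/hop combination overlap-adds cleanly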
def GLA(wsz, hop):
""" LSEE-MSTFT algorithm for computing the synthesis window used in
inverse STFT method below.
Args:
wsz : (int) Synthesis Window size
hop : (int) Hop size
Returns :
synw: (array) Synthesised windowing function
References :
[1] Daniel W. Griffin and Jae S. Lim, ``Signal estimation from modified short-time
Fourier transform,'' IEEE Transactions on Acoustics, Speech and Signal Processing,
vol. 32, no. 2, pp. 236-243, Apr 1984.
"""
synw = hamming(wsz)/np.sum(hamming(wsz))
synwProd = synw ** 2.
synwProd.shape = (wsz, 1)
redundancy = wsz // hop
env = np.zeros((wsz, 1))
for k in range(-redundancy, redundancy + 1):
envInd = (hop*k)
winInd = np.arange(1, wsz+1)
envInd += winInd
valid = np.where((envInd > 0) & (envInd <= wsz))
envInd = envInd[valid] - 1
winInd = winInd[valid] - 1
env[envInd] += synwProd[winInd]
synw = synw/env[:, 0]
return synw
def get_ft_windows():
""" Retrieve the available windows to be applied on signal data before FT.
@return: dict with keys being the window name and items being again a dict
containing the actual function and the normalization factor to
calculate correctly the amplitude spectrum in the Fourier Transform
To find out the amplitude normalization factor check either the scipy
implementation on
https://github.com/scipy/scipy/blob/v0.15.1/scipy/signal/windows.py#L336
or just perform a sum of the window (oscillating parts of the window should
be averaged out and constant offset factor will remain):
MM=1000000 # choose a big number
print(sum(signal.hanning(MM))/MM)
"""
win = {'none': {'func': np.ones, 'ampl_norm': 1.0},
'hamming': {'func': signal.hamming, 'ampl_norm': 1.0/0.54},
'hann': {'func': signal.hann, 'ampl_norm': 1.0/0.5},
'blackman': {'func': signal.blackman, 'ampl_norm': 1.0/0.42},
'triang': {'func': signal.triang, 'ampl_norm': 1.0/0.5},
'flattop': {'func': signal.flattop, 'ampl_norm': 1.0/0.2156},
'bartlett': {'func': signal.bartlett, 'ampl_norm': 1.0/0.5},
'parzen': {'func': signal.parzen, 'ampl_norm': 1.0/0.375},
'bohman': {'func': signal.bohman, 'ampl_norm': 1.0/0.4052847},
'blackmanharris': {'func': signal.blackmanharris, 'ampl_norm': 1.0/0.35875},
'nuttall': {'func': signal.nuttall, 'ampl_norm': 1.0/0.3635819},
'barthann': {'func': signal.barthann, 'ampl_norm': 1.0/0.5}}
return win
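The normalization factors above can be sanity-checked numerically exactly as the docstring suggests, assuming a scipy version that still exposes these window functions under scipy.signal: the mean of a long window should be close to the reciprocal of its ampl_norm factor.

import numpy as np

MM = 1000000                          # choose a big number
for name, props in get_ft_windows().items():
    mean_val = np.sum(props['func'](MM)) / MM
    print('%-16s mean=%.5f  1/ampl_norm=%.5f' % (name, mean_val, 1.0 / props['ampl_norm']))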
def convert_input(channel, annotation):
""" Into output """
# Remove non-beat annotations
beats = beat_annotations(annotation)
# Create dirac-comb signal
dirac = np.zeros_like(channel)
dirac[beats] = 1.0
# Use a hamming window as a bell-curve filter
width = 36
kernel = ss.hamming(width)
gauss = np.convolve(kernel, dirac, mode = 'same')
return dirac, gauss
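The effect of the convolution is easy to see on a toy dirac comb; a short sketch with the same 36-sample window:

import numpy as np
import scipy.signal as ss

dirac = np.zeros(300)
dirac[[60, 150, 240]] = 1.0                          # three fake beat positions
bell = np.convolve(ss.hamming(36), dirac, mode='same')
print(bell.max(), int(np.argmax(bell)))              # bell curves peak at (or next to) the beats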
def CQT_fast(x,fs,bins,fmin,fmax,M):
threshold = 0.0054 #for Hamming window
K = int(bins*np.ceil(np.log2(fmax/fmin)))
Q = 1/(2**(1/bins)-1)
nfft = np.int32(nearestPow2(np.ceil(Q*fs/fmin)))
tempKernel = np.zeros(nfft, dtype = np.complex128)
specKernel = np.zeros(nfft, dtype = np.complex128)
sparKernel = []
#create sparse Kernel
for k in range(K-1,-1,-1):
fk = (2**(k/bins))*fmin
N = np.int32(np.round((Q*fs)/fk))
tempKernel[:N] = hamming(N)/N * np.exp(-2*np.pi*1j*Q*np.arange(N)/N)
specKernel = fft(tempKernel)
specKernel[np.where(np.abs(specKernel) <= threshold)] = 0
if k == K-1:
sparKernel = specKernel
else:
sparKernel = np.vstack((specKernel, sparKernel))
sparKernel = np.transpose(np.conjugate(sparKernel))/nfft
ft = fft(x,nfft)
cqt = np.dot(ft, sparKernel)
ft = fft(x,nfft*(2**M))
#calculate harmonic power spectrum
#harm_pow = HPS(ft,M)
#cqt = np.dot(harm_pow, sparKernel)
return cqt
def CQT_slow(x, fs, bins, fmin, fmax):
K = int(bins*np.ceil(np.log2(fmax/fmin)))
Q = 1/(2**(1/bins)-1)
cqt = np.zeros(K, dtype = np.complex128)
for k in range(K):
fk = (2**(k/bins))*fmin
N = int(np.round(Q*fs/fk))
arr = -2*np.pi*1j*Q*np.arange(N)/N
cqt[k] = np.dot(x[:N], np.transpose(hamming(N) * np.exp(arr)))/N
return cqt
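A small comparison sketch of the two CQT variants on a synthetic tone, assuming the nearestPow2, fft and hamming helpers used above are available and that nearestPow2 rounds up to the next power of two; bins, fmin and fmax are example values, and since the harmonic-power-spectrum part of CQT_fast is commented out, M is effectively unused:

import numpy as np

fs = 22050
x = np.sin(2 * np.pi * 440.0 * np.arange(fs) / fs)   # one second of A4

cqt_f = CQT_fast(x, fs, bins=12, fmin=55.0, fmax=3520.0, M=1)
cqt_s = CQT_slow(x, fs, bins=12, fmin=55.0, fmax=3520.0)
print(np.argmax(np.abs(cqt_f)), np.argmax(np.abs(cqt_s)))   # both should peak near the 440 Hz bin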
def windowing(input):
"""
Applies hamming window to the input frames.
Args:
input: array of speech samples [N x M] where N is the number of frames and
M the samples per frame
Output:
array of windowed speech samples [N x M]
Note (you can use the function hamming from scipy.signal, include the sym=0 option
if you want to get the same results as in the example)
"""
N, M = input.shape
window = ss.hamming(M, sym=False)
return (input * window)
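A quick sketch of applying the window to a frame matrix, assuming scipy.signal is imported as ss (as the function expects); the frame size of 400 samples corresponds to 25 ms at 16 kHz:

import numpy as np
import scipy.signal as ss

frames = np.random.randn(10, 400)      # 10 frames x 400 samples, toy data
windowed = windowing(frames)
print(windowed.shape)                  # (10, 400): each row tapered by the Hamming window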
def mfcc(s,fs, nfiltbank):
#divide into segments of 25 ms with overlap of 10ms
nSamples = np.int32(0.025*fs)
overlap = np.int32(0.01*fs)
nFrames = np.int32(np.ceil(len(s)/(nSamples-overlap)))
#zero padding so the signal is long enough to hold nFrames full frames
padding = ((nSamples-overlap)*nFrames + overlap) - len(s)
if padding > 0:
signal = np.append(s, np.zeros(padding))
else:
signal = s
segment = np.empty((nSamples, nFrames))
for i in range(nFrames):
#frame i starts one hop (nSamples - overlap) after frame i-1
start = (nSamples-overlap)*i
segment[:,i] = signal[start:start+nSamples]
#compute periodogram
nfft = 512
periodogram = np.empty((nFrames, nfft//2 + 1))
for i in range(nFrames):
x = segment[:,i] * hamming(nSamples)
spectrum = fftshift(fft(x,nfft))
periodogram[i,:] = abs(spectrum[nfft//2-1:])/nSamples
#calculating mfccs
fbank = mel_filterbank(nfft, nfiltbank, fs)
#nfiltbank MFCCs for each frame
mel_coeff = np.empty((nfiltbank,nFrames))
for i in range(nfiltbank):
for k in range(nFrames):
mel_coeff[i,k] = np.sum(periodogram[k,:]*fbank[:,i])
mel_coeff = np.log10(mel_coeff)
mel_coeff = dct(mel_coeff)
#exclude 0th order coefficient (much larger than others)
mel_coeff[0,:]= np.zeros(nFrames)
return mel_coeff
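A usage sketch, assuming a mono recording and that the mel_filterbank helper referenced above (not shown on this page) is available:

import numpy as np
from scipy.io import wavfile

fs, samples = wavfile.read('speaker.wav')        # hypothetical input file
samples = samples.astype('float64')
coeffs = mfcc(samples, fs, nfiltbank=12)
print(coeffs.shape)                              # (12, nFrames): one column of MFCCs per 25 ms frame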
def initLanczos(self, filtOrder):
self.filtOrder = filtOrder
if self.filtOrder % 2 != 0:
raise Exception('Invalid filtOrder: ' + str(self.filtOrder) +
' Must be an even integer.')
radius = self.filtOrder // 2
win = np.sinc(np.linspace(-radius, radius, self.filtOrder+1) / float(radius)) # lanczos
#win = spsig.hamming(self.filtOrder+1) # sinc-hamming
# this should be automated somehow XXX - idfah
if self.filtOrder <= 6:
cutoff = 2*0.570
elif self.filtOrder <= 8:
cutoff = 2*0.676
elif self.filtOrder <= 12:
cutoff = 2*0.781
elif self.filtOrder <= 16:
cutoff = 2*0.836
elif self.filtOrder <= 32:
cutoff = 2*0.918
elif self.filtOrder <= 64:
cutoff = 2*0.959
# need to fix for multiple pool sizes XXX - idfah
cutoff /= float(self.poolSize)
taps = cutoff * np.linspace(-radius, radius, self.filtOrder+1, dtype=self.dtype)
impulseResponse = cutoff * np.sinc(taps) * win
self.filters = []
nReadoutLayers = 1 if self.nHidden is None else 2
for ni, no in self.layerDims[:-nReadoutLayers]:
noEmb = no*(self.filtOrder+1) # no outs after filter embedding
filtMat = np.zeros(noEmb*2, dtype=self.dtype)
filtMat[noEmb-1::-no] = impulseResponse
# filters strided for embedding
sz = filtMat.itemsize
filtMat = npst.as_strided(filtMat, (no,noEmb), strides=(sz,sz))[::-1].T
self.filters.append(filtMat.copy())
def STFT(x, wlen, h, nfft, fs):
########################################################
# Short-Time Fourier Transform %
# with MATLAB Implementation %
# For Python %
# Copier: Nelson Yalta 11/03/15 %
########################################################
# function: [stft, f, t] = stft(x, wlen, h, nfft, fs)
# x - signal in the time domain
# wlen - length of the hamming window
# h - hop size
# nfft - number of FFT points
# fs - sampling frequency, Hz
# f - frequency vector, Hz
# t - time vector, s
# stft - STFT matrix (only unique points, time across columns, freq across rows)
# represent x as column-vector if it is not
if (len(x.shape) > 1) and (x.shape[1] > 1):
x = x.transpose()
# length of the signal
xlen = x.shape[0]
# form a periodic hamming window
win = hamming(wlen, False)
# form the stft matrix
rown = int(np.ceil((1.0+nfft)/2))
coln = int(np.fix((xlen-wlen)/h) + 1)
short_tft = np.zeros((rown,coln)).astype('complex64')
# initialize the indexes
indx = 0
col = 0
# perform STFT
while (indx + wlen <= xlen):
# windowing
xw =x[indx:indx+wlen]*win
# FFT
X = np.fft.fft(xw,nfft)
# update the stft matrix
short_tft[:,col] = X[0:rown]
# update the indexes
indx += h
col += 1
# calculate the time and frequency vectors
t = np.linspace(wlen/2,wlen/2+(coln-1)*h,coln)/fs
f = np.arange(0,rown,dtype= np.float32)*fs/nfft
return short_tft, f, t
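A brief sanity check of the STFT on a pure tone, assuming hamming is imported from scipy.signal; the dominant bin should land on the tone frequency (here exactly, since 1000 Hz falls on a bin with nfft=256 at fs=8000):

import numpy as np

fs = 8000
x = np.sin(2 * np.pi * 1000.0 * np.arange(fs) / fs)     # one second of a 1 kHz tone

spec, f, t = STFT(x, wlen=256, h=64, nfft=256, fs=fs)
mag = np.abs(spec)
print(f[np.argmax(mag.mean(axis=1))])                   # ~1000.0 Hz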
def show_objective():
""" For the model """
# Choose a record
records = dm.get_records()
path = records[17]
record = wf.rdsamp(path)
ann = wf.rdann(path, 'atr')
chid = 0
print 'Channel:', record.signame[chid]
cha = record.p_signals[:, chid]
# These were found manually
sta = 184000
end = sta + 1000
times = np.arange(end-sta, dtype = 'float')
times /= record.fs
# Extract the annotations for that fragment
where = (sta < ann.annsamp) & (ann.annsamp < end)
samples = ann.annsamp[where] - sta
print samples
# Prepare dirac-comb type of labels
qrs_values = np.zeros_like(times)
qrs_values[samples] = 1
# Prepare gaussian-comb type of labels
kernel = ss.hamming(36)
qrs_gauss = np.convolve(kernel,
qrs_values,
mode = 'same')
# Make the plots
fig = plt.figure()
ax1 = fig.add_subplot(3,1,1)
ax1.plot(times, cha[sta : end])
ax2 = fig.add_subplot(3,1,2, sharex=ax1)
ax2.plot(times,
qrs_values,
'C1',
lw = 4,
alpha = 0.888)
ax3 = fig.add_subplot(3,1,3, sharex=ax1)
ax3.plot(times,
qrs_gauss,
'C3',
lw = 4,
alpha = 0.888)
plt.setp(ax1.get_xticklabels(), visible=False)
plt.setp(ax2.get_xticklabels(), visible=False)
plt.xlabel('Time [s]')
plt.xlim([0, 2.5])
plt.show()
def show_objective_part2():
""" For the model """
# Choose a record
records = dm.get_records()
path = records[13]
record = wf.rdsamp(path)
ann = wf.rdann(path, 'atr')
chid = 0
print 'File:', path
print 'Channel:', record.signame[chid]
cha = record.p_signals[:, chid]
# These were found manually
sta = 184000
end = sta + 1000
times = np.arange(end-sta, dtype = 'float')
times /= record.fs
# Extract the annotations for that fragment
where = (sta < ann.annsamp) & (ann.annsamp < end)
samples = ann.annsamp[where] - sta
print samples
# Prepare dirac-comb type of labels
qrs_values = np.zeros_like(times)
qrs_values[samples] = 1
# Prepare gaussian-comb type of labels
kernel = ss.hamming(36)
qrs_gauss = np.convolve(kernel,
qrs_values,
mode = 'same')
# Make the plots
fig = plt.figure()
ax1 = fig.add_subplot(2,1,1)
ax1.plot(times, cha[sta : end])
ax1.set_title('Input', loc = 'left')
ax2 = fig.add_subplot(2,1,2, sharex=ax1)
ax2.plot(times,
qrs_gauss,
'C3',
lw = 4,
alpha = 0.888)
ax2.set_title('Output', loc = 'left')
ax1.grid()
ax2.grid()
plt.setp(ax1.get_xticklabels(), visible=False)
plt.xlabel('Time [s]')
plt.xlim([0, 2.5])
plt.show()