def harvest_get_downsampled_signal(x, fs, target_fs):
    """Decimate ``x`` to roughly ``target_fs`` and remove its mean.

    The signal is extended on both sides with copies of its first and last
    sample before filtering; the matching number of decimated samples is
    trimmed from each end afterwards.

    Parameters
    ----------
    x : ndarray
        One-dimensional input signal.
    fs : number
        Sample rate of ``x``.
    target_fs : number
        Desired sample rate; the integer decimation factor is
        ``round(fs / target_fs)``.

    Returns
    -------
    y : ndarray
        Decimated signal with its mean subtracted.
    actual_fs : float
        Sample rate actually obtained, ``fs / round(fs / target_fs)``.

    Raises
    ------
    ValueError
        If ``fs`` is lower than ``target_fs`` (upsampling is not handled).
    """
    # Reject upsampling before any arithmetic: a ratio that rounds to zero
    # would otherwise fail later with an unrelated NaN-conversion error.
    if fs < target_fs:
        raise ValueError("CASE NOT HANDLED IN harvest_get_downsampled_signal")

    decimation_ratio = int(np.round(fs / target_fs))
    # Pad length: at least 140 samples, rounded up to a multiple of the ratio
    # so the padding maps onto a whole number of decimated samples.
    offset = int(np.ceil(140. / decimation_ratio) * decimation_ratio)
    start_pad = x[0] * np.ones(offset, dtype=np.float32)
    end_pad = x[-1] * np.ones(offset, dtype=np.float32)
    x = np.concatenate((start_pad, x, end_pad), axis=0)

    try:
        y0 = sg.decimate(x, decimation_ratio, 3, zero_phase=True)
    except TypeError:
        # Older SciPy releases do not accept the ``zero_phase`` keyword.
        y0 = sg.decimate(x, decimation_ratio, 3)

    actual_fs = fs / float(decimation_ratio)
    trim = offset // decimation_ratio
    y = y0[trim:-trim]
    y = y - np.mean(y)
    return y, actual_fs
# Example source code for Python's decimate()
def harvest_get_downsampled_signal(x, fs, target_fs):
    """Decimate ``x`` to roughly ``target_fs`` and remove its mean.

    The signal is extended on both sides with copies of its first and last
    sample before filtering; the matching number of decimated samples is
    trimmed from each end afterwards.

    Parameters
    ----------
    x : ndarray
        One-dimensional input signal.
    fs : number
        Sample rate of ``x``.
    target_fs : number
        Desired sample rate; the integer decimation factor is
        ``round(fs / target_fs)``.

    Returns
    -------
    y : ndarray
        Decimated signal with its mean subtracted.
    actual_fs : float
        Sample rate actually obtained, ``fs / round(fs / target_fs)``.

    Raises
    ------
    ValueError
        If ``fs`` is lower than ``target_fs`` (upsampling is not handled).
    """
    # Reject upsampling before any arithmetic: a ratio that rounds to zero
    # would otherwise fail later with an unrelated NaN-conversion error.
    if fs < target_fs:
        raise ValueError("CASE NOT HANDLED IN harvest_get_downsampled_signal")

    decimation_ratio = int(np.round(fs / target_fs))
    # Pad length: at least 140 samples, rounded up to a multiple of the ratio
    # so the padding maps onto a whole number of decimated samples.
    offset = int(np.ceil(140. / decimation_ratio) * decimation_ratio)
    start_pad = x[0] * np.ones(offset, dtype=np.float32)
    end_pad = x[-1] * np.ones(offset, dtype=np.float32)
    x = np.concatenate((start_pad, x, end_pad), axis=0)

    try:
        y0 = sg.decimate(x, decimation_ratio, 3, zero_phase=True)
    except TypeError:
        # Older SciPy releases do not accept the ``zero_phase`` keyword.
        y0 = sg.decimate(x, decimation_ratio, 3)

    actual_fs = fs / float(decimation_ratio)
    trim = offset // decimation_ratio
    y = y0[trim:-trim]
    y = y - np.mean(y)
    return y, actual_fs
def fetch_sample_speech_ono(n_samples=None):
    """Load "ono" sample speech clips and decimate each by a factor of 6.

    Globs ``ono_wav/*wav``, keeps only paths containing both "EKENWAY" and
    "PAIN", and reads at most ``n_samples`` of them with ``readwav``.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to load; ``None`` loads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the clips (always 8000).
    speech : list of ndarray
        One decimated signal per loaded file.
    wav_names : list of str
        Paths of the loaded files, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("ono_wav", "*wav")
    wav_names = [w for w in glob.glob(datapath)
                 if "EKENWAY" in w and "PAIN" in w]
    # Keep the returned names aligned with the clips that are actually loaded.
    wav_names = wav_names[:n_samples]
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches.
    fs = 8000
    speech = []
    logger.info("Loading speech files...")
    for wav_name in wav_names:
        _, _, d = readwav(wav_name)
        # 24 bit but only 16 used???
        d = d.astype('float32') / (2 ** 15)
        # Decimate to 8k (assumes 48 kHz input -- TODO confirm).
        # sg.decimate already keeps only every 6th sample, so no extra
        # [::6] slice is applied; that would downsample a second time.
        d = sg.decimate(d, 6, ftype="fir")
        speech.append(d)
    return fs, speech, wav_names
def fetch_sample_speech_walla(n_samples=None):
    """Load "walla" speech files as 16000-sample chunks decimated by 2.

    Globs ``walla_wav/*wav`` and reads at most ``n_samples`` files with
    ``readwav``. Each file is cut into consecutive 16000-sample chunks (a
    trailing partial chunk is dropped) and every chunk is decimated by 2.
    Loading stops after the first file that brings the total above 200 chunks.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to read; ``None`` reads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the chunks (always 8000).
    speech : list of ndarray
        Decimated chunks.
    wav_names : list of str
        Source path of each chunk, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("walla_wav", "*wav")
    names = glob.glob(datapath)
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches and never leaks the raw file rate.
    fs = 8000
    speech = []
    wav_names = []
    logger.info("Loading speech files...")
    for name in names[:n_samples]:
        _, _, d = readwav(name)
        d = d.astype('float32') / (2 ** 15)
        inds = np.arange(0, len(d), 16000)
        for i, j in zip(inds[:-1], inds[1:]):
            # Decimate to 8k (assumes 16 kHz input -- TODO confirm).
            # sg.decimate already keeps only every 2nd sample, so no extra
            # [::2] slice is applied; that would downsample a second time.
            dij = sg.decimate(d[i:j], 2, ftype="iir")
            speech.append(dij)
            wav_names.append(name)
        # Checked once per file: stop once more than 200 chunks are collected.
        if len(speech) > 200:
            break
    return fs, speech, wav_names
def fetch_sample_speech_ono(n_samples=None):
    """Load "ono" sample speech clips and decimate each by a factor of 6.

    Globs ``ono_wav/*wav``, keeps only paths containing both "EKENWAY" and
    "PAIN", and reads at most ``n_samples`` of them with ``readwav``.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to load; ``None`` loads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the clips (always 8000).
    speech : list of ndarray
        One decimated signal per loaded file.
    wav_names : list of str
        Paths of the loaded files, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("ono_wav", "*wav")
    wav_names = [w for w in glob.glob(datapath)
                 if "EKENWAY" in w and "PAIN" in w]
    # Keep the returned names aligned with the clips that are actually loaded.
    wav_names = wav_names[:n_samples]
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches.
    fs = 8000
    speech = []
    logger.info("Loading speech files...")
    for wav_name in wav_names:
        _, _, d = readwav(wav_name)
        # 24 bit but only 16 used???
        d = d.astype('float32') / (2 ** 15)
        # Decimate to 8k (assumes 48 kHz input -- TODO confirm).
        # sg.decimate already keeps only every 6th sample, so no extra
        # [::6] slice is applied; that would downsample a second time.
        d = sg.decimate(d, 6, ftype="fir")
        speech.append(d)
    return fs, speech, wav_names
def fetch_sample_speech_walla(n_samples=None):
    """Load "walla" speech files as 16000-sample chunks decimated by 2.

    Globs ``walla_wav/*wav`` and reads at most ``n_samples`` files with
    ``readwav``. Each file is cut into consecutive 16000-sample chunks (a
    trailing partial chunk is dropped) and every chunk is decimated by 2.
    Loading stops after the first file that brings the total above 200 chunks.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to read; ``None`` reads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the chunks (always 8000).
    speech : list of ndarray
        Decimated chunks.
    wav_names : list of str
        Source path of each chunk, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("walla_wav", "*wav")
    names = glob.glob(datapath)
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches and never leaks the raw file rate.
    fs = 8000
    speech = []
    wav_names = []
    logger.info("Loading speech files...")
    for name in names[:n_samples]:
        _, _, d = readwav(name)
        d = d.astype('float32') / (2 ** 15)
        inds = np.arange(0, len(d), 16000)
        for i, j in zip(inds[:-1], inds[1:]):
            # Decimate to 8k (assumes 16 kHz input -- TODO confirm).
            # sg.decimate already keeps only every 2nd sample, so no extra
            # [::2] slice is applied; that would downsample a second time.
            dij = sg.decimate(d[i:j], 2, ftype="iir")
            speech.append(dij)
            wav_names.append(name)
        # Checked once per file: stop once more than 200 chunks are collected.
        if len(speech) > 200:
            break
    return fs, speech, wav_names
def fetch_sample_speech_ono(n_samples=None):
    """Load "ono" sample speech clips and decimate each by a factor of 6.

    Globs ``ono_wav/*wav``, keeps only paths containing both "EKENWAY" and
    "PAIN", and reads at most ``n_samples`` of them with ``readwav``.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to load; ``None`` loads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the clips (always 8000).
    speech : list of ndarray
        One decimated signal per loaded file.
    wav_names : list of str
        Paths of the loaded files, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("ono_wav", "*wav")
    wav_names = [w for w in glob.glob(datapath)
                 if "EKENWAY" in w and "PAIN" in w]
    # Keep the returned names aligned with the clips that are actually loaded.
    wav_names = wav_names[:n_samples]
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches.
    fs = 8000
    speech = []
    logger.info("Loading speech files...")
    for wav_name in wav_names:
        _, _, d = readwav(wav_name)
        # 24 bit but only 16 used???
        d = d.astype('float32') / (2 ** 15)
        # Decimate to 8k (assumes 48 kHz input -- TODO confirm).
        # sg.decimate already keeps only every 6th sample, so no extra
        # [::6] slice is applied; that would downsample a second time.
        d = sg.decimate(d, 6, ftype="fir")
        speech.append(d)
    return fs, speech, wav_names
def fetch_sample_speech_ono(n_samples=None):
    """Load "ono" sample speech clips and decimate each by a factor of 6.

    Globs ``ono_wav/*wav``, keeps only paths containing both "EKENWAY" and
    "PAIN", and reads at most ``n_samples`` of them with ``readwav``.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to load; ``None`` loads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the clips (always 8000).
    speech : list of ndarray
        One decimated signal per loaded file.
    wav_names : list of str
        Paths of the loaded files, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("ono_wav", "*wav")
    wav_names = [w for w in glob.glob(datapath)
                 if "EKENWAY" in w and "PAIN" in w]
    # Keep the returned names aligned with the clips that are actually loaded.
    wav_names = wav_names[:n_samples]
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches.
    fs = 8000
    speech = []
    logger.info("Loading speech files...")
    for wav_name in wav_names:
        _, _, d = readwav(wav_name)
        # 24 bit but only 16 used???
        d = d.astype('float32') / (2 ** 15)
        # Decimate to 8k (assumes 48 kHz input -- TODO confirm).
        # sg.decimate already keeps only every 6th sample, so no extra
        # [::6] slice is applied; that would downsample a second time.
        d = sg.decimate(d, 6, ftype="fir")
        speech.append(d)
    return fs, speech, wav_names
def fetch_sample_speech_walla(n_samples=None):
    """Load "walla" speech files as 16000-sample chunks decimated by 2.

    Globs ``walla_wav/*wav`` and reads at most ``n_samples`` files with
    ``readwav``. Each file is cut into consecutive 16000-sample chunks (a
    trailing partial chunk is dropped) and every chunk is decimated by 2.
    Loading stops after the first file that brings the total above 200 chunks.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to read; ``None`` reads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the chunks (always 8000).
    speech : list of ndarray
        Decimated chunks.
    wav_names : list of str
        Source path of each chunk, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("walla_wav", "*wav")
    names = glob.glob(datapath)
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches and never leaks the raw file rate.
    fs = 8000
    speech = []
    wav_names = []
    logger.info("Loading speech files...")
    for name in names[:n_samples]:
        _, _, d = readwav(name)
        d = d.astype('float32') / (2 ** 15)
        inds = np.arange(0, len(d), 16000)
        for i, j in zip(inds[:-1], inds[1:]):
            # Decimate to 8k (assumes 16 kHz input -- TODO confirm).
            # sg.decimate already keeps only every 2nd sample, so no extra
            # [::2] slice is applied; that would downsample a second time.
            dij = sg.decimate(d[i:j], 2, ftype="iir")
            speech.append(dij)
            wav_names.append(name)
        # Checked once per file: stop once more than 200 chunks are collected.
        if len(speech) > 200:
            break
    return fs, speech, wav_names
def fetch_sample_speech_ono(n_samples=None):
    """Load "ono" sample speech clips and decimate each by a factor of 6.

    Globs ``ono_wav/*wav``, keeps only paths containing both "EKENWAY" and
    "PAIN", and reads at most ``n_samples`` of them with ``readwav``.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to load; ``None`` loads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the clips (always 8000).
    speech : list of ndarray
        One decimated signal per loaded file.
    wav_names : list of str
        Paths of the loaded files, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("ono_wav", "*wav")
    wav_names = [w for w in glob.glob(datapath)
                 if "EKENWAY" in w and "PAIN" in w]
    # Keep the returned names aligned with the clips that are actually loaded.
    wav_names = wav_names[:n_samples]
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches.
    fs = 8000
    speech = []
    logger.info("Loading speech files...")
    for wav_name in wav_names:
        _, _, d = readwav(wav_name)
        # 24 bit but only 16 used???
        d = d.astype('float32') / (2 ** 15)
        # Decimate to 8k (assumes 48 kHz input -- TODO confirm).
        # sg.decimate already keeps only every 6th sample, so no extra
        # [::6] slice is applied; that would downsample a second time.
        d = sg.decimate(d, 6, ftype="fir")
        speech.append(d)
    return fs, speech, wav_names
def fetch_sample_speech_walla(n_samples=None):
    """Load "walla" speech files as 16000-sample chunks decimated by 2.

    Globs ``walla_wav/*wav`` and reads at most ``n_samples`` files with
    ``readwav``. Each file is cut into consecutive 16000-sample chunks (a
    trailing partial chunk is dropped) and every chunk is decimated by 2.
    Loading stops after the first file that brings the total above 200 chunks.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to read; ``None`` reads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the chunks (always 8000).
    speech : list of ndarray
        Decimated chunks.
    wav_names : list of str
        Source path of each chunk, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("walla_wav", "*wav")
    names = glob.glob(datapath)
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches and never leaks the raw file rate.
    fs = 8000
    speech = []
    wav_names = []
    logger.info("Loading speech files...")
    for name in names[:n_samples]:
        _, _, d = readwav(name)
        d = d.astype('float32') / (2 ** 15)
        inds = np.arange(0, len(d), 16000)
        for i, j in zip(inds[:-1], inds[1:]):
            # Decimate to 8k (assumes 16 kHz input -- TODO confirm).
            # sg.decimate already keeps only every 2nd sample, so no extra
            # [::2] slice is applied; that would downsample a second time.
            dij = sg.decimate(d[i:j], 2, ftype="iir")
            speech.append(dij)
            wav_names.append(name)
        # Checked once per file: stop once more than 200 chunks are collected.
        if len(speech) > 200:
            break
    return fs, speech, wav_names
def fetch_sample_speech_ono(n_samples=None):
    """Load "ono" sample speech clips and decimate each by a factor of 6.

    Globs ``ono_wav/*wav``, keeps only paths containing both "EKENWAY" and
    "PAIN", and reads at most ``n_samples`` of them with ``readwav``.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to load; ``None`` loads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the clips (always 8000).
    speech : list of ndarray
        One decimated signal per loaded file.
    wav_names : list of str
        Paths of the loaded files, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("ono_wav", "*wav")
    wav_names = [w for w in glob.glob(datapath)
                 if "EKENWAY" in w and "PAIN" in w]
    # Keep the returned names aligned with the clips that are actually loaded.
    wav_names = wav_names[:n_samples]
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches.
    fs = 8000
    speech = []
    logger.info("Loading speech files...")
    for wav_name in wav_names:
        _, _, d = readwav(wav_name)
        # 24 bit but only 16 used???
        d = d.astype('float32') / (2 ** 15)
        # Decimate to 8k (assumes 48 kHz input -- TODO confirm).
        # sg.decimate already keeps only every 6th sample, so no extra
        # [::6] slice is applied; that would downsample a second time.
        d = sg.decimate(d, 6, ftype="fir")
        speech.append(d)
    return fs, speech, wav_names
def fetch_sample_speech_ono(n_samples=None):
    """Load "ono" sample speech clips and decimate each by a factor of 6.

    Globs ``ono_wav/*wav``, keeps only paths containing both "EKENWAY" and
    "PAIN", and reads at most ``n_samples`` of them with ``readwav``.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to load; ``None`` loads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the clips (always 8000).
    speech : list of ndarray
        One decimated signal per loaded file.
    wav_names : list of str
        Paths of the loaded files, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("ono_wav", "*wav")
    wav_names = [w for w in glob.glob(datapath)
                 if "EKENWAY" in w and "PAIN" in w]
    # Keep the returned names aligned with the clips that are actually loaded.
    wav_names = wav_names[:n_samples]
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches.
    fs = 8000
    speech = []
    logger.info("Loading speech files...")
    for wav_name in wav_names:
        _, _, d = readwav(wav_name)
        # 24 bit but only 16 used???
        d = d.astype('float32') / (2 ** 15)
        # Decimate to 8k (assumes 48 kHz input -- TODO confirm).
        # sg.decimate already keeps only every 6th sample, so no extra
        # [::6] slice is applied; that would downsample a second time.
        d = sg.decimate(d, 6, ftype="fir")
        speech.append(d)
    return fs, speech, wav_names
def fetch_sample_speech_walla(n_samples=None):
    """Load "walla" speech files as 16000-sample chunks decimated by 2.

    Globs ``walla_wav/*wav`` and reads at most ``n_samples`` files with
    ``readwav``. Each file is cut into consecutive 16000-sample chunks (a
    trailing partial chunk is dropped) and every chunk is decimated by 2.
    Loading stops after the first file that brings the total above 200 chunks.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to read; ``None`` reads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the chunks (always 8000).
    speech : list of ndarray
        Decimated chunks.
    wav_names : list of str
        Source path of each chunk, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("walla_wav", "*wav")
    names = glob.glob(datapath)
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches and never leaks the raw file rate.
    fs = 8000
    speech = []
    wav_names = []
    logger.info("Loading speech files...")
    for name in names[:n_samples]:
        _, _, d = readwav(name)
        d = d.astype('float32') / (2 ** 15)
        inds = np.arange(0, len(d), 16000)
        for i, j in zip(inds[:-1], inds[1:]):
            # Decimate to 8k (assumes 16 kHz input -- TODO confirm).
            # sg.decimate already keeps only every 2nd sample, so no extra
            # [::2] slice is applied; that would downsample a second time.
            dij = sg.decimate(d[i:j], 2, ftype="iir")
            speech.append(dij)
            wav_names.append(name)
        # Checked once per file: stop once more than 200 chunks are collected.
        if len(speech) > 200:
            break
    return fs, speech, wav_names
def fetch_sample_speech_ono(n_samples=None):
    """Load "ono" sample speech clips and decimate each by a factor of 6.

    Globs ``ono_wav/*wav``, keeps only paths containing both "EKENWAY" and
    "PAIN", and reads at most ``n_samples`` of them with ``readwav``.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to load; ``None`` loads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the clips (always 8000).
    speech : list of ndarray
        One decimated signal per loaded file.
    wav_names : list of str
        Paths of the loaded files, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("ono_wav", "*wav")
    wav_names = [w for w in glob.glob(datapath)
                 if "EKENWAY" in w and "PAIN" in w]
    # Keep the returned names aligned with the clips that are actually loaded.
    wav_names = wav_names[:n_samples]
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches.
    fs = 8000
    speech = []
    logger.info("Loading speech files...")
    for wav_name in wav_names:
        _, _, d = readwav(wav_name)
        # 24 bit but only 16 used???
        d = d.astype('float32') / (2 ** 15)
        # Decimate to 8k (assumes 48 kHz input -- TODO confirm).
        # sg.decimate already keeps only every 6th sample, so no extra
        # [::6] slice is applied; that would downsample a second time.
        d = sg.decimate(d, 6, ftype="fir")
        speech.append(d)
    return fs, speech, wav_names
def fetch_sample_speech_walla(n_samples=None):
    """Load "walla" speech files as 16000-sample chunks decimated by 2.

    Globs ``walla_wav/*wav`` and reads at most ``n_samples`` files with
    ``readwav``. Each file is cut into consecutive 16000-sample chunks (a
    trailing partial chunk is dropped) and every chunk is decimated by 2.
    Loading stops after the first file that brings the total above 200 chunks.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to read; ``None`` reads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the chunks (always 8000).
    speech : list of ndarray
        Decimated chunks.
    wav_names : list of str
        Source path of each chunk, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("walla_wav", "*wav")
    names = glob.glob(datapath)
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches and never leaks the raw file rate.
    fs = 8000
    speech = []
    wav_names = []
    logger.info("Loading speech files...")
    for name in names[:n_samples]:
        _, _, d = readwav(name)
        d = d.astype('float32') / (2 ** 15)
        inds = np.arange(0, len(d), 16000)
        for i, j in zip(inds[:-1], inds[1:]):
            # Decimate to 8k (assumes 16 kHz input -- TODO confirm).
            # sg.decimate already keeps only every 2nd sample, so no extra
            # [::2] slice is applied; that would downsample a second time.
            dij = sg.decimate(d[i:j], 2, ftype="iir")
            speech.append(dij)
            wav_names.append(name)
        # Checked once per file: stop once more than 200 chunks are collected.
        if len(speech) > 200:
            break
    return fs, speech, wav_names
def fetch_sample_speech_ono(n_samples=None):
    """Load "ono" sample speech clips and decimate each by a factor of 6.

    Globs ``ono_wav/*wav``, keeps only paths containing both "EKENWAY" and
    "PAIN", and reads at most ``n_samples`` of them with ``readwav``.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to load; ``None`` loads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the clips (always 8000).
    speech : list of ndarray
        One decimated signal per loaded file.
    wav_names : list of str
        Paths of the loaded files, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("ono_wav", "*wav")
    wav_names = [w for w in glob.glob(datapath)
                 if "EKENWAY" in w and "PAIN" in w]
    # Keep the returned names aligned with the clips that are actually loaded.
    wav_names = wav_names[:n_samples]
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches.
    fs = 8000
    speech = []
    logger.info("Loading speech files...")
    for wav_name in wav_names:
        _, _, d = readwav(wav_name)
        # 24 bit but only 16 used???
        d = d.astype('float32') / (2 ** 15)
        # Decimate to 8k (assumes 48 kHz input -- TODO confirm).
        # sg.decimate already keeps only every 6th sample, so no extra
        # [::6] slice is applied; that would downsample a second time.
        d = sg.decimate(d, 6, ftype="fir")
        speech.append(d)
    return fs, speech, wav_names
def fetch_sample_speech_ono(n_samples=None):
    """Load "ono" sample speech clips and decimate each by a factor of 6.

    Globs ``ono_wav/*wav``, keeps only paths containing both "EKENWAY" and
    "PAIN", and reads at most ``n_samples`` of them with ``readwav``.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to load; ``None`` loads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the clips (always 8000).
    speech : list of ndarray
        One decimated signal per loaded file.
    wav_names : list of str
        Paths of the loaded files, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("ono_wav", "*wav")
    wav_names = [w for w in glob.glob(datapath)
                 if "EKENWAY" in w and "PAIN" in w]
    # Keep the returned names aligned with the clips that are actually loaded.
    wav_names = wav_names[:n_samples]
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches.
    fs = 8000
    speech = []
    logger.info("Loading speech files...")
    for wav_name in wav_names:
        _, _, d = readwav(wav_name)
        # 24 bit but only 16 used???
        d = d.astype('float32') / (2 ** 15)
        # Decimate to 8k (assumes 48 kHz input -- TODO confirm).
        # sg.decimate already keeps only every 6th sample, so no extra
        # [::6] slice is applied; that would downsample a second time.
        d = sg.decimate(d, 6, ftype="fir")
        speech.append(d)
    return fs, speech, wav_names
def fetch_sample_speech_walla(n_samples=None):
    """Load "walla" speech files as 16000-sample chunks decimated by 2.

    Globs ``walla_wav/*wav`` and reads at most ``n_samples`` files with
    ``readwav``. Each file is cut into consecutive 16000-sample chunks (a
    trailing partial chunk is dropped) and every chunk is decimated by 2.
    Loading stops after the first file that brings the total above 200 chunks.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to read; ``None`` reads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the chunks (always 8000).
    speech : list of ndarray
        Decimated chunks.
    wav_names : list of str
        Source path of each chunk, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("walla_wav", "*wav")
    names = glob.glob(datapath)
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches and never leaks the raw file rate.
    fs = 8000
    speech = []
    wav_names = []
    logger.info("Loading speech files...")
    for name in names[:n_samples]:
        _, _, d = readwav(name)
        d = d.astype('float32') / (2 ** 15)
        inds = np.arange(0, len(d), 16000)
        for i, j in zip(inds[:-1], inds[1:]):
            # Decimate to 8k (assumes 16 kHz input -- TODO confirm).
            # sg.decimate already keeps only every 2nd sample, so no extra
            # [::2] slice is applied; that would downsample a second time.
            dij = sg.decimate(d[i:j], 2, ftype="iir")
            speech.append(dij)
            wav_names.append(name)
        # Checked once per file: stop once more than 200 chunks are collected.
        if len(speech) > 200:
            break
    return fs, speech, wav_names
def fetch_sample_speech_ono(n_samples=None):
    """Load "ono" sample speech clips and decimate each by a factor of 6.

    Globs ``ono_wav/*wav``, keeps only paths containing both "EKENWAY" and
    "PAIN", and reads at most ``n_samples`` of them with ``readwav``.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to load; ``None`` loads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the clips (always 8000).
    speech : list of ndarray
        One decimated signal per loaded file.
    wav_names : list of str
        Paths of the loaded files, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("ono_wav", "*wav")
    wav_names = [w for w in glob.glob(datapath)
                 if "EKENWAY" in w and "PAIN" in w]
    # Keep the returned names aligned with the clips that are actually loaded.
    wav_names = wav_names[:n_samples]
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches.
    fs = 8000
    speech = []
    logger.info("Loading speech files...")
    for wav_name in wav_names:
        _, _, d = readwav(wav_name)
        # 24 bit but only 16 used???
        d = d.astype('float32') / (2 ** 15)
        # Decimate to 8k (assumes 48 kHz input -- TODO confirm).
        # sg.decimate already keeps only every 6th sample, so no extra
        # [::6] slice is applied; that would downsample a second time.
        d = sg.decimate(d, 6, ftype="fir")
        speech.append(d)
    return fs, speech, wav_names
def fetch_sample_speech_walla(n_samples=None):
    """Load "walla" speech files as 16000-sample chunks decimated by 2.

    Globs ``walla_wav/*wav`` and reads at most ``n_samples`` files with
    ``readwav``. Each file is cut into consecutive 16000-sample chunks (a
    trailing partial chunk is dropped) and every chunk is decimated by 2.
    Loading stops after the first file that brings the total above 200 chunks.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to read; ``None`` reads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the chunks (always 8000).
    speech : list of ndarray
        Decimated chunks.
    wav_names : list of str
        Source path of each chunk, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("walla_wav", "*wav")
    names = glob.glob(datapath)
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches and never leaks the raw file rate.
    fs = 8000
    speech = []
    wav_names = []
    logger.info("Loading speech files...")
    for name in names[:n_samples]:
        _, _, d = readwav(name)
        d = d.astype('float32') / (2 ** 15)
        inds = np.arange(0, len(d), 16000)
        for i, j in zip(inds[:-1], inds[1:]):
            # Decimate to 8k (assumes 16 kHz input -- TODO confirm).
            # sg.decimate already keeps only every 2nd sample, so no extra
            # [::2] slice is applied; that would downsample a second time.
            dij = sg.decimate(d[i:j], 2, ftype="iir")
            speech.append(dij)
            wav_names.append(name)
        # Checked once per file: stop once more than 200 chunks are collected.
        if len(speech) > 200:
            break
    return fs, speech, wav_names
def fetch_sample_speech_ono(n_samples=None):
    """Load "ono" sample speech clips and decimate each by a factor of 6.

    Globs ``ono_wav/*wav``, keeps only paths containing both "EKENWAY" and
    "PAIN", and reads at most ``n_samples`` of them with ``readwav``.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to load; ``None`` loads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the clips (always 8000).
    speech : list of ndarray
        One decimated signal per loaded file.
    wav_names : list of str
        Paths of the loaded files, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("ono_wav", "*wav")
    wav_names = [w for w in glob.glob(datapath)
                 if "EKENWAY" in w and "PAIN" in w]
    # Keep the returned names aligned with the clips that are actually loaded.
    wav_names = wav_names[:n_samples]
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches.
    fs = 8000
    speech = []
    logger.info("Loading speech files...")
    for wav_name in wav_names:
        _, _, d = readwav(wav_name)
        # 24 bit but only 16 used???
        d = d.astype('float32') / (2 ** 15)
        # Decimate to 8k (assumes 48 kHz input -- TODO confirm).
        # sg.decimate already keeps only every 6th sample, so no extra
        # [::6] slice is applied; that would downsample a second time.
        d = sg.decimate(d, 6, ftype="fir")
        speech.append(d)
    return fs, speech, wav_names
def fetch_sample_speech_ono(n_samples=None):
    """Load "ono" sample speech clips and decimate each by a factor of 6.

    Globs ``ono_wav/*wav``, keeps only paths containing both "EKENWAY" and
    "PAIN", and reads at most ``n_samples`` of them with ``readwav``.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to load; ``None`` loads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the clips (always 8000).
    speech : list of ndarray
        One decimated signal per loaded file.
    wav_names : list of str
        Paths of the loaded files, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("ono_wav", "*wav")
    wav_names = [w for w in glob.glob(datapath)
                 if "EKENWAY" in w and "PAIN" in w]
    # Keep the returned names aligned with the clips that are actually loaded.
    wav_names = wav_names[:n_samples]
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches.
    fs = 8000
    speech = []
    logger.info("Loading speech files...")
    for wav_name in wav_names:
        _, _, d = readwav(wav_name)
        # 24 bit but only 16 used???
        d = d.astype('float32') / (2 ** 15)
        # Decimate to 8k (assumes 48 kHz input -- TODO confirm).
        # sg.decimate already keeps only every 6th sample, so no extra
        # [::6] slice is applied; that would downsample a second time.
        d = sg.decimate(d, 6, ftype="fir")
        speech.append(d)
    return fs, speech, wav_names
def fetch_sample_speech_walla(n_samples=None):
    """Load "walla" speech files as 16000-sample chunks decimated by 2.

    Globs ``walla_wav/*wav`` and reads at most ``n_samples`` files with
    ``readwav``. Each file is cut into consecutive 16000-sample chunks (a
    trailing partial chunk is dropped) and every chunk is decimated by 2.
    Loading stops after the first file that brings the total above 200 chunks.

    Parameters
    ----------
    n_samples : int or None, optional
        Maximum number of files to read; ``None`` reads every match.

    Returns
    -------
    fs : int
        Sample rate reported for the chunks (always 8000).
    speech : list of ndarray
        Decimated chunks.
    wav_names : list of str
        Source path of each chunk, aligned one-to-one with ``speech``.
    """
    datapath = os.path.join("walla_wav", "*wav")
    names = glob.glob(datapath)
    # The reported rate is fixed; binding it up front keeps the return valid
    # when no file matches and never leaks the raw file rate.
    fs = 8000
    speech = []
    wav_names = []
    logger.info("Loading speech files...")
    for name in names[:n_samples]:
        _, _, d = readwav(name)
        d = d.astype('float32') / (2 ** 15)
        inds = np.arange(0, len(d), 16000)
        for i, j in zip(inds[:-1], inds[1:]):
            # Decimate to 8k (assumes 16 kHz input -- TODO confirm).
            # sg.decimate already keeps only every 2nd sample, so no extra
            # [::2] slice is applied; that would downsample a second time.
            dij = sg.decimate(d[i:j], 2, ftype="iir")
            speech.append(dij)
            wav_names.append(name)
        # Checked once per file: stop once more than 200 chunks are collected.
        if len(speech) > 200:
            break
    return fs, speech, wav_names
def wavwrite(srcfile, fs, training):
    """Split one .mat recording into 16-bit WAV clips per electrode and window.

    Loads ``mat['dataStruct'][0, 0][0]`` from ``srcfile``, optionally
    decimates it along axis 0, rescales it so the largest absolute sample
    maps to 0x7FFF, and writes ``nwin`` clips for each of 16 columns.

    Relies on the names ``ds_factor``, ``win_dur`` and ``nwin`` being defined
    outside this function (not visible here).

    Parameters
    ----------
    srcfile : str
        Path of the .mat file. Output names are derived from it by replacing
        its last "mat" with "<win>.<elec>.wav".
    fs : int
        Sample rate passed to the WAV writer and used to size the windows
        (``win_dur * 60 * fs`` samples long, advancing by ``60 * fs``).
    training : bool
        When true, an all-zero recording is skipped instead of written.

    Returns
    -------
    None
    """
    try:
        mat = io.loadmat(srcfile)
    except ValueError:
        print('Could not load %s' % srcfile)
        return

    dat = mat['dataStruct'][0, 0][0]
    if ds_factor != 1:
        dat = signal.decimate(dat, ds_factor, axis=0, zero_phase=True)

    peak = float(max(abs(dat.max()), abs(dat.min())))
    if training and peak == 0:
        print('skipping %s' % srcfile)
        return
    if peak != 0:
        # Out-of-place multiply: an in-place ``*=`` by a float fails when the
        # loaded array has an integer dtype.
        dat = dat * (0x7FFF / peak)
    dat = np.int16(dat)

    winsize = win_dur * 60 * fs
    stride = 60 * fs
    # Only the last "mat" (the extension) is substituted; str.replace would
    # also rewrite any "mat" appearing earlier in the path.
    head, sep, tail = srcfile.rpartition('mat')
    for elec in range(16):
        aud = dat[:, elec]
        for win in range(nwin):
            suffix = str(win) + '.' + str(elec) + '.wav'
            dstfile = head + suffix + tail if sep else srcfile
            beg = win * stride
            end = beg + winsize
            clip = aud[beg:end]
            audiolab.wavwrite(clip, dstfile, fs=fs, enc='pcm16')
def downSample(data, sampleRate=20000, dsType='mean'):
    """Downsample every sample of every syllable.

    :param data: list of syllables; each syllable is a list of
        ``[signal, rate]`` pairs where ``signal`` is a 1-D array
    :param sampleRate: desired sample rate; the integer factor used for each
        sample is ``round(rate / sampleRate)``
    :param dsType: 'mean' averages consecutive groups of ``factor`` values
        (the last group may be shorter); 'IIR' calls ``ss.decimate`` with its
        default filter
    :returns syllables: downsampled data in the same nested format, each
        entry being ``[new_signal, sampleRate]``
    :raises ValueError: if ``dsType`` is neither 'mean' nor 'IIR'
    """
    syllables = []
    for syllable in data:
        samples = []
        for sample in syllable:
            values, rate = sample[0], sample[1]
            SR = int(np.round(rate / float(sampleRate)))
            if dsType == 'mean':
                # Pad with NaN up to a multiple of SR so the reshape works;
                # nanmean then ignores the padding in the last group.
                pad_size = int(math.ceil(float(values.size) / SR) * SR
                               - values.size)
                s_padded = np.append(values, np.full(pad_size, np.nan))
                s_new = np.nanmean(s_padded.reshape(-1, SR), axis=1)
            elif dsType == 'IIR':
                s_new = ss.decimate(values, SR)
            else:
                # Previously fell through and reused a stale/undefined s_new.
                raise ValueError(
                    "dsType must be 'mean' or 'IIR', got %r" % (dsType,))
            samples.append([s_new, sampleRate])
        syllables.append(samples)
    return syllables
#%%
def downsampleBy(v, factor):
    """Decimate ``v`` by ``factor``, shrinking the factor for short inputs.

    The factor is reduced so that about ``minSamples`` (5) values remain when
    ``v`` is too short for the requested factor. If the decimated result has
    at least ``order + minSamples`` values, its first ``order - 1`` values
    are dropped before returning.

    Parameters
    ----------
    v : sequence or ndarray
        One-dimensional signal.
    factor : int
        Requested decimation factor.

    Returns
    -------
    ndarray or the input object
        The decimated signal, or ``v`` unchanged when no factor of at least 1
        is possible (fewer than ``minSamples`` values, or ``factor < 1``).
    """
    order = 8  # default value for decimate() anyway
    minSamples = 5
    if len(v) < (minSamples * factor):
        factor = len(v) // minSamples
    if factor < 1:
        # Too short to decimate; a zero factor would crash decimate().
        return v
    v = signal.decimate(v, factor, n=order)
    if len(v) >= (order + minSamples):
        return v[(order - 1):]
    return v
def downsampleMat(A, rowsBy=1, colsBy=1):
    """Decimate an array along its rows and/or columns.

    Parameters
    ----------
    A : ndarray
        1-D or 2-D array.
    rowsBy : int, optional
        Decimation factor along axis 0 (also used for 1-D input).
    colsBy : int, optional
        Decimation factor along axis 1; ignored for 1-D input.

    Returns
    -------
    ndarray
        The decimated array. An axis whose factor is 1 is left untouched for
        2-D input. The filter order passed to ``signal.decimate`` is the
        factor minus one.
    """
    if len(A.shape) == 1:
        return signal.decimate(A, rowsBy, n=(rowsBy-1))
    if rowsBy != 1:
        A = signal.decimate(A, rowsBy, n=(rowsBy-1), axis=0)
    if colsBy != 1:
        # Columns use colsBy as the factor (rowsBy was passed here before).
        A = signal.decimate(A, colsBy, n=(colsBy-1), axis=1)
    return A
# A = A.reshape(-1, 1)
# newLen = int(A.shape[0] / float(rowsBy))
# resampled = imresize(A, (newLen, 1))
# return resampled.flatten()
# newShape = A.shape / np.array([rowsBy, colsBy], dtype=np.float)
# newShape = newShape.astype(np.int) # round to int
# return imresize(A, newShape)
def sinusoid_analysis(X, input_sample_rate, resample_block=128, copy=True):
    """
    Construct a sinusoidal model for the input signal.

    Parameters
    ----------
    X : ndarray
        Input signal to model

    input_sample_rate : int
        The sample rate of the input signal; must be a multiple of 8000

    resample_block : int, optional (default=128)
        Controls the step size of the sinusoidal model

    copy : bool, optional (default=True)
        If True, always work on a copy of ``X``; if False, copy only when
        converting ``X`` to an array requires it

    Returns
    -------
    frequencies_hz : ndarray
        Frequencies for the sinusoids, in Hz.

    magnitudes : ndarray
        Magnitudes of sinusoids returned in ``frequencies``

    Raises
    ------
    ValueError
        If ``input_sample_rate`` is not a multiple of 8000.

    References
    ----------
    D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
    Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
    """
    # np.array(..., copy=False) raises on NumPy >= 2 whenever a copy is
    # unavoidable (e.g. list input); asarray copies only when required.
    X = np.array(X) if copy else np.asarray(X)
    resample_to = 8000
    if input_sample_rate != resample_to:
        if input_sample_rate % resample_to != 0:
            raise ValueError("Input sample rate must be a multiple of 8k!")
        # Should be able to use resample... ?
        X = sg.decimate(X, input_sample_rate // resample_to, zero_phase=True)
    # Step size at 8 kHz, forced to an even integer; float() guards against
    # integer division and int() against a float result from round().
    step_size = 2 * int(round(
        resample_block / float(input_sample_rate) * resample_to / 2.))
    a, g, _ = lpc_analysis(X, order=8, window_step=step_size,
                           window_size=2 * step_size)
    f, m = lpc_to_frequency(a, g)
    # Scale by resample_to / (2 * pi) to express the frequencies in Hz.
    f_hz = f * resample_to / (2 * np.pi)
    return f_hz, m
def sinusoid_analysis(X, input_sample_rate, resample_block=128, copy=True):
    """
    Construct a sinusoidal model for the input signal.

    Parameters
    ----------
    X : ndarray
        Input signal to model

    input_sample_rate : int
        The sample rate of the input signal; must be a multiple of 8000

    resample_block : int, optional (default=128)
        Controls the step size of the sinusoidal model

    copy : bool, optional (default=True)
        If True, always work on a copy of ``X``; if False, copy only when
        converting ``X`` to an array requires it

    Returns
    -------
    frequencies_hz : ndarray
        Frequencies for the sinusoids, in Hz.

    magnitudes : ndarray
        Magnitudes of sinusoids returned in ``frequencies``

    Raises
    ------
    ValueError
        If ``input_sample_rate`` is not a multiple of 8000.

    References
    ----------
    D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
    Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
    """
    # np.array(..., copy=False) raises on NumPy >= 2 whenever a copy is
    # unavoidable (e.g. list input); asarray copies only when required.
    X = np.array(X) if copy else np.asarray(X)
    resample_to = 8000
    if input_sample_rate != resample_to:
        if input_sample_rate % resample_to != 0:
            raise ValueError("Input sample rate must be a multiple of 8k!")
        # Should be able to use resample... ?
        X = sg.decimate(X, input_sample_rate // resample_to, zero_phase=True)
    # Step size at 8 kHz, forced to an even integer; float() guards against
    # integer division and int() against a float result from round().
    step_size = 2 * int(round(
        resample_block / float(input_sample_rate) * resample_to / 2.))
    a, g, _ = lpc_analysis(X, order=8, window_step=step_size,
                           window_size=2 * step_size)
    f, m = lpc_to_frequency(a, g)
    # Scale by resample_to / (2 * pi) to express the frequencies in Hz.
    f_hz = f * resample_to / (2 * np.pi)
    return f_hz, m