# wavio.py
# Author: Warren Weckesser
# License: BSD 3-Clause (http://opensource.org/licenses/BSD-3-Clause)
def readwav(file):
    """
    Read a wav file.

    Returns the frame rate, sample width (in bytes) and a numpy array
    containing the data.

    This function does not read compressed wav files.
    """
    wav = wave.open(file)
    rate = wav.getframerate()
    nchannels = wav.getnchannels()
    sampwidth = wav.getsampwidth()
    nframes = wav.getnframes()
    data = wav.readframes(nframes)
    wav.close()
    array = _wav2array(nchannels, sampwidth, data)
    return rate, sampwidth, array
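The helper `_wav2array` is not included in this excerpt. A minimal sketch of it, following wavio's BSD-licensed implementation (1-, 2- and 4-byte widths map directly to numpy integer dtypes; 3-byte/24-bit data needs manual sign extension):

import numpy as np

def _wav2array(nchannels, sampwidth, data):
    """Convert raw frame bytes to a (num_samples, nchannels) numpy array."""
    num_samples, remainder = divmod(len(data), sampwidth * nchannels)
    if remainder > 0:
        raise ValueError('data length is not a multiple of sampwidth * nchannels')
    if sampwidth > 4:
        raise ValueError('sampwidth must not be greater than 4')
    if sampwidth == 3:
        # 24-bit: widen each sample to 4 bytes, sign-extending the top byte
        a = np.empty((num_samples, nchannels, 4), dtype=np.uint8)
        raw_bytes = np.frombuffer(data, dtype=np.uint8)
        a[:, :, :sampwidth] = raw_bytes.reshape(-1, nchannels, sampwidth)
        a[:, :, sampwidth:] = (a[:, :, sampwidth - 1:sampwidth] >> 7) * 255
        result = a.view('<i4').reshape(a.shape[:-1])
    else:
        # 8-bit wav is unsigned; 16- and 32-bit are signed little-endian
        dt_char = 'u' if sampwidth == 1 else 'i'
        a = np.frombuffer(data, dtype='<%s%d' % (dt_char, sampwidth))
        result = a.reshape(-1, nchannels)
    return result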
Python read() usage examples
def extract_mfcc(sound):
    # 'sound' holds raw wav bytes; wrap them in an in-memory file object
    # (Python 2 style; on Python 3 use io.BytesIO instead of StringIO.StringIO)
    (rate, sig) = wav.read(StringIO.StringIO(sound))
    mfcc_feat = features.mfcc(sig, rate)
    return numpy.asarray(mfcc_feat, dtype='float32')
def load_data(path):
    data = []
    label_index = np.array([], dtype=int)
    label_count = 0
    wav_files_count = 0
    for root, dirs, files in os.walk(path):
        # get all wav files in the current dir
        wav_files = [file for file in files if file.endswith('.wav')]
        data_same_person = []
        # extract logfbank features from each wav file
        for wav_file in wav_files:
            (rate, sig) = wav.read(os.path.join(root, wav_file))
            fbank_feats = logfbank(sig, rate, nfilt=40)
            # collect the logfbank features of the same person
            data_same_person.append(fbank_feats)
        # save all data of the same person into the data array;
        # the length of the data array is the number of speakers
        if wav_files:
            wav_files_count += len(wav_files)
            data.append(data_same_person)
    # return data, np.arange(len(data))
    return data
def get_noise(start):
    # read audio samples (read = scipy.io.wavfile.read)
    input_data = read('junk.wav')
    audio_in = input_data[1]
    samples = len(audio_in)
    intvl = (samples - start) // seg  # number of whole segments (integer division)
    k = start
    sum_data = numpy.zeros(seg)
    for i in range(intvl):
        buffer_data = []
        for j in range(seg):
            buffer_data.append(audio_in[k])
            k = k + 1
        cbuffer_out = fft(buffer_data)
        for j in range(seg):
            sq = abs(cbuffer_out[j]) ** 2.0
            sum_data[j] = sum_data[j] + sq
    # RMS average of the squared magnitudes over all segments
    for j in range(seg):
        sum_data[j] = sqrt(sum_data[j] / intvl)
    return sum_data
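The nested loops above compute a Welch-style estimate of the noise magnitude spectrum one sample at a time. A vectorized equivalent (a sketch; `get_noise_vectorized` is a hypothetical name, and `seg` is passed explicitly instead of being read from module scope):

import numpy as np

def get_noise_vectorized(audio_in, start, seg):
    # reshape the tail of the signal into (intvl, seg) contiguous segments
    intvl = (len(audio_in) - start) // seg
    frames = np.reshape(audio_in[start:start + intvl * seg], (intvl, seg))
    # average |FFT|^2 over the segments, then take the root (RMS spectrum)
    power = np.abs(np.fft.fft(frames, axis=1)) ** 2
    return np.sqrt(power.mean(axis=0))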
def read(cls, filename):
    """
    Read an audio file (only wav is supported).

    Parameters
    ----------
    filename : string
        Path to the wav file.
    """
    sample_rate, samples = wavfile.read(filename)
    if samples.dtype == np.dtype('int16'):
        # note: np.iinfo('int16').min is -32768, so this scales to [-1, 1]
        # but also flips the sign; dividing by abs(min) would preserve it
        samples = samples.astype(_types.float_) / np.iinfo(np.dtype('int16')).min
    if len(samples.shape) == 1:
        samples = samples.reshape((samples.shape[0], 1))
    instance = cls(samples, sample_rate)
    return instance
def load_wav_to_sampler_slot(path, sampler, slot, **kwargs):
    sample = sampler.Sample()
    freq, snd = wavfile.read(str(path))
    if snd.dtype.name == 'int16':
        sample.format = sampler.Format.int16
    elif snd.dtype.name == 'float32':
        sample.format = sampler.Format.float32
    else:
        raise Exception('Not supported')
    if len(snd.shape) == 1:
        size, = snd.shape
        channels = 1
    else:
        size, channels = snd.shape
    sample.rate = freq
    sample.channels = {
        1: m.Sampler.Channels.mono,
        2: m.Sampler.Channels.stereo,
    }[channels]
    sample.data = snd.data.tobytes()
    for key, value in kwargs.items():
        setattr(sample, key, value)
    sampler.samples[slot] = sample
    return sample
def test_trim_remove_zeros_frames():
    fs, x = wavfile.read(example_audio_file())
    frame_period = 5
    x = x.astype(np.float64)
    f0, timeaxis = pyworld.dio(x, fs, frame_period=frame_period)
    spectrogram = pyworld.cheaptrick(x, f0, timeaxis, fs)
    aperiodicity = pyworld.d4c(x, f0, timeaxis, fs)
    for mat in [spectrogram, aperiodicity]:
        trimmed = trim_zeros_frames(mat)
        assert trimmed.shape[1] == mat.shape[1]
    for mat in [spectrogram, aperiodicity]:
        trimmed = remove_zeros_frames(mat)
        assert trimmed.shape[1] == mat.shape[1]
def make_spectrum(self, filename, use_normalize):
    sr, y = wav.read(filename)
    if sr != 16000:
        raise ValueError('Sampling rate is expected to be 16kHz!')
    if y.dtype != 'float32':
        y = np.float32(y / 32767.)
    D = librosa.stft(y, n_fft=512, hop_length=256, win_length=512,
                     window=scipy.signal.hamming)
    Sxx = np.log10(abs(D) ** 2)
    if use_normalize:
        # per-frequency-bin mean/variance normalization (257 = 512 // 2 + 1 bins)
        mean = np.mean(Sxx, axis=1).reshape((257, 1))
        std = np.std(Sxx, axis=1).reshape((257, 1)) + 1e-12
        Sxx = (Sxx - mean) / std
    slices = []
    for i in range(0, Sxx.shape[1] - self.FRAMELENGTH, self.OVERLAP):
        slices.append(Sxx[:, i:i + self.FRAMELENGTH])
    return np.array(slices)
def __init__(self, filepath="files", is_delta_mode=False, verbose=False):
    self.verbose = verbose
    self.message = ""
    self.filepath = filepath
    self.is_delta = is_delta_mode
    # Load files
    try:
        self.NN = pickle.load(open(self.filepath + '/model.pkl', 'rb'))
        # Load user names
        userList = open(self.filepath + "/metadata.txt", "r")
        self.users = userList.read().split('\n')
        userList.close()
    except FileNotFoundError:
        print("Model and metadata.txt not found.")
        self.mlp = MLPClassifier(hidden_layer_sizes=(50, 50, 50), activation='logistic')
    if self.verbose:
        print("Delta mode enabled =", is_delta_mode)
    # Train the network and generate the model.pkl and csv files
def fetch_sample_speech_fruit(n_samples=None):
    url = 'https://dl.dropboxusercontent.com/u/15378192/audio.tar.gz'
    wav_path = "audio.tar.gz"
    if not os.path.exists(wav_path):
        download(url, wav_path)
    tf = tarfile.open(wav_path)
    wav_names = [fname for fname in tf.getnames()
                 if ".wav" in fname.split(os.sep)[-1]]
    speech = []
    print("Loading speech files...")
    for wav_name in wav_names[:n_samples]:
        f = tf.extractfile(wav_name)
        fs, d = wavfile.read(f)
        # scale 16-bit PCM to [-1, 1)
        d = d.astype('float32') / (2 ** 15)
        speech.append(d)
    return fs, speech
def run_mgc_example():
    import matplotlib.pyplot as plt
    fs, x = wavfile.read("test16k.wav")
    pos = 3000
    fftlen = 1024
    win = np.blackman(fftlen) / np.sqrt(np.sum(np.blackman(fftlen) ** 2))
    xw = x[pos:pos + fftlen] * win
    mgc_order = 20
    mgc_alpha = 0.41
    mgc_gamma = -0.35
    mgc_arr = win2mgc(xw, order=mgc_order, alpha=mgc_alpha, gamma=mgc_gamma, verbose=True)
    # windowed log-magnitude spectrum vs. the MGC-derived spectral envelope
    xwsp = 20 * np.log10(np.abs(np.fft.rfft(xw)))
    sp = mgc2sp(mgc_arr, mgc_alpha, mgc_gamma, fftlen)
    plt.plot(xwsp)
    plt.plot(20. / np.log(10) * np.real(sp), "r")
    plt.xlim(1, len(xwsp))
    plt.show()
def load_wav_file(fname, smprate=16000):
    '''
    Load a WAV file, then return a numpy float32 vector.
    Resamples if needed.

    The returned array will always have a length that is a multiple of
    FFT_SIZE to ease preprocessing; this is done via zero padding at the end.
    '''
    # assumes FLOATX (e.g. np.float32), scipy.signal and math.ceil are in scope
    smprate_real, data = wavfile.read(fname)
    if smprate_real == smprate:
        data = data.astype(FLOATX)
    elif (smprate_real % smprate) == 0:
        # integer-factor downsample: pad, group into blocks, average each block
        smpfactor = smprate_real // smprate
        data = np.pad(
            data, [(0, (-len(data)) % smpfactor)], mode='constant')
        data = np.reshape(data, [len(data) // smpfactor, smpfactor])
        data = np.mean(data.astype(FLOATX), axis=1)
    else:
        newlen = int(ceil(len(data) * (smprate / smprate_real)))
        # FIXME this resample is very slow on prime lengths
        data = scipy.signal.resample(data, newlen).astype(FLOATX)
    return data
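The integer-factor branch downsamples by padding to a multiple of the factor, grouping samples into blocks, and averaging each block (a crude box filter). A standalone illustration of the trick, assuming numpy only:

import numpy as np

x = np.arange(10, dtype=np.float32)  # pretend these are 48 kHz samples
factor = 3                           # 48 kHz -> 16 kHz
x = np.pad(x, [(0, (-len(x)) % factor)], mode='constant')
y = np.reshape(x, [len(x) // factor, factor]).mean(axis=1)
print(y)  # [1. 4. 7. 3.] -- the last block was zero-padded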
def get_num_examples(self, wavlists, labellists, num_examples, num_features):
    for n, (w, l) in enumerate(zip(wavlists, labellists)):
        fs, au = wav.read(w)
        # extract MFCC features from the audio input
        melf = mfcc(au, samplerate=fs, numcep=self.num_features,
                    winlen=0.025, winstep=0.01, nfilt=self.num_features)
        # melf = (melf - np.mean(melf)) / np.std(melf)
        self.mel_freq.append(melf)
        melf_target = self.labelprocessing(l)
        self.target_label.append(melf_target)
        if n == num_examples - 1:
            break
        # record pairs whose feature length does not exceed the label length
        if melf.shape[0] <= len(melf_target):
            t = w, l
            self.length_check.append(t)
    # Split transcript into each label
def prepareData(path):
    normal_files = os.listdir(path + "/Normal/")
    nasal_files = os.listdir(path + "/Nasalized/")
    normal_features = np.zeros((1, 400))
    normal_labels = np.zeros((1, 1))
    nasal_features = np.zeros((1, 400))
    nasal_labels = np.zeros((1, 1))
    for filename in normal_files:
        (rate, sig) = wav.read(path + "/Normal/" + filename)
        sig = sig[:, 0]  # keep the first channel only
        sig = preprocess_sample(sig, rate)
        features, labels = create_labeled_data(sig, nasal=0)
        normal_features = np.append(normal_features, features, axis=0)
    for filename in nasal_files:
        (rate, sig) = wav.read(path + "/Nasalized/" + filename)
        sig = sig[:, 0]
        sig = preprocess_sample(sig, rate)
        features, labels = create_labeled_data(sig, nasal=1)
        nasal_features = np.append(nasal_features, features, axis=0)
    # drop the all-zeros seed rows
    normal_features = normal_features[1:]
    nasal_features = nasal_features[1:]
    return (normal_features, nasal_features)
def synth_audio(audiofile, impfile, chns, angle, nsfile=None, snrlevel=None,
                outname=None, outsplit=False):
    FreqSamp, audio = wavfile.read(audiofile)
    audio = audio.astype(np.float32) / np.amax(np.absolute(audio.astype(np.float32)))
    gen_audio = np.zeros((audio.shape[0], chns), dtype=np.float32)
    # convolve the source with the impulse response of each channel
    for ch in range(1, chns + 1):
        impulse = np.fromfile('{}D{:03d}_ch{}.flt'.format(impfile, angle, ch),
                              dtype=np.float32)
        gen_audio[:, ch - 1] = np.convolve(audio, impulse, mode='same')
    gen_audio = add_noise(gen_audio, nsfile=nsfile, snrlevel=snrlevel)
    if outname is None:
        return FreqSamp, np.transpose(gen_audio)
    if outsplit:
        for ch in range(chns):
            audiolab.wavwrite(gen_audio[:, ch], '{}_ch{:02d}.wav'.format(outname, ch),
                              fs=FreqSamp, enc='pcm16')
    else:
        audiolab.wavwrite(gen_audio, '{}.wav'.format(outname), fs=FreqSamp, enc='pcm16')
def add_noise(gen_audio, nsfile=None, snrlevel=None):
    chns = gen_audio.shape[1]
    if not ((nsfile is None) or (nsfile == -1)):
        _, noise = wavfile.read(nsfile)
        noise = noise[0:gen_audio.shape[0]]
    if not (snrlevel is None or snrlevel == 'Clean'):
        if nsfile is None:
            # single-channel white noise, tiled to all channels below
            noise = np.random.uniform(-1.0, 1.0, (gen_audio.shape[0],))
        if nsfile == -1:
            # independent white noise per channel
            noise = np.random.uniform(-1.0, 1.0, (gen_audio.shape[0], chns))
        else:
            noise = np.tile(noise[:, np.newaxis], [1, chns])
        noise = noise.astype(np.float32) / np.amax(np.absolute(noise.astype(np.float32)))
        # scale the noise so the signal-to-noise ratio equals snrlevel dB
        noise = noise / LA.norm(noise) * LA.norm(gen_audio) / np.power(10, 0.05 * float(snrlevel))
        gen_audio = gen_audio + noise
        gen_audio /= np.amax(np.absolute(gen_audio))  # normalize the mixed audio
    return gen_audio
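The scaling line sets the mix so that 20*log10(||signal|| / ||noise||) equals snrlevel in dB: the noise is first brought to unit norm, then to the signal's norm divided by 10**(0.05*snrlevel). A quick numerical check of that identity (a sketch, assuming numpy only):

import numpy as np
from numpy import linalg as LA

gen_audio = np.random.uniform(-1.0, 1.0, (16000, 2)).astype(np.float32)
noise = np.random.uniform(-1.0, 1.0, gen_audio.shape).astype(np.float32)
snrlevel = 10.0
noise = noise / LA.norm(noise) * LA.norm(gen_audio) / np.power(10, 0.05 * snrlevel)
# measured SNR of the mix in dB
print(20.0 * np.log10(LA.norm(gen_audio) / LA.norm(noise)))  # ~10.0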
def kayurecord(woodname, duration):
    """ Record audio and save it to a wav file
    """
    filename = time_now() + "_" + woodname + ".wav"
    container = pyaudio.PyAudio()
    stream = container.open(format=FORMAT,
                            channels=CHANNELS,
                            rate=RATE,
                            input=True,
                            frames_per_buffer=CHUNK)
    print("* start recording...")
    frames = []
    for i in range(0, int(RATE / CHUNK * duration)):
        data = stream.read(CHUNK)
        frames.append(data)
    stream.stop_stream()
    stream.close()
    container.terminate()
    print("* done recording!")
    kayurecord_save(filename, frames, container)
    return filename
def process_data(wav_files, phn_files):
    max_step_size = 0
    inputs = []
    targets = []
    for i in tqdm(range(len(wav_files))):
        # extract mfcc + logfbank features from the wav
        (rate, sig) = wav.read(wav_files[i])
        mfcc_feat = mfcc(sig, rate)
        fbank_feat = logfbank(sig, rate)
        acoustic_features = join_features(mfcc_feat, fbank_feat)  # time_stamp x n_features
        # extract the label sequence from the phn transcription
        phn_labels = []
        with open(phn_files[i], 'r') as csvfile:
            phn_reader = csv.reader(csvfile, delimiter=' ')
            for row in phn_reader:
                if row[2] == 'q':
                    continue  # drop glottal stops, per the standard 48->39 mapping
                phn_labels.append(phoneme_set_39[phoneme_48_39.get(row[2], row[2])] - 1)
        inputs.append(acoustic_features)
        targets.append(phn_labels)
    return lists_batches(inputs, targets)
def process_wav(wav_file):
    (rate, sig) = wav.read(wav_file)
    mfcc_feat = mfcc(sig, rate)
    fbank_feat = logfbank(sig, rate)
    acoustic_features = join_features(mfcc_feat, fbank_feat)  # time_stamp x n_features
    return acoustic_features
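`join_features` is not shown in this excerpt; given the `time_stamp x n_features` comment, it presumably stacks the MFCC and log filterbank matrices along the feature axis. A minimal sketch under that assumption (with python_speech_features defaults, 13 MFCCs + 26 filterbank energies):

import numpy as np

def join_features(mfcc_feat, fbank_feat):
    # (T x 13) MFCCs + (T x 26) logfbank -> (T x 39) combined features
    return np.concatenate((mfcc_feat, fbank_feat), axis=1)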
def get_mfcc_feat(self):
    # build a codebook over the MFCCs of all speaker models
    mfcc_feats = None
    for filename in glob.iglob('../data/voices/*.wav'):
        print(filename)
        (rate, sig) = wav.read(filename)
        # MFCC features: each row corresponds to the MFCCs of one frame
        mfcc_person = mfcc(sig.astype(np.float64), rate)
        if mfcc_feats is None:
            mfcc_feats = mfcc_person
        else:
            mfcc_feats = np.concatenate((mfcc_feats, mfcc_person), axis=0)
    # normalize the features
    whitened = whiten(mfcc_feats)
    self.codebook, labeled_obs = kmeans2(data=whitened, k=3)
def load_audio(filename, b_normalize=True):
    """Load the audio file at the provided filename using scipy.io.wavfile.

    Optionally normalizes the audio to the maximum value.

    Parameters
    ----------
    filename : str
        File to load.
    b_normalize : bool, optional
        Normalize to the maximum value.
    """
    sr, s = wavfile.read(filename)
    if b_normalize:
        s = s.astype(np.float32)
        s = (s / np.max(np.abs(s)))
        s -= np.mean(s)
    return s
def shuffle_examples(featdir):
    '''
    Shuffle the utterances and put them in feats_shuffled.scp

    Args:
        featdir: the directory containing the features in feats.scp
    '''
    # read feats.scp
    featsfile = open(featdir + '/feats.scp', 'r')
    feats = featsfile.readlines()
    featsfile.close()
    # shuffle the features randomly
    shuffle(feats)
    # write them to feats_shuffled.scp
    feats_shuffledfile = open(featdir + '/feats_shuffled.scp', 'w')
    feats_shuffledfile.writelines(feats)
    feats_shuffledfile.close()
def read_wav(wavfile):
    '''
    read a wav file formatted by kaldi

    Args:
        wavfile: a pair containing either the filename or the command to read
            the wavfile, and a boolean that determines whether it is a command
    '''
    if wavfile[1]:
        # run the pipe command and temporarily copy the audio to tmp.wav
        # (and a duplicate; I don't know how to avoid this)
        os.system(wavfile[0] + ' tee tmp.wav > duplicate.wav')
        # read the created wav file
        (rate, utterance) = wav.read('tmp.wav')
        # delete the created files
        os.remove('tmp.wav')
        os.remove('duplicate.wav')
    else:
        (rate, utterance) = wav.read(wavfile[0])
    return rate, utterance
def make_batch_padded(path, num_layers=14):
    rate, data = wavfile.read(path)
    # only use the 1st channel
    data = data[:, 0]
    data_ = normalize(data)
    bins, bins_center = mu_law_bins(256)
    # quantize samples 0..end-1 as inputs ...
    inputs = np.digitize(data_[0:-1], bins, right=False)
    inputs = bins_center[inputs][None, :, None]
    # ... and predict samples 1..end from them
    targets = np.digitize(data_[1:], bins, right=False)[None, :]
    base = 2 ** num_layers
    _, width, _ = inputs.shape
    # crop the width to a multiple of base, then left-pad by the receptive field
    width_cropped = int(np.floor(width * 1.0 / base) * base)
    inputs_padded = np.pad(inputs[:, 0:width_cropped, :],
                           ((0, 0), (base - 1, 0), (0, 0)), 'constant')
    targets_padded = targets[:, 0:width_cropped]
    return (inputs_padded, targets_padded)
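`mu_law_bins` and `normalize` are defined elsewhere in that project. One plausible implementation of `mu_law_bins` for WaveNet-style 256-level mu-law quantization, returning 255 interior bin edges for np.digitize and 256 bin centers (an assumption for illustration, not the original code):

import numpy as np

def mu_law_bins(num_bins):
    mu = num_bins - 1
    # levels uniformly spaced in the companded (mu-law) domain ...
    centers_comp = np.linspace(-1.0, 1.0, num_bins)
    edges_comp = (centers_comp[:-1] + centers_comp[1:]) / 2.0
    # ... expanded back to the linear amplitude domain
    expand = lambda y: np.sign(y) * ((1.0 + mu) ** np.abs(y) - 1.0) / mu
    return expand(edges_comp), expand(centers_comp)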
def test_synthesis_from_codeap(self):
    path = dirpath + '/data/test16000.wav'
    fs, x = wavfile.read(path)
    af = FeatureExtractor(analyzer='world', fs=fs, shiftms=5)
    f0, spc, ap = af.analyze(x)
    codeap = af.codeap()
    assert len(np.nonzero(f0)[0]) > 0
    assert spc.shape == ap.shape
    assert pyworld.get_num_aperiodicities(fs) == codeap.shape[-1]
    ap = pyworld.decode_aperiodicity(codeap, fs, 1024)
    synth = Synthesizer(fs=fs, fftl=1024, shiftms=5)
    wav = synth.synthesis_spc(f0, spc, ap)
    nun_check(wav)
def test_high_frequency_completion(self):
    path = dirpath + '/data/test16000.wav'
    fs, x = wavfile.read(path)
    f0rate = 0.5
    shifter = Shifter(fs, f0rate=f0rate)
    mod_x = shifter.f0transform(x, completion=False)
    mod_xc = shifter.f0transform(x, completion=True)
    assert len(mod_x) == len(mod_xc)
    N = 512
    fl = int(fs * 25 / 1000)
    win = np.hanning(fl)
    sts = [1000, 5000, 10000, 20000]
    for st in sts:
        # spectrum w/o completion
        f_mod_x = fft(mod_x[st:st + fl] / 2**16 * win)
        amp_mod_x = 20.0 * np.log10(np.abs(f_mod_x))
        # spectrum w/ completion
        f_mod_xc = fft(mod_xc[st:st + fl] / 2**16 * win)
        amp_mod_xc = 20.0 * np.log10(np.abs(f_mod_xc))
        # completion should raise the mean high-band amplitude
        assert np.mean(amp_mod_x[N // 4:]) < np.mean(amp_mod_xc[N // 4:])
def load_wav_chunks(filenames):
    num_files = len(filenames)
    max_chunks = 100000
    all_chunks = np.zeros([max_chunks, chunk_size])
    total_chunks = 0
    for file_idx in range(num_files):
        filename = filenames[file_idx]
        print("[" + str(file_idx).zfill(3) + "]: " + filename)
        rate, data = wavfile.read(filename)
        # mix down to mono by summing the channels
        data = np.sum(data, axis=1)
        print(data.shape)
        data = data.astype(np.float32) * (1.0 / 32768.0)
        chunks = create_chunks(data)
        num_chunks = len(chunks)
        all_chunks[total_chunks:total_chunks + num_chunks] = np.array(chunks)
        total_chunks += num_chunks
    all_chunks = all_chunks[0:total_chunks]
    return all_chunks
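`create_chunks` and `chunk_size` come from the surrounding module. A minimal sketch consistent with how they are used above (non-overlapping, fully filled chunks; the names are taken from the snippet, the body is an assumption):

import numpy as np

chunk_size = 1024  # assumed value; the snippet reads it from module scope

def create_chunks(data):
    # split a 1-D signal into non-overlapping rows of chunk_size samples,
    # dropping the incomplete tail
    num_chunks = len(data) // chunk_size
    return np.reshape(data[:num_chunks * chunk_size], (num_chunks, chunk_size))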