def transform(self, y, sr):
    '''Transform an audio signal

    Parameters
    ----------
    y : np.ndarray
        The audio signal

    sr : number > 0
        The native sampling rate of y

    Returns
    -------
    dict
        Data dictionary containing features extracted from y

    See Also
    --------
    transform_audio
    '''
    if sr != self.sr:
        y = resample(y, sr, self.sr)

    return self.merge([self.transform_audio(y)])
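The bare resample(y, sr, self.sr) call above follows the older librosa-style positional signature (original rate, then target rate); newer librosa releases (0.10 and later) expect the rates as keyword arguments. A minimal standalone sketch of that resampling step, using a synthetic signal and illustrative rates:

import numpy as np
import librosa

y = np.random.randn(22050).astype(np.float32)                 # one second of synthetic audio at 22.05 kHz
y_hat = librosa.resample(y, orig_sr=22050, target_sr=16000)   # keyword form, works on current librosa
print(y_hat.shape)                                            # (16000,)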
Python resample() example source code
def wav_data_to_samples(wav_data, sample_rate):
    """Read PCM-formatted WAV data and return a NumPy array of samples.

    Uses scipy to read and librosa to process WAV data. Audio will be converted
    to mono if necessary.

    Args:
        wav_data: WAV audio data to read.
        sample_rate: The number of samples per second at which the audio will be
            returned. Resampling will be performed if necessary.

    Returns:
        A numpy array of audio samples, single-channel (mono) and sampled at the
        specified rate, in float32 format.

    Raises:
        AudioIOReadException: If scipy is unable to read the WAV data.
        AudioIOException: If audio processing fails.
    """
    try:
        # Read the wav file, converting sample rate & number of channels.
        native_sr, y = scipy.io.wavfile.read(six.BytesIO(wav_data))
    except Exception as e:  # pylint: disable=broad-except
        raise AudioIOReadException(e)

    if y.dtype != np.int16:
        raise AudioIOException('WAV file not 16-bit PCM, unsupported')

    try:
        # Convert to float, mono, and the desired sample rate.
        y = int16_samples_to_float32(y)
        if y.ndim == 2 and y.shape[1] == 2:
            y = y.T
            y = librosa.to_mono(y)
        if native_sr != sample_rate:
            y = librosa.resample(y, native_sr, sample_rate)
    except Exception as e:  # pylint: disable=broad-except
        raise AudioIOException(e)

    return y
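A short usage sketch for the function above; the path is a placeholder, and it assumes wav_data_to_samples and its helpers (int16_samples_to_float32, the AudioIO* exceptions) are importable from the surrounding module:

with open('example.wav', 'rb') as f:            # placeholder path
    wav_bytes = f.read()

samples = wav_data_to_samples(wav_bytes, sample_rate=16000)
print(samples.dtype, samples.ndim)              # float32, 1 (mono)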
def load_data(data_dict, file_list):
    for f in file_list:
        data, samplerate = sf.read(f)
        print(f)
        try:
            # Expect 16 kHz mono; otherwise fall back to resampling to SR.
            assert samplerate == 16000 and len(data.shape) == 1
        except AssertionError:
            data = librosa.resample(data, samplerate, SR)
        # Key the dictionary by the file name without its extension.
        data_dict.update({f.split('/')[-1][:-4]: data})
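A minimal driver sketch for load_data, assuming SR is the module-level target rate it references; the file paths are hypothetical:

SR = 16000                                                              # assumed value of the module constant used above
data_dict = {}
load_data(data_dict, ['corpus/utt_0001.wav', 'corpus/utt_0002.wav'])    # hypothetical paths
# keys are the file stems ('utt_0001', ...), values are the decoded sample arrays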
def __call__(self, y):
    """Resample a time series.

    Parameters
    ----------
    y : np.ndarray [shape=(n,) or shape=(2, n)]
        audio time series. Can be mono or stereo.

    Returns
    -------
    y_hat : np.ndarray [shape=(n * target_sr / orig_sr,)]
    """
    return librosa.resample(y, **self.__dict__)
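The **self.__dict__ expansion above implies a small wrapper class whose instance attributes are exactly the keyword arguments of librosa.resample. The class below is a hedged reconstruction of that idea, not the original source; the class name and defaults are assumptions:

import numpy as np
import librosa


class Resample(object):  # hypothetical wrapper consistent with __call__ above
    def __init__(self, orig_sr, target_sr, res_type='kaiser_best'):
        # Every attribute set here is forwarded to librosa.resample via **self.__dict__.
        self.orig_sr = orig_sr
        self.target_sr = target_sr
        self.res_type = res_type

    def __call__(self, y):
        return librosa.resample(y, **self.__dict__)


resampler = Resample(orig_sr=44100, target_sr=22050)
y_hat = resampler(np.random.randn(44100).astype(np.float32))   # ~22050 output samples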
# Feature
def process_music_data(data_in, is_fft, is_energy, n_output_bins, n_fft, is_visual):
    # length is len(data_in)/4
    data_np = np.fromstring(data_in, 'Float32')

    # visualizer
    if is_visual:
        visualizer(data_np)

    # energy
    if is_energy:
        energy = np.abs(data_np) ** 2
        energy = energy.sum()
        energy *= 2**5
        energy_output = energy.astype(np.uint16)
    else:
        energy_output = np.zeros(2).astype(np.uint16)

    # fft
    if is_fft:
        global sample_rate
        # down-sample by 4, with filtering, energy not scaled
        data_np = librosa.resample(data_np,
                                   sample_rate,
                                   sample_rate/4,
                                   res_type='kaiser_fast')
        # short time fft over n_fft samples
        fft_data = librosa.stft(data_np, n_fft,
                                hop_length=n_fft,
                                center=False)
        fft_data_mag = np.abs(fft_data[0:n_fft//2]) ** 2
        # magnitude scaling
        fft_data_mag *= 2**3
        fft_output = get_output_fft_bins(fft_data_mag, n_output_bins)
        fft_output = fft_output.astype(np.uint8)
    else:
        fft_output = np.zeros(n_output_bins).astype(np.uint8)

    return fft_output, energy_output
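A hedged driver sketch for process_music_data: the input is a synthetic float32 byte buffer such as an audio callback might hand over, and it assumes the module-level sample_rate global plus the visualizer and get_output_fft_bins helpers the function expects are defined elsewhere (and an older librosa install, given the positional resample call inside):

import numpy as np

sample_rate = 44100                                            # assumed global read inside process_music_data

chunk = np.random.randn(2048).astype(np.float32).tobytes()     # synthetic raw float32 audio chunk
fft_out, energy_out = process_music_data(chunk, is_fft=True, is_energy=True,
                                          n_output_bins=16, n_fft=256, is_visual=False)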
def load_audio(path, target_sr=16000):
    data, orig_sr = sf.read(path)
    if orig_sr != target_sr:
        return librosa.resample(data, orig_sr, target_sr)
    return data
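A one-line usage sketch for load_audio; the path is a placeholder:

samples = load_audio('recordings/example.flac', target_sr=16000)   # placeholder path, resampled to 16 kHz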
# def _update_progress(progress):
#     print("\rProgress: [{0:50s}] {1:.1f}%".format('#' * int(progress * 50),
#                                                   progress * 100), end="")
#
#
# def create_manifest(data_path, manifest_path, ordered=True):
#     file_paths = []
#     wav_files = [os.path.join(dirpath, f)
#                  for dirpath, dirnames, files in os.walk(data_path)
#                  for f in fnmatch.filter(files, '*.wav')]
#     size = len(wav_files)
#     counter = 0
#     for file_path in wav_files:
#         file_paths.append(file_path.strip())
#         counter += 1
#         _update_progress(counter / float(size))
#     print('\n')
#     if ordered:
#         _order_files(file_paths)
#     counter = 0
#     with io.FileIO(manifest_path, "w") as file:
#         for wav_path in file_paths:
#             transcript_path = wav_path.replace(
#                 '/wav/', '/txt/').replace('.wav', '.txt')
#             sample = os.path.abspath(wav_path) + ',' + \
#                 os.path.abspath(transcript_path) + '\n'
#             file.write(sample)
#             counter += 1
#             _update_progress(counter / float(size))
#     print('\n')
#
#
# def _order_files(file_paths):
#     print("Sorting files by length...")
#
#     def func(element):
#         output = subprocess.check_output(
#             ['soxi -D %s' % element.strip()],
#             shell=True
#         )
#         return float(output)
#
#     file_paths.sort(key=func)