def __init__(self, clocks):
    super(PygameStretchTIA_Sound, self).__init__(clocks)
    # Flag to indicate if samples should be stretched in frequency, or more outputs generated.
    self._maintain_pitch = True
    self._wav_output = [wave.open('pytari_stretch_chan0.wav', 'w'),
                        wave.open('pytari_stretch_chan1.wav', 'w')]
    self._wav_output[0].setparams((1, 1, self.SAMPLERATE, 0, 'NONE', 'not compressed'))
    self._wav_output[1].setparams((1, 1, self.SAMPLERATE, 0, 'NONE', 'not compressed'))
    self._sound_chunk_size = 1024 * 4
    self.openSound()
    self._test_accumulated_sound = self._sound_chunk_size * 2
    # Hold 'stretch' state for each channel.
    self._stretcher = tiasound.Stretch()
    self._stretched = [[], []]
    self._last_update_time = self.clocks.system_clock
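The wave writers above are configured for mono, 8-bit output. A minimal sketch of flushing a chunk of generated samples to one of the debug files (the helper name and the unsigned 8-bit `samples` argument are assumptions, not part of the original class):

def _write_debug_wav(self, channel, samples):
    # 'samples' is assumed to be a list of unsigned 8-bit values (0-255),
    # matching the 1-byte sample width set in __init__.
    frames = bytes(bytearray(samples))
    self._wav_output[channel].writeframes(frames)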
def audio(tag, tensor, sample_rate=44100):
    tensor = tensor.squeeze()
    assert tensor.ndim == 1, 'input tensor should be 1 dimensional.'
    # Scale from the assumed [-1.0, 1.0] float range to signed 16-bit PCM.
    tensor_list = [int(32767.0 * x) for x in tensor]
    import io
    import wave
    import struct
    fio = io.BytesIO()
    Wave_write = wave.open(fio, 'wb')
    Wave_write.setnchannels(1)
    Wave_write.setsampwidth(2)
    Wave_write.setframerate(sample_rate)
    tensor_enc = b''
    for v in tensor_list:
        tensor_enc += struct.pack('<h', v)
    Wave_write.writeframes(tensor_enc)
    Wave_write.close()
    audio_string = fio.getvalue()
    fio.close()
    audio = Summary.Audio(sample_rate=sample_rate, num_channels=1,
                          length_frames=len(tensor_list),
                          encoded_audio_string=audio_string,
                          content_type='audio/wav')
    return Summary(value=[Summary.Value(tag=tag, audio=audio)])
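A usage sketch (assuming numpy is available and that `Summary` is the TensorBoard protobuf already imported by this module): log one second of a 440 Hz sine, scaled to the [-1, 1] range the `int(32767.0 * x)` conversion expects.

import numpy as np

t = np.linspace(0, 1, 44100, endpoint=False)
sine = 0.5 * np.sin(2 * np.pi * 440 * t)  # float samples in [-1, 1]
summary = audio('demo/sine_440hz', sine, sample_rate=44100)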
def __init__(self, data=None, *args, **kwargs):
    if kwargs.get('metadata', False):
        # internal use only: metadata is passed in directly
        self._data = data
        for attr, val in kwargs.pop('metadata').items():
            setattr(self, attr, val)
    else:
        # normal construction: accept raw bytes or a file-like object
        data = data if isinstance(data, basestring) else data.read()
        raw = wave.open(StringIO(data), 'rb')
        raw.rewind()
        self.channels = raw.getnchannels()
        self.sample_width = raw.getsampwidth()
        self.frame_rate = raw.getframerate()
        self.frame_width = self.channels * self.sample_width
        raw.rewind()
        self._data = raw.readframes(raw.getnframes())  # read the whole file
    super(AudioSegment, self).__init__(*args, **kwargs)
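A usage sketch (the file name is hypothetical; this is Python 2 code, given `basestring` and `StringIO`): the constructor accepts either raw wave bytes or a file-like object with a .read() method.

with open('speech.wav', 'rb') as f:
    seg = AudioSegment(f.read())
print(seg.channels, seg.sample_width, seg.frame_rate)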
def play_raw(self, raw_data, rate=16000, channels=1, width=2, block=True):
    self.raw = raw_data
    self.width = width
    self.channels = channels
    self.event.clear()
    self.stream = self.pa.open(format=self.pa.get_format_from_width(width),
                               channels=channels,
                               rate=rate,
                               output=True,
                               # output_device_index=1,
                               frames_per_buffer=CHUNK_SIZE,
                               stream_callback=self.raw_callback)
    if block:
        self.event.wait()
        time.sleep(2)  # wait for buffered audio to finish playing (works around an ALSA driver bug)
        self.stream.close()
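A usage sketch (`player` stands in for an instance of the surrounding class; the file name is hypothetical): feed a 16-bit mono wave file through play_raw.

import wave

w = wave.open('prompt.wav', 'rb')
rate, channels, width = w.getframerate(), w.getnchannels(), w.getsampwidth()
pcm = w.readframes(w.getnframes())
w.close()
player.play_raw(pcm, rate=rate, channels=channels, width=width)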
def play_audio_file(fname=DETECT_DING):
    """Simple callback function to play a wave file. By default it plays
    a Ding sound.

    :param str fname: wave file name
    :return: None
    """
    ding_wav = wave.open(fname, 'rb')
    ding_data = ding_wav.readframes(ding_wav.getnframes())
    audio = pyaudio.PyAudio()
    stream_out = audio.open(
        format=audio.get_format_from_width(ding_wav.getsampwidth()),
        channels=ding_wav.getnchannels(),
        rate=ding_wav.getframerate(), input=False, output=True)
    stream_out.start_stream()
    stream_out.write(ding_data)
    time.sleep(0.2)
    stream_out.stop_stream()
    stream_out.close()
    audio.terminate()
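Usage is a single call; the path below is hypothetical and simply overrides the DETECT_DING default.

play_audio_file('resources/ding.wav')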
def _is_good_wave(self, filename):
    """
    Check if the wav is in the correct format for MARF.
    """
    par = None
    try:
        w_file = wave.open(filename)
        par = w_file.getparams()
        w_file.close()
    except wave.Error as exc:
        print(exc)
        return False
    if par[:3] == (1, 2, 8000) and par[-1:] == ('not compressed',):
        return True
    else:
        return False
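getparams() returns (nchannels, sampwidth, framerate, nframes, comptype, compname), so the `(1, 2, 8000)` check above demands mono, 16-bit, 8 kHz, uncompressed audio. An equivalent standalone sketch (assuming Python 3.4+, where wave.open supports the with statement and getparams returns a namedtuple):

import wave

def is_marf_wave(filename):
    try:
        with wave.open(filename) as w:
            par = w.getparams()
    except wave.Error as exc:
        print(exc)
        return False
    return par[:3] == (1, 2, 8000) and par.compname == 'not compressed'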
def real_signal():
    spf = wave.open('helloworld.wav', 'r')
    # Extract raw audio from the wav file.
    # If you right-click on the file and go to "Get Info", you can see:
    # sampling rate = 16000 Hz
    # bits per sample = 16
    # The first is quantization in time
    # The second is quantization in amplitude
    # We also do this for images!
    # 2^16 = 65536 is how many different sound levels we have
    signal = spf.readframes(-1)
    signal = np.frombuffer(signal, dtype=np.int16)  # np.fromstring is deprecated
    T = len(signal)
    signal = (signal - signal.mean()) / signal.std()
    hmm = HMM(5, 3)
    hmm.fit(signal.reshape(1, T, 1))
def real_signal():
    spf = wave.open('helloworld.wav', 'r')
    # Extract raw audio from the wav file.
    # If you right-click on the file and go to "Get Info", you can see:
    # sampling rate = 16000 Hz
    # bits per sample = 16
    # The first is quantization in time
    # The second is quantization in amplitude
    # We also do this for images!
    # 2^16 = 65536 is how many different sound levels we have
    signal = spf.readframes(-1)
    signal = np.frombuffer(signal, dtype=np.int16)  # np.fromstring is deprecated
    T = len(signal)
    signal = (signal - signal.mean()) / signal.std()
    hmm = HMM(5, 3)
    # signal needs to be of shape N x T(n) x D
    hmm.fit(signal.reshape(1, T, 1), learning_rate=10e-6, max_iter=20)
def real_signal():
    spf = wave.open('helloworld.wav', 'r')
    # Extract raw audio from the wav file.
    # If you right-click on the file and go to "Get Info", you can see:
    # sampling rate = 16000 Hz
    # bits per sample = 16
    # The first is quantization in time
    # The second is quantization in amplitude
    # We also do this for images!
    # 2^16 = 65536 is how many different sound levels we have
    signal = spf.readframes(-1)
    signal = np.frombuffer(signal, dtype=np.int16)  # np.fromstring is deprecated
    T = len(signal)
    hmm = HMM(10)
    hmm.fit(signal.reshape(1, T))
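All three variants above expect a 16-bit, 16 kHz mono 'helloworld.wav' beside the script; a minimal sketch that synthesizes a placeholder one (hypothetical content; any short mono recording works):

import wave
import numpy as np

rate = 16000
t = np.arange(rate) / float(rate)  # one second of timestamps
samples = (0.3 * 32767 * np.sin(2 * np.pi * 220 * t)).astype(np.int16)
w = wave.open('helloworld.wav', 'wb')
w.setnchannels(1)
w.setsampwidth(2)
w.setframerate(rate)
w.writeframes(samples.tobytes())
w.close()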
def use_cloud(token):
    fp = wave.open('output.wav', 'r')
    nf = fp.getnframes()
    f_len = nf * 2  # payload size in bytes: 16-bit mono PCM
    audio_data = fp.readframes(nf)
    cuid = "123456"  # my xiaomi phone MAC
    srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
    http_header = [
        'Content-Type: audio/pcm; rate=8000',
        'Content-Length: %d' % f_len
    ]
    print(srv_url)
    c = pycurl.Curl()
    c.setopt(pycurl.URL, str(srv_url))  # curl doesn't support unicode
    c.setopt(c.HTTPHEADER, http_header)  # must be a list, not a dict
    c.setopt(c.POST, 1)
    c.setopt(c.CONNECTTIMEOUT, 30)
    c.setopt(c.TIMEOUT, 30)
    c.setopt(c.WRITEFUNCTION, dump_res)
    c.setopt(c.POSTFIELDS, audio_data)
    c.setopt(c.POSTFIELDSIZE, f_len)
    c.perform()
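`dump_res` is referenced but not defined in the snippet; a minimal sketch of the assumed callback (pycurl's WRITEFUNCTION receives successive chunks of the response body and should return None, or the number of bytes consumed):

def dump_res(buf):
    # Called by pycurl with each chunk of the HTTP response body.
    print(buf)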
def use_cloud(token):
    fp = wave.open('output.wav', 'rb')
    nf = fp.getnframes()
    f_len = nf * 2  # payload size in bytes: 16-bit mono PCM
    audio_data = fp.readframes(nf)
    cuid = "xxxxxxxxxx"  # my xiaomi phone MAC
    srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
    http_header = [
        'Content-Type: audio/pcm; rate=8000',
        'Content-Length: %d' % f_len
    ]
    print(srv_url)
    c = pycurl.Curl()
    c.setopt(pycurl.URL, str(srv_url))  # curl doesn't support unicode
    #c.setopt(c.RETURNTRANSFER, 1)
    c.setopt(c.HTTPHEADER, http_header)  # must be a list, not a dict
    c.setopt(c.POST, 1)
    c.setopt(c.CONNECTTIMEOUT, 30)
    c.setopt(c.TIMEOUT, 30)
    c.setopt(c.WRITEFUNCTION, dump_res)
    c.setopt(c.POSTFIELDS, audio_data)
    c.setopt(c.POSTFIELDSIZE, f_len)
    c.perform()  # pycurl's perform() has no return value
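For comparison, a hedged equivalent using the `requests` library instead of pycurl (endpoint, headers and PCM payload taken from the snippet above; requests computes Content-Length itself):

import wave
import requests

def use_cloud_requests(token, cuid='xxxxxxxxxx'):
    fp = wave.open('output.wav', 'rb')
    audio_data = fp.readframes(fp.getnframes())
    fp.close()
    url = 'http://vop.baidu.com/server_api?cuid=%s&token=%s' % (cuid, token)
    headers = {'Content-Type': 'audio/pcm; rate=8000'}
    resp = requests.post(url, data=audio_data, headers=headers, timeout=30)
    return resp.text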
def split(split_file_path, main_file_path, transcript_path, split_info):
    '''
    Cut a segment out of the main .wav file and write it, along with its
    transcript, to new files.

    Args:
        split_file_path: File path for the new split file.
        main_file_path: File path for the original .wav file.
        transcript_path: File path where the transcript will be written.
        split_info: A tuple of the form (transcript, (start_time, end_time)),
            with times in seconds.
    '''
    audio_file = wave.open(main_file_path, 'rb')
    split_file = wave.open(split_file_path, 'wb')
    t0, t1 = split_info[1]  # cut audio between t0 and t1 seconds
    s0, s1 = int(t0 * audio_file.getframerate()), int(t1 * audio_file.getframerate())
    audio_file.setpos(s0)  # skip frames up to s0
    frames = audio_file.readframes(s1 - s0)
    split_file.setparams(audio_file.getparams())
    split_file.writeframes(frames)
    split_file.close()
    # Store the transcript.
    with open(transcript_path, 'wb') as f:
        f.write(split_info[0])
    # TODO: Get rid of multiple opening and closing of the same main audio file.
    audio_file.close()
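A usage sketch (hypothetical paths): extract seconds 1.5-4.0 of a recording together with its transcript. The transcript file is opened in binary mode, so the transcript is passed as bytes.

split('clips/utt0001.wav', 'session.wav', 'clips/utt0001.txt',
      (b'hello world', (1.5, 4.0)))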
def create_csv(data_dir):
    '''
    Generates the CSV file (as required by DeepSpeech_RHL.py) in the given dir.

    Args:
        data_dir: Directory where all .wav files and
            their associated transcripts are stored.
    '''
    # Get all audio and transcript file paths.
    audio_file_paths = sorted(glob.glob(data_dir + "*.wav"))
    transcript_file_paths = sorted(glob.glob(data_dir + "*.txt"))
    audio_file_sizes = []
    transcripts = []
    for x, y in zip(audio_file_paths, transcript_file_paths):
        with open(y, "rb") as f:
            transcripts.append(f.read())
        # Get the audio file size.
        metadata = os.stat(x)
        audio_file_sizes.append(metadata.st_size)
    # Create the pandas dataframe.
    df = pandas.DataFrame(columns=["wav_filename", "wav_filesize", "transcript"])
    df["wav_filename"] = audio_file_paths
    df["wav_filesize"] = audio_file_sizes
    df["transcript"] = transcripts
    df.to_csv(data_dir + "data.csv", sep=",", index=False)  # Save the CSV
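A usage sketch (hypothetical directory; the trailing separator is required because the function concatenates paths rather than using os.path.join):

create_csv('data/train/')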
def audio(tag, tensor, sample_rate=44100):
    tensor = makenp(tensor)
    tensor = tensor.squeeze()
    assert (tensor.ndim == 1), 'input tensor should be 1 dimensional.'
    # Scale from the assumed [-1.0, 1.0] float range to signed 16-bit PCM.
    tensor_list = [int(32767.0 * x) for x in tensor]
    import io
    import wave
    import struct
    fio = io.BytesIO()
    Wave_write = wave.open(fio, 'wb')
    Wave_write.setnchannels(1)
    Wave_write.setsampwidth(2)
    Wave_write.setframerate(sample_rate)
    tensor_enc = b''
    for v in tensor_list:
        tensor_enc += struct.pack('<h', v)
    Wave_write.writeframes(tensor_enc)
    Wave_write.close()
    audio_string = fio.getvalue()
    fio.close()
    audio = Summary.Audio(sample_rate=sample_rate, num_channels=1, length_frames=len(tensor_list),
                          encoded_audio_string=audio_string, content_type='audio/wav')
    return Summary(value=[Summary.Value(tag=tag, audio=audio)])
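`makenp` is not defined in this snippet; a minimal sketch of the assumed converter (tensorboardX-style: pass numpy arrays through and pull PyTorch tensors back to CPU numpy):

import numpy as np

def makenp(x):
    # Pass numpy arrays through; convert torch tensors (CPU or GPU) to numpy.
    if isinstance(x, np.ndarray):
        return x
    try:
        import torch
        if isinstance(x, torch.Tensor):
            return x.detach().cpu().numpy()
    except ImportError:
        pass
    return np.asarray(x)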
def load_sound(file_name):
    fp = wave.open(file_name, 'rb')
    try:
        assert fp.getnchannels() == 1, '{0}: sound format is incorrect! Sound must be mono.'.format(file_name)
        assert fp.getsampwidth() == 2, '{0}: sound format is incorrect! ' \
            'Sample width of sound must be 2 bytes.'.format(file_name)
        assert fp.getframerate() in (8000, 16000, 32000), '{0}: sound format is incorrect! ' \
            'Sampling frequency must be 8000 Hz, 16000 Hz or 32000 Hz.'.format(file_name)
        sampling_frequency = fp.getframerate()
        sound_data = fp.readframes(fp.getnframes())
    finally:
        fp.close()
        del fp
    return sound_data, sampling_frequency
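A usage sketch (hypothetical file name): decode the returned frame bytes into 16-bit samples with numpy.

import numpy as np

data, rate = load_sound('command.wav')
samples = np.frombuffer(data, dtype=np.int16)
print(len(samples) / float(rate), 'seconds at', rate, 'Hz')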