def _data_format(self, x):
    """Map the numpy dtype of the sample data to the equivalent portaudio
    format constant. This is a problem for 24-bit audio files, since numpy
    has no 24-bit integer dtype; that case is currently not implemented.
    One option would be to store each 24-bit sample in a 32-bit int, with
    either bits 1 to 8 or bits 25 to 32 set to zero.
    """
    retval = None
    if x.samples.dtype == np.dtype(np.float32):
        self._logger.debug("pyaudio.paFloat32")
        retval = pyaudio.paFloat32
    elif x.samples.dtype == np.dtype(np.int16):
        self._logger.debug("pyaudio.paInt16")
        retval = pyaudio.paInt16
    elif x.samples.dtype == np.dtype(np.int32):
        self._logger.debug("pyaudio.paInt32")
        retval = pyaudio.paInt32
    else:
        raise NotImplementedError("Data type not understood: %s" % x.samples.dtype)
    return retval
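# For the unimplemented 24-bit case, the packing option described in the
# docstring could look roughly like this. A minimal sketch assuming
# little-endian 24-bit PCM input; pcm24_to_int32 is a hypothetical helper,
# not part of the original class.
import numpy as np

def pcm24_to_int32(raw: bytes) -> np.ndarray:
    b = np.frombuffer(raw, dtype=np.uint8).reshape(-1, 3).astype(np.uint32)
    # Put the three payload bytes in bits 9..32 so the 24-bit sign bit lands
    # on the int32 sign bit; bits 1..8 stay zero. The widened samples keep
    # full int32 scale and can be played back with pyaudio.paInt32.
    packed = (b[:, 0] << 8) | (b[:, 1] << 16) | (b[:, 2] << 24)
    return packed.view(np.int32)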
def record(record_len):  # record_len: duration in seconds
    # `p` (PyAudio instance), `idx` (input device index) and `pos` are
    # module-level globals set up by init().
    if p is None or idx is None:
        init()
    # Constants live inside the function because it may run outside the main thread.
    channels = 6
    rate = 48000
    chunk = 1200
    # Bit depth and dtype must match: paInt32 <-> numpy.int32.
    record_format, numpy_format = pyaudio.paInt32, numpy.int32
    stream = p.open(format=record_format,
                    channels=channels,
                    rate=rate,
                    input=True,
                    input_device_index=idx,
                    frames_per_buffer=chunk)
    print('Begin recording...')
    blocks = []
    for _ in range(int(rate / chunk * record_len)):
        data = stream.read(chunk, exception_on_overflow=False)
        # The buffer holds `chunk` frames of interleaved samples; reshape to
        # (chunk, channels), then transpose to (channels, chunk).
        np_tmp = numpy.frombuffer(data, dtype=numpy_format)
        np_tmp = numpy.reshape(np_tmp, (chunk, channels)).transpose()
        blocks.append(np_tmp)
    stream.stop_stream()
    stream.close()
    # Concatenate once instead of calling numpy.append in the loop,
    # which would copy the whole array on every iteration.
    wave_data = numpy.concatenate(blocks, axis=1) if blocks else \
        numpy.empty((channels, 0), dtype=numpy_format)
    return rate, wave_data, pos
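# A minimal usage sketch for record() above: capture five seconds and write
# channel 0 to a mono WAV file. The file name is an assumption.
import wave

rate, wave_data, pos = record(5)
with wave.open('channel0.wav', 'wb') as wf:
    wf.setnchannels(1)
    wf.setsampwidth(4)   # paInt32 samples are 4 bytes wide
    wf.setframerate(rate)
    wf.writeframes(wave_data[0].tobytes())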
def _recreate_outputter(self):
    if self.samp_queue:
        self.play(None)   # a falsy sample acts as a stop sentinel for the old thread
    self.samp_queue = queue.Queue(maxsize=self.queue_size)
    stream_ready = threading.Event()

    def audio_thread():
        audio = pyaudio.PyAudio()
        try:
            # get_format_from_width(4) would return paFloat32, so 4-byte
            # integer samples have to be mapped to paInt32 explicitly.
            audio_format = audio.get_format_from_width(self.samplewidth) if self.samplewidth != 4 else pyaudio.paInt32
            self.stream = audio.open(format=audio_format, channels=self.nchannels, rate=self.samplerate, output=True)
            stream_ready.set()
            q = self.samp_queue
            try:
                while True:
                    sample = q.get()
                    if not sample:
                        break
                    sample.write_frames(self.stream)
                    if self.played_callback:
                        self.played_callback(sample)
                    if q.empty():
                        time.sleep(sample.duration)
                        self.all_played.set()
            finally:
                self.stream.close()
                self.all_played.set()
        finally:
            audio.terminate()

    outputter = threading.Thread(target=audio_thread, name="audio-pyaudio", daemon=True)
    outputter.start()
    stream_ready.wait()
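# Why the 4-byte case above is special-cased: PyAudio's helper maps a sample
# width of 4 to paFloat32, so integer 32-bit samples need an explicit paInt32.
# A quick check:
import pyaudio

audio = pyaudio.PyAudio()
print(audio.get_format_from_width(2) == pyaudio.paInt16)    # True
print(audio.get_format_from_width(4) == pyaudio.paFloat32)  # True
audio.terminate()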
def _apiai_stt(self):
    from math import log
    import audioop
    import pyaudio
    import time

    resampler = apiai.Resampler(source_samplerate=settings['RATE'])
    request = self.ai.voice_request()
    vad = apiai.VAD()

    # PyAudio invokes the callback with four arguments and expects an
    # (output_data, flag) tuple back; the original two-argument signature
    # would raise a TypeError at runtime.
    def callback(in_data, frame_count, time_info, status):
        frames, data = resampler.resample(in_data, frame_count)
        if settings.show_decibels:
            decibel = 20 * log(audioop.rms(data, 2) + 1, 10)  # +1 avoids log(0) on silence
            click.echo(decibel)
        state = vad.processFrame(frames)
        request.send(data)
        # Keep the stream open until the VAD reports end of speech.
        state_signal = pyaudio.paContinue if state == 1 else pyaudio.paComplete
        return in_data, state_signal

    p = pyaudio.PyAudio()
    stream = p.open(format=pyaudio.paInt32, input=True, output=False, stream_callback=callback,
                    channels=settings['CHANNELS'], rate=settings['RATE'], frames_per_buffer=settings['CHUNK'])
    stream.start_stream()
    click.echo("Speak!")
    while stream.is_active():
        time.sleep(0.1)
    stream.stop_stream()
    stream.close()
    p.terminate()
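# The level-meter formula from the callback, pulled out as a standalone
# helper for clarity. rms_db is a hypothetical name; note that audioop was
# removed from the standard library in Python 3.13.
from math import log
import audioop

def rms_db(pcm: bytes, sample_width: int = 2) -> float:
    # 20 * log10(rms), written as math.log(x, 10); the +1 guards against
    # log(0) when the buffer is pure silence.
    return 20 * log(audioop.rms(pcm, sample_width) + 1, 10)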