def __init__(self, device_index = None, sample_rate = None, chunk_size = None):
assert device_index is None or isinstance(device_index, int), "Device index must be None or an integer"
if device_index is not None: # ensure device index is in range
audio = pyaudio.PyAudio(); count = audio.get_device_count(); audio.terminate() # obtain device count
assert 0 <= device_index < count, "Device index out of range"
assert sample_rate is None or isinstance(sample_rate, int) and sample_rate > 0, "Sample rate must be None or a positive integer"
assert chunk_size is None or isinstance(chunk_size, int) and chunk_size > 0, "Chunk size must be None or a positive integer"
if sample_rate is None: chunk_size = 16000
if chunk_size is None: chunk_size = 1024
self.device_index = device_index
self.format = pyaudio.paInt16 # 16-bit int sampling
self.SAMPLE_WIDTH = pyaudio.get_sample_size(self.format) # size of each sample
self.SAMPLE_RATE = sample_rate # sampling rate in Hertz
self.CHUNK = chunk_size # number of frames stored in each buffer
self.audio = None
self.stream = None
python类get_sample_size()的实例源码
def __init__(self, rt):
super().__init__(rt)
config = rt.config['frontends']['speech']['recognizers']
self.listener_config = config
self.chunk_size = config['chunk_size']
self.format = pyaudio.paInt16
self.sample_width = pyaudio.get_sample_size(self.format)
self.sample_rate = config['sample_rate']
self.channels = config['channels']
self.p = pyaudio.PyAudio()
self.stream = self.p.open(format=self.format, channels=self.channels,
rate=self.sample_rate, input=True,
frames_per_buffer=self.chunk_size)
self.buffer_sec = config['wake_word_length']
self.talking_volume_ratio = config['talking_volume_ratio']
self.required_integral = config['required_noise_integral']
self.max_di_dt = config['max_di_dt']
self.noise_max_out_sec = config['noise_max_out_sec']
self.sec_between_ww_checks = config['sec_between_ww_checks']
self.recording_timeout = config['recording_timeout']
self.energy_weight = 1.0 - pow(1.0 - config['ambient_adjust_speed'],
self.chunk_size / self.sample_rate)
# For convenience
self.chunk_sec = self.chunk_size / self.sample_rate
self.av_energy = None
self.integral = 0
self.noise_level = 0
self._intercept = None
def active_listen_to_all_options(self, THRESHOLD=None, LISTEN=True,
MUSIC=False):
"""
Records until a second of silence or times out after 12 seconds
Returns a list of the matching options or None
"""
RATE = 16000
CHUNK = 1024
LISTEN_TIME = 12
# check if no threshold provided
if THRESHOLD is None:
THRESHOLD = self.fetch_threshold()
self.speaker.play(jessypath.data('audio', 'beep_hi.wav'))
# prepare recording stream
stream = self._audio.open(format=pyaudio.paInt16,
channels=1,
rate=RATE,
input=True,
frames_per_buffer=CHUNK)
frames = []
# increasing the range # results in longer pause after command
# generation
lastN = [THRESHOLD * 1.2 for i in range(30)]
for i in range(0, RATE / CHUNK * LISTEN_TIME):
data = stream.read(CHUNK)
frames.append(data)
score = self.get_score(data)
lastN.pop(0)
lastN.append(score)
average = sum(lastN) / float(len(lastN))
# TODO: 0.8 should not be a MAGIC NUMBER!
if average < THRESHOLD * 0.8:
break
self.speaker.play(jessypath.data('audio', 'beep_lo.wav'))
# save the audio data
stream.stop_stream()
stream.close()
with tempfile.SpooledTemporaryFile(mode='w+b') as f:
wav_fp = wave.open(f, 'wb')
wav_fp.setnchannels(1)
wav_fp.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
wav_fp.setframerate(RATE)
wav_fp.writeframes(''.join(frames))
wav_fp.close()
f.seek(0)
return self.active_stt_engine.transcribe(f)