def detect_squelch_level(self, detect_time=10, threshold=.8):
start_time = time.time()
end_time = start_time + detect_time
audio_chunks = collections.deque()
async with self._source.listen():
async for block in self._source:
if time.time() > end_time:
break
even_iter = EvenChunkIterator(block, self._sample_size)
try:
while time.time() < end_time:
audio_chunks.append(await even_iter.__anext__())
except StopAsyncIteration:
pass
rms_vals = [audioop.rms(x.audio, self._sample_width) for x in
audio_chunks
if len(x.audio) == self._sample_size * self._sample_width]
level = sorted(rms_vals)[int(threshold * len(rms_vals)):][0]
self.squelch_level = level
return level
评论列表
文章目录