def record(self, chunksize, *args):
with self.open_stream(*args, chunksize=chunksize,
output=False) as stream:
while True:
try:
frame = stream.read(chunksize)
except IOError as e:
if type(e.errno) is not int:
# Simple hack to work around the fact that the
# errno/strerror arguments were swapped in older
# PyAudio versions. This was fixed in upstream
# commit 1783aaf9bcc6f8bffc478cb5120ccb6f5091b3fb.
strerror, errno = e.errno, e.strerror
else:
strerror, errno = e.strerror, e.errno
self._logger.warning("IO error while reading from device" +
" '%s': '%s' (Errno: %d)", self.slug,
strerror, errno)
else:
yield frame
python类PyAudio()的实例源码
def play_audio_file(fname=DETECT_DING):
"""Simple callback function to play a wave file. By default it plays
a Ding sound.
:param str fname: wave file name
:return: None
"""
ding_wav = wave.open(fname, 'rb')
ding_data = ding_wav.readframes(ding_wav.getnframes())
audio = pyaudio.PyAudio()
stream_out = audio.open(
format=audio.get_format_from_width(ding_wav.getsampwidth()),
channels=ding_wav.getnchannels(),
rate=ding_wav.getframerate(), input=False, output=True)
stream_out.start_stream()
stream_out.write(ding_data)
time.sleep(0.2)
stream_out.stop_stream()
stream_out.close()
audio.terminate()
def pya():
global global_pya
import pyaudio
if global_pya == None:
# suppress Jack and ALSA error messages on Linux.
nullfd = os.open("/dev/null", 1)
oerr = os.dup(2)
os.dup2(nullfd, 2)
global_pya = pyaudio.PyAudio()
os.dup2(oerr, 2)
os.close(oerr)
os.close(nullfd)
return global_pya
# find the lowest supported input rate >= rate.
# needed on Linux but not the Mac (which converts as needed).
def __init__(self, wave_prefix, total_seconds=-1, partial_seconds=None, incremental_mode = True, stop_callback=None, save_callback=None):
self.chunk = 1600
self.format = pyaudio.paInt16
self.channels = 1
self.rate = 16000
self.partial_seconds = partial_seconds
self.record_seconds = total_seconds
self.wave_prefix = wave_prefix
self.data = None
self.stream = None
self.p = pyaudio.PyAudio()
if partial_seconds != None and partial_seconds > 0:
self.multiple_waves_mode = True
else: self.multiple_waves_mode = False
self.incremental_mode = incremental_mode
self._stop_signal = True
self.stop_callback = stop_callback
self.save_callback = save_callback
def Destroy():
Frames = 1024
Sound = wave.open("C:\\Users\\braro\\Documents\\UTN\\I Cuatri\\Principios de Programación\\PycharmProjects\\CO-Proyect"
"\\BattleshipsSounds\\Sounds\\Destroy.wav", 'rb')
Repro = pyaudio.PyAudio()
stream = Repro.open(
format=Repro.get_format_from_width(Sound.getsampwidth()),
channels=Sound.getnchannels(),
rate=Sound.getframerate(),
output=True)
data = Sound.readframes(
Frames)
while data != b'':
stream.write(data)
data = Sound.readframes(Frames)
stream.close()
Repro.terminate()
def Lanz():
Frames = 1024
Sound = wave.open("C:\\Users\\braro\\Documents\\UTN\\I Cuatri\\Principios de Programación\\PycharmProjects\\CO-Proyect"
"\\BattleshipsSounds\\Sounds\\Lanzamiento.wav", 'rb')
Repro = pyaudio.PyAudio()
stream = Repro.open(
format=Repro.get_format_from_width(Sound.getsampwidth()),
channels=Sound.getnchannels(),
rate=Sound.getframerate(),
output=True)
data = Sound.readframes(
Frames)
while data != b'':
stream.write(data)
data = Sound.readframes(Frames)
stream.close()
Repro.terminate()
def Water():
Frames = 1024
Sound = wave.open("C:\\Users\\braro\\Documents\\UTN\\I Cuatri\\Principios de Programación\\PycharmProjects\\CO-Proyect"
"\\BattleshipsSounds\\Sounds\\Water.wav", 'rb')
Repro = pyaudio.PyAudio()
stream = Repro.open(
format=Repro.get_format_from_width(Sound.getsampwidth()),
channels=Sound.getnchannels(),
rate=Sound.getframerate(),
output=True)
data = Sound.readframes(
Frames)
while data != b'':
stream.write(data)
data = Sound.readframes(Frames)
stream.close()
Repro.terminate()
def Looser():
Frames = 1024
Sound = wave.open("C:\\Users\\braro\\Documents\\UTN\\I Cuatri\\Principios de Programación\\PycharmProjects\\CO-Proyect"
"\\BattleshipsSounds\\Sounds\\Looser.wav", 'rb')
Repro = pyaudio.PyAudio()
stream = Repro.open(
format=Repro.get_format_from_width(Sound.getsampwidth()),
channels=Sound.getnchannels(),
rate=Sound.getframerate(),
output=True)
data = Sound.readframes(
Frames)
while data != b'':
stream.write(data)
data = Sound.readframes(Frames)
stream.close()
Repro.terminate()
def Boton():
Frames = 1024
Sound = wave.open(
"C:\\Users\\braro\\Documents\\UTN\\I Cuatri\\Principios de Programación\\PycharmProjects\\CO-Proyect"
"\\BattleshipsSounds\\Sounds\\Botton.wav", 'rb')
Repro = pyaudio.PyAudio()
stream = Repro.open(
format=Repro.get_format_from_width(Sound.getsampwidth()),
channels=Sound.getnchannels(),
rate=Sound.getframerate(),
output=True)
data = Sound.readframes(
Frames)
while data != b'':
stream.write(data)
data = Sound.readframes(Frames)
stream.close()
Repro.terminate()
def __init__(self, device_index = None, sample_rate = None, chunk_size = None):
assert device_index is None or isinstance(device_index, int), "Device index must be None or an integer"
if device_index is not None: # ensure device index is in range
audio = pyaudio.PyAudio(); count = audio.get_device_count(); audio.terminate() # obtain device count
assert 0 <= device_index < count, "Device index out of range"
assert sample_rate is None or isinstance(sample_rate, int) and sample_rate > 0, "Sample rate must be None or a positive integer"
assert chunk_size is None or isinstance(chunk_size, int) and chunk_size > 0, "Chunk size must be None or a positive integer"
if sample_rate is None: chunk_size = 16000
if chunk_size is None: chunk_size = 1024
self.device_index = device_index
self.format = pyaudio.paInt16 # 16-bit int sampling
self.SAMPLE_WIDTH = pyaudio.get_sample_size(self.format) # size of each sample
self.SAMPLE_RATE = sample_rate # sampling rate in Hertz
self.CHUNK = chunk_size # number of frames stored in each buffer
self.audio = None
self.stream = None
def _play_with_pyaudio(seg):
import pyaudio
p = pyaudio.PyAudio()
stream = p.open(format=p.get_format_from_width(seg.sample_width),
channels=seg.channels,
rate=seg.frame_rate,
output=True)
# break audio into half-second chunks (to allows keyboard interrupts)
for chunk in make_chunks(seg, 500):
stream.write(chunk._data)
stream.stop_stream()
stream.close()
p.terminate()
def __init__(self):
self.miso = Queue()
self.mosi = Queue()
print "starting worker thread"
CHUNK = 1024
FORMAT = pyaudio.paInt16
#paUint16 #paInt8
CHANNELS = 1
RATE = 44100 #sample rate
self.paw = pyaudio.PyAudio()
self.stream = self.paw.open(format=FORMAT,
channels=CHANNELS,
#input_device_index = 4, # rocketfish
rate=RATE,
input=True,
stream_callback=self.callback,
frames_per_buffer=CHUNK) #buffer
#self.fort_proc = Process(target = fft_worker,
# args=(self.mosi, self.miso))
#self.fort_proc.start()
atexit.register(self.shutdown)
print "allegedly started worker"
def __init__(self, name, chunks, chunksize=1024, channels=2, rate="auto"):
self.elements_per_ringbuffer = chunks
self.frames_per_element = chunksize
self.samples_per_frame = channels
self.bytes_per_sample = 2 # int16
self.name = name
self.channels = channels
self._ringbuffer = collections.deque(
[b"0"*self.bytes_per_sample*channels*chunksize]*chunks,
maxlen=self.elements_per_ringbuffer)
self.p = pyaudio.PyAudio() # pylint: disable=invalid-name
default_output = self.p.get_default_output_device_info()
self.deviceindex = default_output['index']
if rate == "auto":
self.rate = default_output['defaultSampleRate']
else:
self.rate = rate
self.has_new_audio = False
def _record():
global p
global stream
global frames
p = pyaudio.PyAudio()
stream = p.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK)
print("????...")
frames = []
signal.signal(signal.SIGINT, stop_record)
while (True):
if stream.is_stopped():
stream.close()
p.terminate()
break
data = stream.read(CHUNK)
frames.append(data)
def __init__(self,device=None,rate=None,chunk=4096,maxMemorySec=5):
"""
Prime the Ear class to access audio data. Recording won't start
until stream_start() is called.
- if device is none, will use first valid input (not system default)
- if a rate isn't specified, the lowest usable rate will be used.
"""
# configuration
self.chunk = chunk # doesn't have to be a power of 2
self.maxMemorySec = maxMemorySec # delete if more than this around
self.device=device
self.rate=rate
# internal variables
self.chunksRecorded=0
self.p=pyaudio.PyAudio() #keep this forever
self.t=False #later will become threads
### SOUND CARD TESTING
def valid_input_devices(self):
"""
See which devices can be opened for microphone input.
call this when no PyAudio object is loaded.
"""
mics=[]
for device in range(self.p.get_device_count()):
if self.valid_test(device):
mics.append(device)
if len(mics)==0:
print("no microphone devices found!")
else:
print("found %d microphone devices: %s"%(len(mics),mics))
return mics
### SETUP AND SHUTDOWN
def play_audio_file(fname=None):
"""Simple callback function to play a wave file. By default it plays
a Ding sound.
:param str fname: wave file name
:return: None
"""
ding_wav = wave.open(fname, 'rb')
ding_data = ding_wav.readframes(ding_wav.getnframes())
audio = pyaudio.PyAudio()
stream_out = audio.open(
format=audio.get_format_from_width(ding_wav.getsampwidth()),
channels=ding_wav.getnchannels(),
rate=ding_wav.getframerate(), input=False, output=True)
stream_out.start_stream()
stream_out.write(ding_data)
time.sleep(0.2)
stream_out.stop_stream()
stream_out.close()
audio.terminate()
def play_audio_file_and_record(fname=DETECT_DING):
"""Simple callback function to play a wave file. By default it plays
a Ding sound.
:param str fname: wave file name
:return: None
"""
print("start of ding")
ding_wav = wave.open(fname, 'rb')
ding_data = ding_wav.readframes(ding_wav.getnframes())
audio = pyaudio.PyAudio()
stream_out = audio.open(
format=audio.get_format_from_width(ding_wav.getsampwidth()),
channels=ding_wav.getnchannels(),
rate=ding_wav.getframerate(), input=False, output=True)
stream_out.start_stream()
stream_out.write(ding_data)
time.sleep(0.2)
stream_out.stop_stream()
stream_out.close()
audio.terminate()
print("end of ding, recording")
print("end of recording")
recognize_speech()
def play(self, wave):
self.playing = True
aud = pyaudio.PyAudio()
stream = aud.open(format = aud.get_format_from_width(wave.getsampwidth()), channels = wave.getnchannels(), rate = wave.getframerate(), output = True)
while self.playing:
data = wave.readframes(1024)
if not data:
break
stream.write(data)
stream.stop_stream()
stream.close()
aud.terminate()
def __init__(self, rt):
super().__init__(rt)
config = rt.config['frontends']['speech']['recognizers']
self.listener_config = config
self.chunk_size = config['chunk_size']
self.format = pyaudio.paInt16
self.sample_width = pyaudio.get_sample_size(self.format)
self.sample_rate = config['sample_rate']
self.channels = config['channels']
self.p = pyaudio.PyAudio()
self.stream = self.p.open(format=self.format, channels=self.channels,
rate=self.sample_rate, input=True,
frames_per_buffer=self.chunk_size)
self.buffer_sec = config['wake_word_length']
self.talking_volume_ratio = config['talking_volume_ratio']
self.required_integral = config['required_noise_integral']
self.max_di_dt = config['max_di_dt']
self.noise_max_out_sec = config['noise_max_out_sec']
self.sec_between_ww_checks = config['sec_between_ww_checks']
self.recording_timeout = config['recording_timeout']
self.energy_weight = 1.0 - pow(1.0 - config['ambient_adjust_speed'],
self.chunk_size / self.sample_rate)
# For convenience
self.chunk_sec = self.chunk_size / self.sample_rate
self.av_energy = None
self.integral = 0
self.noise_level = 0
self._intercept = None