Python audioop.tomono() — example source code
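Before the collected examples, here is a minimal, self-contained sketch (not taken from any of the projects below) of what audioop.tomono() does: it downmixes an interleaved stereo fragment to mono, weighting the left and right channels by the given factors. It assumes a Python version that still ships audioop (the module was removed in Python 3.13).

import audioop
import struct

# Two stereo frames of signed 16-bit samples (native byte order):
# (left, right) = (1000, 3000) and (-2000, 4000)
stereo = struct.pack("4h", 1000, 3000, -2000, 4000)

# Average both channels: each mono sample is 0.5*left + 0.5*right
mono = audioop.tomono(stereo, 2, 0.5, 0.5)
print(struct.unpack("2h", mono))        # (2000, 1000)

# Extract a single channel by giving the other channel a factor of 0
left_only = audioop.tomono(stereo, 2, 1, 0)
print(struct.unpack("2h", left_only))   # (1000, -2000)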
def test_issue7673(self):
    state = None
    for data, size in INVALID_DATA:
        size2 = size
        self.assertRaises(audioop.error, audioop.getsample, data, size, 0)
        self.assertRaises(audioop.error, audioop.max, data, size)
        self.assertRaises(audioop.error, audioop.minmax, data, size)
        self.assertRaises(audioop.error, audioop.avg, data, size)
        self.assertRaises(audioop.error, audioop.rms, data, size)
        self.assertRaises(audioop.error, audioop.avgpp, data, size)
        self.assertRaises(audioop.error, audioop.maxpp, data, size)
        self.assertRaises(audioop.error, audioop.cross, data, size)
        self.assertRaises(audioop.error, audioop.mul, data, size, 1.0)
        self.assertRaises(audioop.error, audioop.tomono, data, size, 0.5, 0.5)
        self.assertRaises(audioop.error, audioop.tostereo, data, size, 0.5, 0.5)
        self.assertRaises(audioop.error, audioop.add, data, data, size)
        self.assertRaises(audioop.error, audioop.bias, data, size, 0)
        self.assertRaises(audioop.error, audioop.reverse, data, size)
        self.assertRaises(audioop.error, audioop.lin2lin, data, size, size2)
        self.assertRaises(audioop.error, audioop.ratecv, data, size, 1, 1, 1, state)
        self.assertRaises(audioop.error, audioop.lin2ulaw, data, size)
        self.assertRaises(audioop.error, audioop.lin2alaw, data, size)
        self.assertRaises(audioop.error, audioop.lin2adpcm, data, size, state)
def _play(self, data, rate=16000, channels=1, width=2, spectrum=True):
    stream = self.pyaudio_instance.open(
        format=self.pyaudio_instance.get_format_from_width(width),
        channels=channels,
        rate=rate,
        output=True,
        # output_device_index=1,
        frames_per_buffer=CHUNK_SIZE,
    )
    if isinstance(data, types.GeneratorType):
        for d in data:
            if self.stop_event.is_set():
                break
            stream.write(d)
            if spectrum:
                if channels == 2:
                    d = audioop.tomono(d, 2, 0.5, 0.5)
                self.queue.put(d)
    else:
        stream.write(data)
    stream.close()
def test_string(self):
    data = 'abcd'
    size = 2
    self.assertRaises(TypeError, audioop.getsample, data, size, 0)
    self.assertRaises(TypeError, audioop.max, data, size)
    self.assertRaises(TypeError, audioop.minmax, data, size)
    self.assertRaises(TypeError, audioop.avg, data, size)
    self.assertRaises(TypeError, audioop.rms, data, size)
    self.assertRaises(TypeError, audioop.avgpp, data, size)
    self.assertRaises(TypeError, audioop.maxpp, data, size)
    self.assertRaises(TypeError, audioop.cross, data, size)
    self.assertRaises(TypeError, audioop.mul, data, size, 1.0)
    self.assertRaises(TypeError, audioop.tomono, data, size, 0.5, 0.5)
    self.assertRaises(TypeError, audioop.tostereo, data, size, 0.5, 0.5)
    self.assertRaises(TypeError, audioop.add, data, data, size)
    self.assertRaises(TypeError, audioop.bias, data, size, 0)
    self.assertRaises(TypeError, audioop.reverse, data, size)
    self.assertRaises(TypeError, audioop.lin2lin, data, size, size)
    self.assertRaises(TypeError, audioop.ratecv, data, size, 1, 1, 1, None)
    self.assertRaises(TypeError, audioop.lin2ulaw, data, size)
    self.assertRaises(TypeError, audioop.lin2alaw, data, size)
    self.assertRaises(TypeError, audioop.lin2adpcm, data, size, None)
def __db_level(self, rms_mode=False):
    """
    Returns the average audio volume level measured in dB (range -60 dB to 0 dB).
    If the sample is stereo, you get back a tuple: (left_level, right_level).
    If the sample is mono, you still get a tuple, but both values will be the same.
    This method is probably only useful if processed on very short sample fragments in sequence,
    so the dB levels could be used to show a level meter for the duration of the sample.
    """
    maxvalue = 2**(8*self.__samplewidth-1)
    if self.nchannels == 1:
        if rms_mode:
            peak_left = peak_right = (audioop.rms(self.__frames, self.__samplewidth)+1)/maxvalue
        else:
            peak_left = peak_right = (audioop.max(self.__frames, self.__samplewidth)+1)/maxvalue
    else:
        left_frames = audioop.tomono(self.__frames, self.__samplewidth, 1, 0)
        right_frames = audioop.tomono(self.__frames, self.__samplewidth, 0, 1)
        if rms_mode:
            peak_left = (audioop.rms(left_frames, self.__samplewidth)+1)/maxvalue
            peak_right = (audioop.rms(right_frames, self.__samplewidth)+1)/maxvalue
        else:
            peak_left = (audioop.max(left_frames, self.__samplewidth)+1)/maxvalue
            peak_right = (audioop.max(right_frames, self.__samplewidth)+1)/maxvalue
    # cut off at the bottom at -60 instead of all the way down to -infinity
    return max(20.0*math.log(peak_left, 10), -60.0), max(20.0*math.log(peak_right, 10), -60.0)
def read(self, size=-1):
    buffer = self.audio_reader.readframes(self.audio_reader.getnframes() if size == -1 else size)
    if not isinstance(buffer, bytes): buffer = b""  # workaround for https://bugs.python.org/issue24608
    sample_width = self.audio_reader.getsampwidth()
    if not self.little_endian:  # big endian format, convert to little endian on the fly
        if hasattr(audioop, "byteswap"):  # ``audioop.byteswap`` was only added in Python 3.4 (incidentally, that also means that we don't need to worry about 24-bit audio being unsupported, since Python 3.4+ always has that functionality)
            buffer = audioop.byteswap(buffer, sample_width)
        else:  # manually reverse the bytes of each sample, which is slower but works well enough as a fallback
            buffer = buffer[sample_width - 1::-1] + b"".join(buffer[i + sample_width:i:-1] for i in range(sample_width - 1, len(buffer), sample_width))
    # workaround for https://bugs.python.org/issue12866
    if self.samples_24_bit_pretending_to_be_32_bit:  # we need to convert samples from 24-bit to 32-bit before we can process them with ``audioop`` functions
        buffer = b"".join(b"\x00" + buffer[i:i + sample_width] for i in range(0, len(buffer), sample_width))  # since we're in little endian, we prepend a zero byte to each 24-bit sample to get a 32-bit sample
    if self.audio_reader.getnchannels() != 1:  # stereo audio
        buffer = audioop.tomono(buffer, sample_width, 1, 1)  # convert stereo audio data to mono
    return buffer
def test_tomono(self):
    data2 = bytearray()
    for d in data[0]:
        data2.append(d)
        data2.append(d)
    self.assertEqual(audioop.tomono(data2, 1, 0.5, 0.5), data[0])
def get_pipe_channel(self, data, channel):
    """ Retrieve data for particular channel """
    ch = audioop.tomono(data, 2, channel[0], channel[1])
    ch_max = audioop.max(ch, 2)
    return int(self.max * (ch_max / self.k))
def test_tomono(self):
    for w in 1, 2, 4:
        data1 = datas[w]
        data2 = bytearray(2 * len(data1))
        for k in range(w):
            data2[k::2*w] = data1[k::w]
        self.assertEqual(audioop.tomono(str(data2), w, 1, 0), data1)
        self.assertEqual(audioop.tomono(str(data2), w, 0, 1), b'\0' * len(data1))
        for k in range(w):
            data2[k+w::2*w] = data1[k::w]
        self.assertEqual(audioop.tomono(str(data2), w, 0.5, 0.5), data1)
def _convert_file(self, src, dest=None):
    """
    Convert a WAV file to an 8 kHz sample rate (mono, 16-bit).
    """
    def convert(read, write):
        write.setparams((1, 2, 8000, 0, 'NONE', 'not compressed'))
        o_fr = read.getframerate()
        o_chnl = read.getnchannels()
        t_fr = read.getnframes()
        data = read.readframes(t_fr)
        cnvrt = audioop.ratecv(data, 2, o_chnl,
                               o_fr, 8000, None)
        if o_chnl != 1:
            mono = audioop.tomono(cnvrt[0], 2, 1, 0)
            write.writeframes(mono)
        else:
            write.writeframes(cnvrt[0])
        read.close()
        write.close()

    if dest is None:
        temp = src + '.temp'
        os.rename(src, temp)
        read = wave.open(temp, 'r')
        write = wave.open(src, 'w')
        convert(read, write)
        os.remove(temp)
    else:
        read = wave.open(src, 'r')
        write = wave.open(dest, 'w')
        convert(read, write)
def test_tomono(self):
    for w in 1, 2, 4:
        data1 = datas[w]
        data2 = bytearray(2 * len(data1))
        for k in range(w):
            data2[k::2*w] = data1[k::w]
        self.assertEqual(audioop.tomono(data2, w, 1, 0), data1)
        self.assertEqual(audioop.tomono(data2, w, 0, 1), b'\0' * len(data1))
        for k in range(w):
            data2[k+w::2*w] = data1[k::w]
        self.assertEqual(audioop.tomono(data2, w, 0.5, 0.5), data1)
def _next_chunk(self):
    frames = self._wave_fp.readframes(self._nframes)
    if self._n_channels == 2:
        frames = audioop.tomono(frames, self._sampwidth, .5, .5)
    if len(frames) == 0:
        raise StopAsyncIteration('No more frames in wav')
    chunk = AudioChunk(0, audio=frames, width=self._sampwidth,
                       freq=self._samprate)
    return chunk
def setStereoWave(self, stream):
    sampwidth = stream.getsampwidth()
    left_delta = 2**(8 * sampwidth)
    right_delta = left_delta * 2
    frames = stream.getnframes()
    ratio = frames / 255
    data = stream.readframes(float('inf'))
    left_data = audioop.tomono(data, sampwidth, 1, 0)
    right_data = audioop.tomono(data, sampwidth, 0, 1)
    wavepath = QtGui.QPainterPath()
    try:
        for frame_set in xrange(256):
            left_min = left_max = right_min = right_max = 0
            for frame in xrange(ratio):
                try:
                    pos = frame + frame_set * ratio
                    left_value = audioop.getsample(left_data, sampwidth, pos)
                    left_min = min(left_min, left_value)
                    left_max = max(left_max, left_value)
                    right_value = audioop.getsample(right_data, sampwidth, pos)
                    right_min = min(right_min, right_value)
                    right_max = max(right_max, right_value)
                except:
                    break
            wavepath.moveTo(frame_set, left_delta - left_min)
            wavepath.lineTo(frame_set, left_delta - left_max)
            wavepath.moveTo(frame_set, right_delta - right_min)
            wavepath.lineTo(frame_set, right_delta - right_max)
            # left_wavepath.lineTo(frame, left_sampwidth_int - left_value)
            # right_wavepath.lineTo(frame, right_sampwidth_int - right_value)
    except:
        pass
    # left_wavepath.addPath(right_wavepath)
    self.wavepath = self.scene.addPath(wavepath)
    self.wavepath.setPen(self.pen)
    self.fitInView(0, 0, 256, right_delta)
    self.centerOn(self.wavepath)
    self.setBackgroundBrush(QtCore.Qt.white)
def send_voice(self, wav, target, terminated):
    packet = bytearray()
    packet.append(self.outgoing_type.value << 5 | target.value)
    packet.extend(self._encode_varint(self.outgoing_sequence_number))
    nchannels, sampwidth, framerate, n, _, _ = wav.getparams()
    logger.debug('Sending audio: %d channels, %d-bit, %dhz, %d frames',
                 nchannels, sampwidth * 8, framerate, n)
    for i in range(n):
        pcm = wav.readframes(1)
        if sampwidth != 2:
            pcm = audioop.lin2lin(pcm, sampwidth, 2)
        if nchannels == 2:
            pcm = audioop.tomono(pcm, 2, 0.5, 0.5)
        if framerate != 48000:
            pcm, _ = audioop.ratecv(pcm, 2, 1, framerate, 48000, None)
        frame = self.outgoing_codec.encoder.encode(pcm)
        if self.outgoing_type == self.PacketType.VOICE_OPUS:
            #packet.extend(self._make_opus_header(frame))
            # TODO: figure out opus
            pass
        else:
            packet.extend(self._make_celt_header(frame, True))
            if i == n - 1 and terminated:
                packet.extend(self._make_celt_header(b'', False))
    print(packet)
    self.transport.sendto(bytes(packet))
    self.outgoing_sequence_number += 1
def test_tomono(self):
    for w in 1, 2, 3, 4:
        data1 = datas[w]
        data2 = bytearray(2 * len(data1))
        for k in range(w):
            data2[k::2*w] = data1[k::w]
        self.assertEqual(audioop.tomono(data2, w, 1, 0), data1)
        self.assertEqual(audioop.tomono(data2, w, 0, 1), b'\0' * len(data1))
        for k in range(w):
            data2[k+w::2*w] = data1[k::w]
        self.assertEqual(audioop.tomono(data2, w, 0.5, 0.5), data1)
        self.assertEqual(audioop.tomono(bytearray(data2), w, 0.5, 0.5),
                         data1)
        self.assertEqual(audioop.tomono(memoryview(data2), w, 0.5, 0.5),
                         data1)
def mono(self, left_factor=1.0, right_factor=1.0):
    """Make the sample mono (1-channel) applying the given left/right channel factors when downmixing"""
    assert not self.__locked
    if self.__nchannels == 1:
        return self
    if self.__nchannels == 2:
        self.__frames = audioop.tomono(self.__frames, self.__samplewidth, left_factor, right_factor)
        self.__nchannels = 1
        return self
    raise ValueError("sample must be stereo or mono already")
def read(self, size=-1):
    buffer = self.wav_reader.readframes(self.wav_reader.getnframes() if size == -1 else size)
    if isinstance(buffer, str) and str is not bytes: buffer = b""  # workaround for https://bugs.python.org/issue24608, unfortunately only fixes the issue for little-endian systems
    if self.wav_reader.getnchannels() != 1:  # stereo audio
        buffer = audioop.tomono(buffer, self.wav_reader.getsampwidth(), 1, 1)  # convert stereo audio data to mono
    return buffer
def draw_wave(self, stream, force=False):
    # print stream.getnframes()
    if self.wavepath:
        self.scene().removeItem(self.wavepath)
        self.fitInView(0, 0, 1, 1)
    self.current_sampwidth = sampwidth = stream.getsampwidth()
    self.current_sampwidth_int = delta = 2**(8*sampwidth)
    if stream in self.cache and not force:
        self.current_data, wavepath = self.cache[stream]
    else:
        stream.rewind()
        frames = stream.getnframes()
        ratio = frames / 64
        if stream.getnchannels() == 2:
            data = audioop.tomono(stream.readframes(float('inf')), sampwidth, self.main.left_spin.value(), self.main.right_spin.value())
        else:
            data = stream.readframes(float('inf'))
        data = audioop.mul(data, sampwidth, self.main.gain)
        self.current_data = data
        wavepath = QtGui.QPainterPath()
        try:
            for frame_set in xrange(ratio):
                frame_min = frame_max = 0
                for frame in xrange(64):
                    try:
                        value = audioop.getsample(data, sampwidth, frame + frame_set * 64)
                        frame_min = min(frame_min, value)
                        frame_max = max(frame_max, value)
                    except:
                        break
                if frame == 0:
                    break
                wavepath.moveTo(frame_set, delta - frame_min)
                wavepath.lineTo(frame_set, delta - frame_max)
        except:
            pass
        self.cache[stream] = data, wavepath
    self.wavepath = self.scene().addPath(wavepath)
    self.wavepath.setPen(self.wave_pen)
    self.wavepath.setY(-delta * .5)
    self.wavepath.setX(self.left_margin*2)
    self.fitInView(0, 0, self.zoom_values[self.zoom], delta)
    if not force:
        self.centerOn(self.wavepath)
    self.right_margin_item.setX(len(self.current_data)/self.current_sampwidth/64)
    visible = self.mapToScene(self.viewport().rect()).boundingRect()
    if visible.width() > self.wavepath.boundingRect().width():
        self.scene().setSceneRect(-self.left_margin, 0, visible.width(), delta)
    else:
        self.scene().setSceneRect(-self.left_margin, 0, self.wavepath.boundingRect().width(), delta)