def test_continuous_send_dialog(self):
self.add_signal_to_form("esaver.complex")
self.__add_first_signal_to_generator()
port = self.__get_free_port()
gframe = self.form.generator_tab_controller
expected = np.zeros(gframe.total_modulated_samples, dtype=np.complex64)
expected = gframe.modulate_data(expected)
current_index = Value("L", 0)
buffer = Array("f", 4 * len(expected))
process = Process(target=receive, args=(port, current_index, 2*len(expected), buffer))
process.daemon = True
process.start()
time.sleep(0.1) # ensure server is up
ContinuousModulator.BUFFER_SIZE_MB = 10
continuous_send_dialog = self.__get_continuous_send_dialog()
continuous_send_dialog.device.set_client_port(port)
continuous_send_dialog.ui.spinBoxNRepeat.setValue(2)
continuous_send_dialog.ui.btnStart.click()
QTest.qWait(100)
time.sleep(1)
process.join(1)
# CI sometimes swallows a sample
self.assertGreaterEqual(current_index.value, len(expected) - 1)
buffer = np.frombuffer(buffer.get_obj(), dtype=np.complex64)
for i in range(len(expected)):
self.assertEqual(buffer[i], expected[i], msg=str(i))
continuous_send_dialog.ui.btnStop.click()
continuous_send_dialog.ui.btnClear.click()
QTest.qWait(1)
self.__close_dialog(continuous_send_dialog)
评论列表
文章目录