def test_after_row_processing(self, refresh_batch, write_session, rw_conn):
with mock.patch.object(
refresh_batch,
'throttle_to_replication'
) as throttle_mock, mock.patch.object(
refresh_batch,
'_wait_for_throughput',
return_value=None
) as mock_wait:
# count can be anything since self.avg_throughput_cap is set to None
refresh_batch.unlock_tables(write_session)
refresh_batch.throttle_throughput(count=0)
assert write_session.rollback.call_count == 1
write_session.execute.assert_called_once_with('UNLOCK TABLES')
assert write_session.commit.call_count == 1
throttle_mock.assert_called_once_with(rw_conn)
assert mock_wait.call_count == 1
assert refresh_batch.avg_rows_per_second_cap == refresh_batch.DEFAULT_AVG_ROWS_PER_SECOND_CAP
copy_table_to_blackhole_table_test.py 文件源码
python
阅读 22
收藏 0
点赞 0
评论 0
评论列表
文章目录