def test_setup_connections(
self,
base_path,
refresh_batch,
cluster
):
with mock.patch(
base_path + '.TransactionManager'
) as mock_manager, mock.patch.object(
refresh_batch,
'get_connection_set_from_cluster'
) as mock_get_conn:
refresh_batch.setup_connections()
mock_manager.assert_called_once_with(
cluster_name=cluster,
ro_replica_name=cluster,
rw_replica_name=cluster,
connection_set_getter=mock_get_conn
)
copy_table_to_blackhole_table_test.py 文件源码
python
阅读 22
收藏 0
点赞 0
评论 0
评论列表
文章目录