def test_get_analysis_lambda_and_analysis_lambda_reduceByKey(self, mock_data_delivery, mock_alert_factory,
mock_historical_data, mock_rdd):
input_data_structure = {'rule': [{'key': True, 'func_name': '', 'input_field': 'ip'},
{'key': False, 'func_name': 'Max', 'input_field': 'ip_size'},
{'key': False, 'func_name': 'Sum', 'input_field': 'ip_size_sum'}],
'operation_type': 'reduceByKey'}
enumerate_output_aggregation_field = {"ip_size": 1, "ip_size_sum": 2}
def mock_foreachPartition(test_lambda):
test_data = [(1, 2, 3)]
return list(test_lambda(test_data))
mock_rdd.foreachPartition.side_effect = mock_foreachPartition
mock_class = MagicMock()
mock_analysis = MagicMock()
mock_class.MockAnalysis.return_value = mock_analysis
mock_analysis.analysis = MagicMock()
sys.modules['analysis.MockAnalysis'] = mock_class
mock_history_data_singleton = MagicMock()
mock_data_delivery.return_value = mock_history_data_singleton
obj_mock_alert_factory = MagicMock()
mock_alert_factory.return_value = obj_mock_alert_factory
obj_mock_historical_data = MagicMock()
mock_historical_data.return_value = obj_mock_historical_data
analysis_factory = AnalysisFactory(self._config, input_data_structure, enumerate_output_aggregation_field)
test_lambda = analysis_factory.get_analysis_lambda()
self.assertIsInstance(test_lambda, types.LambdaType, "get_analysis_lambda should return lambda function")
test_lambda(mock_rdd)
self.assertTrue(mock_rdd.foreachPartition.called,
"Failed. The foreachPartition didn't call in lambda that returned by get_analysis_lambda.")
mock_analysis.analysis.assert_called_with(obj_mock_historical_data, obj_mock_alert_factory)
test_analysisFactory.py 文件源码
python
阅读 21
收藏 0
点赞 0
评论 0
评论列表
文章目录