test_analysis.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:kafka-spark-influx-csv-analysis 作者: bwsw 项目源码 文件源码
def test_get_analysis_lambda_for_reduce(self, mock_analysis_record):
        # set up input data structure obtained after transformation and aggregation
        input_data_structure = {'rule': [{'key': False, 'func_name': 'Max', 'input_field': 'traffic'},
                                         {'key': False, 'func_name': 'Max', 'input_field': 'ip_size'},
                                         {'key': False, 'func_name': 'Sum', 'input_field': 'ip_size_sum'}],
                                'operation_type': 'reduceByKey'}
        # set up structure of config
        config = TestConfig(
            {
                "historical": {
                    "method": "influx",
                    "influx_options": {
                        "measurement": "mock"
                    }
                },
                "alert": {
                    "method": "stdout",
                    "option": {}
                },
                "time_delta": 20,
                "accuracy": 3,
                "rule": {
                    "ip_size": 5,
                    "ip_size_sum": 10,
                    "traffic": 15
                }

            })

        detection = Analysis(config.content, Mock(), Mock(),
                             input_data_structure)
        lambda_analysis = detection.get_analysis_lambda()
        self.assertIsInstance(lambda_analysis, types.LambdaType,
                              "Failed. get_analysis_lambda should return a lambda object")

        lambda_analysis((3, 4, 5, 4))
        self.assertTrue(mock_analysis_record.called,
                        "Failed. The analysis_record didn't call in lambda that returned by get_analysis_lambda.")
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号