test_aggregationProcessor.py 文件源码

python
阅读 41 收藏 0 点赞 0 评论 0

项目:kafka-spark-influx-csv-analysis 作者: bwsw 项目源码 文件源码
def test_build_lambda_for_reduce(self):
        # Verifies that an AggregationProcessor configured with a "reduce"
        # operation hands back a lambda, and that applying this lambda to a
        # small RDD of 3-tuples produces the expected aggregated tuple.

        # Schema handed to the processor; the field order here lines up with
        # the positions inside each fixture row below.
        schema = StructType([
            StructField("sampling_rate", LongType()),
            StructField("packet_size", LongType()),
            StructField("traffic", LongType()),
        ])
        aggregation_settings = {
            "operation_type": "reduce",
            "rule": "Min(packet_size);Max(sampling_rate); Sum(traffic)",
        }
        config = TestConfig({"processing": {"aggregations": aggregation_settings}})
        processor = AggregationProcessor(config, schema)

        # Fixture rows: column 0 peaks at 7, column 1 bottoms out at 0 and
        # column 2 adds up to 5 -- matching the (7, 0, 5) expected below.
        rows = [(4, 2, 1), (7, 1, 1), (1, 0, 1), (2, 5, 1), (1, 1, 1)]
        rdd = SparkSession.builder.getOrCreate().sparkContext.parallelize(rows)
        aggregate = processor.get_aggregation_lambda()

        self.assertIsInstance(
            aggregate,
            types.LambdaType,
            "get_aggregation_lambda should return lambda function",
        )

        # The lambda is applied to the whole RDD, not to individual rows.
        self.assertTupleEqual(
            aggregate(rdd),
            (7, 0, 5),
            "Error in aggregation operation. Tuple should be equal",
        )
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号