test_transformation_creator.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:kafka-spark-influx-csv-analysis 作者: bwsw 项目源码 文件源码
def test_build_lambda_with_nested_operations(self):
        """Check that a lambda built from a nested syntax tree maps CSV rows correctly.

        The transformation list mixes three kinds of entries: a bare field
        name ("src_ip"), a FieldTransformation whose second argument is a
        plain field name, and a FieldTransformation whose second argument is
        a SyntaxTree with another SyntaxTree nested as its first child.

        The built callable is applied to every row read from DATA_PATH and
        the collected output is compared against the expected 3-tuples.
        """
        # Inner node: "mult" over two field names.
        # NOTE(review): presumably evaluates to packet_size * sampling_rate;
        # confirm against TransformationOperations.
        inner_mult = SyntaxTree()
        inner_mult.operation = "mult"
        inner_mult.children = ["packet_size", "sampling_rate"]

        # Root node: "mult" over the inner node and the literal "10".
        outer_mult = SyntaxTree()
        outer_mult.operation = "mult"
        outer_mult.children = [inner_mult, "10"]

        parsed_transformations = [
            "src_ip",
            FieldTransformation("destination_ip", "dst_ip"),
            FieldTransformation("traffic", outer_mult),
        ]

        operations = TransformationOperations({
            "country": "./GeoLite2-Country.mmdb",
            "city": "./GeoLite2-City.mmdb",
            "asn": "./GeoLite2-ASN.mmdb"
        })
        creator = TransformationCreator(self.data_structure, parsed_transformations, operations)

        transformation = creator.build_lambda()

        self.assertIsInstance(transformation, types.LambdaType, "Transformation type should be lambda")

        spark = SparkSession.builder.getOrCreate()
        try:
            data_frame = spark.read.csv(DATA_PATH, self.data_structure_pyspark)

            result = data_frame.rdd.map(transformation).collect()

            self.assertListEqual(result, [("217.69.143.60", "91.221.61.183", 378880),
                                          ("91.221.61.168", "90.188.114.141", 348160),
                                          ("91.226.13.80", "5.136.78.36", 7731200),
                                          ("192.168.30.2", "192.168.30.1", 947200),
                                          ("192.168.30.2", "192.168.30.1", 947200)],
                                 "List of tuples should be equal")
        finally:
            # Stop the session even when reading, collecting or the assertion
            # fails, so a failing test does not leave Spark running.
            spark.stop()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号