test_transformation_creator.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:kafka-spark-influx-csv-analysis 作者: bwsw 项目源码 文件源码
def test_build_lambda(self):
        """Check that ``build_lambda`` returns a lambda producing the expected rows.

        The creator is given three parsed transformations: the bare field
        name ``"src_ip"``, a ``FieldTransformation("destination_ip", "dst_ip")``
        and a ``FieldTransformation("traffic", ...)`` whose body is a ``"mult"``
        syntax tree over ``packet_size`` and ``sampling_rate``. The resulting
        lambda is mapped over the CSV at ``DATA_PATH`` and the collected
        output is compared against a fixed list of 3-tuples.

        NOTE(review): the third tuple element is presumably
        ``packet_size * sampling_rate`` -- confirm against the fixture data.
        """
        mult_syntax_tree = SyntaxTree()
        mult_syntax_tree.operation = "mult"
        mult_syntax_tree.children = ["packet_size", "sampling_rate"]

        parsed_transformations = ["src_ip", FieldTransformation("destination_ip", "dst_ip"),
                                  FieldTransformation("traffic", mult_syntax_tree)]

        creator = TransformationCreator(self.data_structure, parsed_transformations, TransformationOperations({
            "country": "./GeoLite2-Country.mmdb",
            "city": "./GeoLite2-City.mmdb",
            "asn": "./GeoLite2-ASN.mmdb"
        }))

        transformation = creator.build_lambda()

        # Checked before the session is created, so a failure here leaves
        # nothing to clean up.
        self.assertIsInstance(transformation, types.LambdaType, "Transformation type should be lambda")

        spark = SparkSession.builder.getOrCreate()
        try:
            input_frame = spark.read.csv(DATA_PATH, self.data_structure_pyspark)

            result = input_frame.rdd.map(transformation).collect()

            self.assertListEqual(result, [("217.69.143.60", "91.221.61.183", 37888),
                                          ("91.221.61.168", "90.188.114.141", 34816),
                                          ("91.226.13.80", "5.136.78.36", 773120),
                                          ("192.168.30.2", "192.168.30.1", 94720),
                                          ("192.168.30.2", "192.168.30.1", 94720)], "List of tuples should be equal")
        finally:
            # Stop the session even when reading, collecting or the assertion
            # fails; otherwise it would stay alive for subsequent tests.
            spark.stop()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号