def test_build_lambda(self):
    """Check that ``build_lambda`` returns a lambda yielding the expected rows.

    A ``TransformationCreator`` is built from three parsed transformations:
    the plain field name ``"src_ip"``, a ``FieldTransformation`` pairing
    ``"destination_ip"`` with ``"dst_ip"``, and a ``FieldTransformation``
    pairing ``"traffic"`` with a ``"mult"`` syntax tree over ``packet_size``
    and ``sampling_rate``. The resulting callable is mapped over the CSV at
    ``DATA_PATH`` and the collected tuples are compared with a fixed list.
    """
    # Syntax tree describing the "mult" operation over two input fields.
    mult_syntax_tree = SyntaxTree()
    mult_syntax_tree.operation = "mult"
    mult_syntax_tree.children = ["packet_size", "sampling_rate"]

    parsed_transformations = [
        "src_ip",
        FieldTransformation("destination_ip", "dst_ip"),
        FieldTransformation("traffic", mult_syntax_tree),
    ]
    # NOTE(review): the paths look like GeoLite2 database files consumed by
    # TransformationOperations -- confirm they must exist for this test.
    operations = TransformationOperations({
        "country": "./GeoLite2-Country.mmdb",
        "city": "./GeoLite2-City.mmdb",
        "asn": "./GeoLite2-ASN.mmdb"
    })
    creator = TransformationCreator(self.data_structure, parsed_transformations, operations)

    transformation = creator.build_lambda()
    self.assertIsInstance(transformation, types.LambdaType, "Transformation type should be lambda")

    spark = SparkSession.builder.getOrCreate()
    try:
        data_frame = spark.read.csv(DATA_PATH, self.data_structure_pyspark)
        result = data_frame.rdd.map(transformation).collect()
        self.assertListEqual(result, [("217.69.143.60", "91.221.61.183", 37888),
                                      ("91.221.61.168", "90.188.114.141", 34816),
                                      ("91.226.13.80", "5.136.78.36", 773120),
                                      ("192.168.30.2", "192.168.30.1", 94720),
                                      ("192.168.30.2", "192.168.30.1", 94720)], "List of tuples should be equal")
    finally:
        # Always release the session, even when reading or an assertion
        # fails, so a failing test does not leak Spark into later tests.
        spark.stop()
test_transformation_creator.py 文件源码
python
阅读 24
收藏 0
点赞 0
评论 0
评论列表
文章目录