Spark - merge/union DataFrames with different schemas (column names and order) into a DataFrame with a master common schema

Posted on 2021-01-29 15:00:21

I tried taking one file's schema via df.schema() as the common schema and loading all the CSV files against it, but this fails for the assigned schema because the headers of the other CSV files do not match.

Any suggestions would be appreciated, e.g. as a function or a Spark script.
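
For reference, a minimal PySpark sketch of that attempt (the file paths and session setup are placeholders, not from the original post):

    # Take one file's schema as the "common" schema and force it onto every file.
    # This breaks when other files have different, fewer, or reordered header columns.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()  # assumed active session

    base_df = spark.read.option("header", True).option("inferSchema", True).csv("/data/file1.csv")  # hypothetical path
    common_schema = base_df.schema

    # Applying the fixed schema to the remaining files fails (or misaligns columns)
    # whenever their header order or column set differs from the first file.
    other_df = spark.read.option("header", True).schema(common_schema).csv("/data/file2.csv")  # hypothetical path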

1 Answer
  • 面试哥 2021-01-29

    As I understand it, you want to merge/union files with different schemas (each a subset of one master schema). I wrote this function unionPro, which I think fits your requirement -

    Edit - added a PySpark version

    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.functions.{col, lit}
    import org.apache.spark.sql.types.StructType

    def unionPro(DFList: List[DataFrame], spark: org.apache.spark.sql.SparkSession): DataFrame = {

        /**
         * Accepts DataFrames with the same or different schemas / column orders,
         * with some or no common columns, and returns a single unioned DataFrame.
         */

        import spark.implicits._

        // Master column list: the distinct union of every DataFrame's columns
        val MasterColList: Array[String] = DFList.map(_.columns).reduce((x, y) => x.union(y)).distinct

        // For each DataFrame, keep its own columns and fill the missing ones with nulls
        def unionExpr(myCols: Seq[String], allCols: Seq[String]): Seq[org.apache.spark.sql.Column] = {
          allCols.toList.map {
            case x if myCols.contains(x) => col(x)
            case x                       => lit(null).as(x)
          }
        }

        // Create an empty DF with the master schema, ignoring datatype differences
        // in StructField and deduplicating fields by name (case-insensitive)
        val masterSchema = StructType(DFList.map(_.schema.fields).reduce((x, y) => x.union(y)).groupBy(_.name.toUpperCase).map(_._2.head).toArray)

        val masterEmptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], masterSchema).select(MasterColList.head, MasterColList.tail: _*)

        DFList.map(df => df.select(unionExpr(df.columns, MasterColList): _*)).foldLeft(masterEmptyDF)((x, y) => x.union(y))

    }
    

    Here is a sample test for it -

        val aDF = Seq(("A", 1), ("B", 2)).toDF("Name", "ID")
        val bDF = Seq(("C", 1), ("D", 2)).toDF("Name", "Sal")
        unionPro(List(aDF, bDF), spark).show
    

    The output is -

    +----+----+----+
    |Name|  ID| Sal|
    +----+----+----+
    |   A|   1|null|
    |   B|   2|null|
    |   C|null|   1|
    |   D|null|   2|
    +----+----+----+
    

    Here is the PySpark version of it -

    from functools import reduce
    from typing import List

    from pyspark.sql import Column, DataFrame, SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.types import StructField, StructType


    def unionPro(DFList: List[DataFrame], caseDiff: str = "N") -> DataFrame:
        """
        :param DFList: list of DataFrames with the same or different schemas / column orders
        :param caseDiff: pass "Y" when the inputs' column names differ only in case
        :return: a single unioned DataFrame

        Accepts DataFrames with some, or no, common columns and creates a unioned DataFrame.
        """
        inputDFList = DFList if caseDiff == "N" else [df.select([F.col(x.lower()) for x in df.columns]) for df in DFList]

        # OrderedDict preserves the original column order; column names (strings) are hashable
        from collections import OrderedDict
        masterColStrList = list(OrderedDict.fromkeys(reduce(lambda x, y: x + y, [df.columns for df in inputDFList])))

        # Create masterSchema, ignoring datatype and nullability differences in StructField
        # and deduplicating fields by name
        ignoreNullable = lambda x: StructField(x.name, x.dataType, True)

        import itertools

        # itertools.groupby only groups reliably when the iterable is sorted by the grouping key,
        # so the fields are sorted by name first. That loses the original column order,
        # hence masterColStrList is used for the final select below.
        masterSchema = StructType([list(y)[0] for x, y in itertools.groupby(
            sorted(reduce(lambda x, y: x + y, [[ignoreNullable(x) for x in df.schema.fields] for df in inputDFList]),
                   key=lambda x: x.name),
            lambda x: x.name)])

        def unionExpr(myCols: List[str], allCols: List[str]) -> List[Column]:
            # Keep the DataFrame's own columns, fill the missing ones with nulls
            return [F.col(x) if x in myCols else F.lit(None).alias(x) for x in allCols]

        # Create an empty DataFrame with the master schema (uses the active SparkSession)
        spark = SparkSession.builder.getOrCreate()
        masterEmptyDF = spark.createDataFrame([], masterSchema)

        return reduce(lambda x, y: x.unionByName(y),
                      [df.select(unionExpr(df.columns, masterColStrList)) for df in inputDFList],
                      masterEmptyDF).select(masterColStrList)
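
    And a quick test of the PySpark version, mirroring the Scala sample above (an active SparkSession named spark is assumed; the column names are taken from that example) -

        aDF = spark.createDataFrame([("A", 1), ("B", 2)], ["Name", "ID"])
        bDF = spark.createDataFrame([("C", 1), ("D", 2)], ["Name", "Sal"])
        unionPro([aDF, bDF]).show()

    This prints the same table as the Scala output above. On Spark 3.1+, DataFrame.unionByName(other, allowMissingColumns=True) also fills missing columns with nulls and may cover this use case directly.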
    

