Python - Transpose columns to rows with Spark

Posted on 2021-02-02 23:12:31

I am trying to transpose some columns of my table into rows. I am using Python and Spark 1.5.0. Here is my initial table:

+-----+-----+-----+-------+
|  A  |col_1|col_2|col_...|
+-----+-----+-----+-------+
|  1  |  0.0|  0.6|  ...  |
|  2  |  0.6|  0.7|  ...  |
|  3  |  0.5|  0.9|  ...  |
|  ...|  ...|  ...|  ...  |

I would like to have something like this:

+-----+--------+-----------+
|  A  | col_id | col_value |
+-----+--------+-----------+
|  1  |   col_1|        0.0|
|  1  |   col_2|        0.6|   
|  ...|     ...|        ...|    
|  2  |   col_1|        0.6|
|  2  |   col_2|        0.7| 
|  ...|     ...|        ...|  
|  3  |   col_1|        0.5|
|  3  |   col_2|        0.9|
|  ...|     ...|        ...|

Does anyone know how I can do this? Thank you for your help.

1 Answer
  • 面试哥 2021-02-02

    This is relatively straightforward to do with basic Spark SQL functions.

    Python:

    from pyspark.sql.functions import array, col, explode, struct, lit
    
    df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])
    
    def to_long(df, by):
    
        # Filter dtypes and split into column names and type description
        cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
        # Spark SQL supports only homogeneous columns
        assert len(set(dtypes)) == 1, "All columns have to be of the same type"
    
        # Create and explode an array of (column_name, column_value) structs
        kvs = explode(array([
          struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
        ])).alias("kvs")
    
        return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])
    
    to_long(df, ["A"])
    

    Scala:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{array, col, explode, lit, struct}
    // toDF and the $"..." column syntax below rely on the SQL implicits,
    // which are already in scope in spark-shell; in a standalone app add
    // import sqlContext.implicits._ after creating the SQLContext.
    
    val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")
    
    def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
      val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip
      require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")      
    
      val kvs = explode(array(
        cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
      ))
    
      val byExprs = by.map(col(_))
    
      df
        .select(byExprs :+ kvs.alias("_kvs"): _*)
        .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
    }
    
    toLong(df, Seq("A"))
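
    As a side note, on Spark 2.x and later the same reshape can also be written with the SQL stack() generator instead of array + explode. Below is a minimal Python sketch, assuming the same df and key column "A"; the names value_cols, stack_expr, long_df, col_id and col_value are just illustrative:

    # Collect all non-key columns, then build a stack() expression such as
    # "stack(2, 'col_1', col_1, 'col_2', col_2) as (col_id, col_value)"
    value_cols = [c for c in df.columns if c != "A"]
    stack_expr = "stack({n}, {args}) as (col_id, col_value)".format(
        n=len(value_cols),
        args=", ".join("'{0}', {0}".format(c) for c in value_cols),
    )
    long_df = df.selectExpr("A", stack_expr)
    long_df.show()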
    

