Pivot a string column on a PySpark DataFrame

Posted on 2021-01-29 18:41:30

I have a simple DataFrame like this:

rdd = sc.parallelize(
    [
        (0, "A", 223,"201603", "PORT"), 
        (0, "A", 22,"201602", "PORT"), 
        (0, "A", 422,"201601", "DOCK"), 
        (1,"B", 3213,"201602", "DOCK"), 
        (1,"B", 3213,"201601", "PORT"), 
        (2,"C", 2321,"201601", "DOCK")
    ]
)
df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])

df_data.show()
+---+----+----+------+----+
| id|type|cost|  date|ship|
+---+----+----+------+----+
|  0|   A| 223|201603|PORT|
|  0|   A|  22|201602|PORT|
|  0|   A| 422|201601|DOCK|
|  1|   B|3213|201602|DOCK|
|  1|   B|3213|201601|PORT|
|  2|   C|2321|201601|DOCK|
+---+----+----+------+----+

I need to pivot it by date:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("cost").show()

+---+----+------+------+------+
| id|type|201601|201602|201603|
+---+----+------+------+------+
|  2|   C|2321.0|  null|  null|
|  0|   A| 422.0|  22.0| 223.0|
|  1|   B|3213.0|3213.0|  null|
+---+----+------+------+------+

Everything works as expected. But now I need to pivot it and get a non-numeric column:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("ship").show()

and, of course, I get an exception:

AnalysisException: u'"ship" is not a numeric column. Aggregation function can only be applied on a numeric column.;'

I would like to produce something along the lines of:

+---+----+------+------+------+
| id|type|201601|201602|201603|
+---+----+------+------+------+
|  2|   C|  DOCK|  null|  null|
|  0|   A|  DOCK|  PORT|  PORT|
|  1|   B|  PORT|  DOCK|  null|
+---+----+------+------+------+

Is that possible with pivot?

1 Answer
  • 面试哥 2021-01-29

    Assuming that (id | type | date) combinations are unique, and your only goal is pivoting rather than aggregating, you can use first (or any other function not restricted to numeric values):

    from pyspark.sql.functions import first
    
    (df_data
        .groupby(df_data.id, df_data.type)
        .pivot("date")
        .agg(first("ship"))
        .show())
    
    ## +---+----+------+------+------+
    ## | id|type|201601|201602|201603|
    ## +---+----+------+------+------+
    ## |  2|   C|  DOCK|  null|  null|
    ## |  0|   A|  DOCK|  PORT|  PORT|
    ## |  1|   B|  PORT|  DOCK|  null|
    ## +---+----+------+------+------+
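
    Note that first is only safe here because of the uniqueness assumption; with duplicates it would pick an arbitrary ship value. A quick sanity check for that assumption (a hypothetical addition, not part of the original answer) is to look for repeated (id, type, date) combinations:

    from pyspark.sql.functions import count

    # Any row returned here means (id, type, date) is NOT unique,
    # so first() would not be deterministic.
    (df_data
        .groupby("id", "type", "date")
        .agg(count("*").alias("n"))
        .filter("n > 1")
        .show())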
    

    If those assumptions don't hold, you'll have to pre-aggregate your data first. For example, to pick the most common ship value:

    from pyspark.sql.functions import max, struct
    
    (df_data
        .groupby("id", "type", "date", "ship")
        .count()
        .groupby("id", "type")
        .pivot("date")
        .agg(max(struct("count", "ship")))
        .show())
    
    ## +---+----+--------+--------+--------+
    ## | id|type|  201601|  201602|  201603|
    ## +---+----+--------+--------+--------+
    ## |  2|   C|[1,DOCK]|    null|    null|
    ## |  0|   A|[1,DOCK]|[1,PORT]|[1,PORT]|
    ## |  1|   B|[1,PORT]|[1,DOCK]|    null|
    ## +---+----+--------+--------+--------+
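
    If you want the ship value alone rather than the (count, ship) struct, you can pull the field out of each pivoted column afterwards. A minimal sketch, assuming the pivoted column names are exactly the date strings shown above:

    from pyspark.sql.functions import col, max, struct

    pivoted = (df_data
        .groupby("id", "type", "date", "ship")
        .count()
        .groupby("id", "type")
        .pivot("date")
        .agg(max(struct("count", "ship"))))

    # Keep id/type and extract only the ship field from each struct column.
    pivoted.select(
        "id", "type",
        *[col(c).getField("ship").alias(c)
          for c in pivoted.columns if c not in ("id", "type")]
    ).show()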
    

