如何计算pyspark中每行某些列的最大值

发布于 2021-01-29 14:56:20

我在pyspark中使用sqlContext.sql函数读取了一个数据框。它包含4个数字列,每个客户都有信息(这是键ID)。我需要计算每个客户端的最大值并将此值加入数据框:

+--------+-------+-------+-------+-------+
|ClientId|m_ant21|m_ant22|m_ant23|m_ant24|
+--------+-------+-------+-------+-------+
|       0|   null|   null|   null|   null|
|       1|   null|   null|   null|   null|
|       2|   null|   null|   null|   null|
|       3|   null|   null|   null|   null|
|       4|   null|   null|   null|   null|
|       5|   null|   null|   null|   null|
|       6|     23|     13|     17|      8|
|       7|   null|   null|   null|   null|
|       8|   null|   null|   null|   null|
|       9|   null|   null|   null|   null|
|      10|     34|      2|      4|      0|
|      11|      0|      0|      0|      0|
|      12|      0|      0|      0|      0|
|      13|      0|      0|     30|      0|
|      14|   null|   null|   null|   null|
|      15|   null|   null|   null|   null|
|      16|     37|     29|     29|     29|
|      17|      0|      0|     16|      0|
|      18|      0|      0|      0|      0|
|      19|   null|   null|   null|   null|
+--------+-------+-------+-------+-------+

在这种情况下,客户端“ six”的最大值为23,而客户端“ ten”的最大值为30。“ null”在新列中自然为null。

请帮助我显示如何执行此操作。

关注者
0
被浏览
98
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    我认为将值组合到列表中而不是找到最大值将是最简单的方法。

    from pyspark.sql.types import *
    
    schema = StructType([
        StructField("ClientId", IntegerType(), True),
        StructField("m_ant21", IntegerType(), True),
        StructField("m_ant22", IntegerType(), True),
        StructField("m_ant23", IntegerType(), True),
        StructField("m_ant24", IntegerType(), True)
    ])
    
    df = spark\
        .createDataFrame(
            data=[(0, None, None, None, None),
                 (1, 23, 13, 17, 99),
                 (2, 0, 0, 0, 1),
                 (3, 0, None, 1, 0)],
            schema=schema)
    
    import pyspark.sql.functions as F
    
    def agg_to_list(m21,m22,m23,m24):
        return [m21,m22,m23,m24]
    
    u_agg_to_list = F.udf(agg_to_list, ArrayType(IntegerType()))
    
    df2 = df.withColumn('all_values', u_agg_to_list('m_ant21', 'm_ant22', 'm_ant23', 'm_ant24'))\
            .withColumn('max', F.sort_array("all_values", False)[0])\
            .select('ClientId', 'max')
    
    df2.show()
    

    输出:

    +--------+----+
    |ClientId|max |
    +--------+----+
    |0       |null|
    |1       |99  |
    |2       |1   |
    |3       |1   |
    +--------+----+
    


知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看