数据框上的Pyspark UDF列

发布于 2021-01-29 16:17:33

我正在尝试根据某些列的值在数据框上创建新列。在所有情况下都返回null。任何人都知道这个简单示例出了什么问题吗?

df = pd.DataFrame([[0,1,0],[1,0,0],[1,1,1]],columns = ['Foo','Bar','Baz'])

spark_df = spark.createDataFrame(df)

def get_profile():
    if 'Foo'==1:
        return 'Foo'
    elif 'Bar' == 1:
        return 'Bar'
    elif 'Baz' ==1 :
        return 'Baz'

spark_df = spark_df.withColumn('get_profile', lit(get_profile()))
spark_df.show()

   Foo  Bar  Baz get_profile
    0    1    0        None
    1    0    0        None
    1    1    1        None

我希望get_profile列将为所有行填写。

我也尝试过这个:

spark_udf = udf(get_profile,StringType())

spark_df = spark_df.withColumn('get_profile', spark_udf())
print(spark_df.toPandas())

达到同样的效果。

关注者
0
被浏览
48
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    udf没有的列名是什么知识。因此,它会检查if/elif块中的每个条件,并且所有条件的计算结果均为False。因此函数将返回None

    您必须将您的代码重写udf为要检查的列:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    
    def get_profile(foo, bar, baz):
        if foo == 1:
            return 'Foo'
        elif bar == 1:
            return 'Bar'
        elif baz == 1 :
            return 'Baz'
    
    spark_udf = udf(get_profile, StringType())
    spark_df = spark_df.withColumn('get_profile',spark_udf('Foo', 'Bar', 'Baz'))
    spark_df.show()
    #+---+---+---+-----------+
    #|Foo|Bar|Baz|get_profile|
    #+---+---+---+-----------+
    #|  0|  1|  0|        Bar|
    #|  1|  0|  0|        Foo|
    #|  1|  1|  1|        Foo|
    #+---+---+---+-----------+
    

    如果您有很多列,并希望全部传递(按顺序):

    spark_df = spark_df.withColumn('get_profile', spark_udf(*spark_df.columns))
    

    更一般而言,您可以解压缩任何有序的列列表:

    cols_to_pass_to_udf = ['Foo', 'Bar', 'Baz']
    spark_df = spark_df.withColumn('get_profile', spark_udf(*cols_to_pass_to_udf ))
    

    但是此特定操作不需要udf。我会这样:

    from pyspark.sql.functions import coalesce, when, col, lit
    
    spark_df.withColumn(
        "get_profile",
        coalesce(*[when(col(c)==1, lit(c)) for c in spark_df.columns])
    ).show()
    #+---+---+---+-----------+
    #|Foo|Bar|Baz|get_profile|
    #+---+---+---+-----------+
    #|  0|  1|  0|        Bar|
    #|  1|  0|  0|        Foo|
    #|  1|  1|  1|        Foo|
    #+---+---+---+-----------+
    

    之所以pyspark.sql.functions.when()可行null,是因为如果条件求值False且未otherwise指定任何值,则默认情况下将返回。然后列表理解pyspark.sql.functions.coalesce将返回第一个非空列。

    请注意,这仅等效于udf列的顺序与get_profile函数中评估的顺序相同的情况。更明确地说,您应该执行以下操作:

    spark_df.withColumn(
        "get_profile",
        coalesce(*[when(col(c)==1, lit(c)) for c in ['Foo', 'Bar', 'Baz'])
    ).show()
    


知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看