PySpark:计算列子集的行最大值,并添加到现有数据框中
我想为每一行计算最大列子集,并将其添加为现有列的新列Dataframe
。
我以非常尴尬的方式做到了这一点:
def add_colmax(df,subset_columns,colnm):
'''
calculate the maximum of the selected "subset_columns" from dataframe df for each row,
new column containing row wise maximum is added to dataframe df.
df: dataframe. It must contain subset_columns as subset of columns
colnm: Name of the new column containing row-wise maximum of subset_columns
subset_columns: the subset of columns from w
'''
from pyspark.sql.functions import monotonicallyIncreasingId
from pyspark.sql import Row
def get_max_row_with_None(row):
return float(np.max(row))
df_subset = df.select(subset_columns)
rdd = df_subset.map( get_max_row_with_None)
df_rowsum = rdd.map(Row(colnm)).toDF()
df_rowsum = df_rowsum.withColumn("id",monotonicallyIncreasingId())
df = df.withColumn("id",monotonicallyIncreasingId())
df = df.join(df_rowsum,df.id == df_rowsum.id).drop(df.id).drop(df_rowsum.id)
return df
该功能的工作原理是:
rdd1 = sc.parallelize([("foo", 1.0,3.0,None),
("bar", 2.0,2.0,-10),
("baz", 3.3,1.2,10.0)])
df1 = sqlContext.createDataFrame(rdd1, ('v1', 'v2','v3','v4'))
df_new = add_colmax(df1,['v2','v3','v4'],"rowsum")
df_new.collect()
返回:
[Row(v1=u'bar', v2=2.0, v3=2.0, v4=-10, rowsum=2.0),
Row(v1=u'baz', v2=3.3, v3=1.2, v4=None, rowsum=3.3),
Row(v1=u'foo', v2=1.0, v3=3.0, v4=None, rowsum=3.0)]
我认为,如果可以使用带有的用户定义函数withColumn
,则可以更简单地完成。但是我不知道该怎么做。如果您有更简单的方法来实现这一目标,请告诉我。我正在使用Spark
1.6
-
让我们从几个导入开始
from pyspark.sql.functions import col, lit, coalesce, greatest
接下来定义负无穷大字面量:
minf = lit(float("-inf"))
映射列并将结果传递给
greatest
:rowmax = greatest(*[coalesce(col(x), minf) for x in ['v2','v3','v4']])
最后
withColumn
:df1.withColumn("rowmax", rowmax)
结果:
+---+---+---+----+------+ | v1| v2| v3| v4|rowmax| +---+---+---+----+------+ |foo|1.0|3.0|null| 3.0| |bar|2.0|2.0| -10| 2.0| |baz|3.3|1.2|null| 3.3| +---+---+---+----+------+
您可以将相同的模式用于不同的按行操作,
minf
以中性元素代替。例如:rowsum = sum([coalesce(col(x), lit(0)) for x in ['v2','v3','v4']])
要么:
from operator import mul from functools import reduce rowproduct = reduce( mul, [coalesce(col(x), lit(1)) for x in ['v2','v3','v4']] )
您自己的代码可以通过以下方式大大简化
udf
:from pyspark.sql.types import DoubleType from pyspark.sql.functions import udf def get_max_row_with_None_(*cols): return float(max(x for x in cols if x is not None)) get_max_row_with_None = udf(get_max_row_with_None_, DoubleType()) df1.withColumn("rowmax", get_max_row_with_None('v2','v3','v4'))
更换
minf
用lit(float("inf"))
并greatest
用least
获得每行的最小值。