如何使用来自另一个数据框的新值更新pyspark数据框?

发布于 2021-01-29 14:10:07

我有两个Spark数据框:

数据框A:

|col_1 | col_2 | ... | col_n |
|val_1 | val_2 | ... | val_n |

和数据框B:

|col_1 | col_2 | ... | col_m |
|val_1 | val_2 | ... | val_m |

数据框B可以包含来自数据框A的重复行,更新行和新行。我想在spark中编写操作,在其中可以创建一个新数据框,其中包含数据框A的行以及数据框B的更新行和新行。

我首先创建一个仅包含不可更新列的哈希列。这是唯一的ID。所以我们可以说col1,并col2可以改变值(可更新),但是col3,..,coln是唯一的。我创建了一个哈希函数为hash(col3,..,coln)

A=A.withColumn("hash", hash(*[col(colname) for colname in unique_cols_A]))
B=B.withColumn("hash", hash(*[col(colname) for colname in unique_cols_B]))

现在,我想编写一些火花代码,基本上从B中选择哈希值不在A中的 行(因此,新行和更新后的行) ,并将它们与A中的行一起加入新的数据帧中。
pyspark?

编辑:数据框B可以有来自数据框A的额外列,因此无法进行联合。

样例

数据框A:

+-----+-----+
|col_1|col_2|
+-----+-----+
|    a|  www|
|    b|  eee|
|    c|  rrr|
+-----+-----+

数据框B:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    d|  yyy|    2|
|    c|  rer|    3|
+-----+-----+-----+

结果:数据框C:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    b|  eee| null|
|    c|  rer|    3|
|    d|  yyy|    2|
+-----+-----+-----+
关注者
0
被浏览
142
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    这与用新值更新数据框列密切相关,除了您还想添加数据框B中的行。一种方法是首先执行链接的问题中概述的操作,然后将结果与数据框B合并并删除重复。

    例如:

    dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
        .select(
            'col_1',
            f.when(
                ~f.isnull(f.col('b.col_2')),
                f.col('b.col_2')
            ).otherwise(f.col('a.col_2')).alias('col_2'),
            'b.col_3'
        )\
        .union(dfB)\
        .dropDuplicates()\
        .sort('col_1')\
        .show()
    #+-----+-----+-----+
    #|col_1|col_2|col_3|
    #+-----+-----+-----+
    #|    a|  wew|    1|
    #|    b|  eee| null|
    #|    c|  rer|    3|
    #|    d|  yyy|    2|
    #+-----+-----+-----+
    

    如果您有很多要替换的列并且不想对它们全部进行硬编码,则可以更一般地使用列表推导:

    cols_to_update = ['col_2']
    
    dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
        .select(
            *[
                ['col_1'] + 
                [
                    f.when(
                        ~f.isnull(f.col('b.{}'.format(c))),
                        f.col('b.{}'.format(c))
                    ).otherwise(f.col('a.{}'.format(c))).alias(c)
                    for c in cols_to_update
                ] + 
                ['b.col_3']
            ]
        )\
        .union(dfB)\
        .dropDuplicates()\
        .sort('col_1')\
        .show()
    


知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看