PySpark DataFrames-枚举而不转换为熊猫的方法?

发布于 2021-01-29 19:36:19

我有一个很大的 pyspark.sql.dataframe.DataFrame 名为df。我需要某种枚举记录的方式-
因此,能够访问具有特定索引的记录。(或选择具有索引范围的记录组)

在大熊猫中,我可以

indexes=[2,3,6,7] 
df[indexes]

在这里我想要类似的东西 (并且不将数据框转换为熊猫)

我最接近的是:

  • 通过以下方式枚举原始数据框中的所有对象:
        indexes=np.arange(df.count())
    df_indexed=df.withColumn('index', indexes)
* 使用where()函数搜索所需的值。

问题:

  1. 为什么它不起作用以及如何使其起作用?如何在数据框中添加一行?
  2. 以后可以做类似的事情吗:
         indexes=[2,3,6,7] 
     df1.where("index in indexes").collect()
  1. 有没有更快,更简单的处理方法?
关注者
0
被浏览
61
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    它不起作用,因为:

    1. 的第二个参数withColumn应该Column不是一个集合。np.array在这里不会工作
    2. 当您将"index in indexes"SQL表达式传递给时where indexes超出范围,并且不能将其解析为有效标识符

    PySpark > = 1.4.0

    您可以使用相应的窗口函数添加行号,并使用Column.isin方法或格式正确的查询字符串进行查询:

        from pyspark.sql.functions import col, rowNumber
        from pyspark.sql.window import Window
    
        w = Window.orderBy()
        indexed = df.withColumn("index", rowNumber().over(w))
    
        # Using DSL
        indexed.where(col("index").isin(set(indexes)))
    
        # Using SQL expression
        indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))
    

    看起来调用无PARTITION BY子句的窗口函数会将所有数据移动到单个分区,因此上述毕竟不是最佳解决方案。

    有没有更快,更简单的处理方法?

    并不是的。Spark DataFrames不支持随机行访问。

    PairedRDD``lookup如果使用进行分区,则可以使用相对较快的方法进行访问HashPartitioner。还有一个index-
    rdd
    项目,它支持有效的查找。

    编辑

    与PySpark版本无关,您可以尝试执行以下操作:

        from pyspark.sql import Row
        from pyspark.sql.types import StructType, StructField, LongType
    
        row = Row("char")
        row_with_index = Row("char", "index")
    
        df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
        df.show(5)
    
        ## +----+
        ## |char|
        ## +----+
        ## |   a|
        ## |   b|
        ## |   c|
        ## |   d|
        ## |   e|
        ## +----+
        ## only showing top 5 rows
    
        # This part is not tested but should work and save some work later
        schema  = StructType(
            df.schema.fields[:] + [StructField("index", LongType(), False)])
    
        indexed = (df.rdd # Extract rdd
            .zipWithIndex() # Add index
            .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows
            .toDF(schema)) # It will work without schema but will be more expensive
    
        # inSet in Spark < 1.3
        indexed.where(col("index").isin(indexes))
    


知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看