Question:联接两个数据集时,为什么在联接键列上两次应用筛选器isnotull?在物理计划中,它一旦作为PushedFilter应用,然后在它之后显式apply.为甚麽会这样呢?

代码:

import os
import pandas as pd, numpy as np
import pyspark
spark=pyspark.sql.SparkSession.builder.getOrCreate()

save_loc = "gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/"

df1 = spark.createDataFrame(pd.DataFrame({'a':np.random.choice([1,2,None],size = 1000, p = [0.47,0.48,0.05]),
                                         'b': np.random.random(1000)}))

df2 = spark.createDataFrame(pd.DataFrame({'a':np.random.choice([1,2,None],size = 1000, p = [0.47,0.48,0.05]),
                                         'b': np.random.random(1000)}))

df1.write.parquet(os.path.join(save_loc,"dfl_key_int"))
df2.write.parquet(os.path.join(save_loc,"dfr_key_int"))

dfl_int = spark.read.parquet(os.path.join(save_loc,"dfl_key_int"))
dfr_int = spark.read.parquet(os.path.join(save_loc,"dfr_key_int"))

dfl_int.join(dfr_int,on='a',how='inner').explain()



output:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [a#23L, b#24, b#28]
   +- BroadcastHashJoin [a#23L], [a#27L], Inner, BuildRight, false
      :- Filter isnotnull(a#23L)
      :  +- FileScan parquet [a#23L,b#24] Batched: true, DataFilters: [isnotnull(a#23L)], Format: Parquet, Location: InMemoryFileIndex[gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/dfl_key_int], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:bigint,b:double>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
         +- Filter isnotnull(a#27L)
            +- FileScan parquet [a#27L,b#28] Batched: true, DataFilters: [isnotnull(a#27L)], Format: Parquet, Location: InMemoryFileIndex[gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/dfr_key_int], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:bigint,b:double>

推荐答案

原因是,PushedFilter并不能保证在Spark将所有数据读入内存之前,所有数据都会按您所希望的那样进行过滤.有关PushedFilter是什么的更多背景信息,请查看this SO answer.

Parquet files

让我们来看看您的示例中所示的拼图文件.地块文件以列格式存储,也按行组(或块)组织.以下图片来自Apache Parquet docs强:

enter image description here

您可以看到,数据以列的方式存储,并被分成块(行组).现在,对于每个列/行块组合,Parquet存储一些元数据.在这张图中,您可以看到它包含一堆元数据,然后还有extra key/value pairs元数据.它们还包含有关数据的统计信息(取决于您的列的类型).

这些统计数据的一些示例包括:

  • 块的最小/最大值是多少(以防它对列的数据类型有意义)
  • 块是否具有非空值
  • ...

回到你的例子

你正在加入a纵队.要做到这一点,我们需要确保a没有空值.假设您的a列(忽略其他列)是这样存储的:

  • a column:
    • 区块1:0, 1, None, 1, 1, None
    • 区块2:0, 0, 0, 0, 0, 0
    • 《区块3:None, None, None, None, None, None

现在,使用我们可以立即(仅通过查看块的元数据)忽略块3的PushedFilter,我们甚至不必读取它!

但是如您所见,块1仍然包含空值.这是我们不能通过只查看块的元数据来过滤掉的东西.因此,我们必须读入整个块,然后使用您的物理计划中的第二个Filter node 在Spark中过滤其他空值.

Sql相关问答推荐

如何将多个 Select 查询从一个表中组合出来

SQL基于多个值 Select 单行

对非RUST源代码字符串使用`stringify!`,例如SQL查询

从依赖于其他表的值的XREF表中的值分组获得正确的计数?

计算周时出现SQL错误结果

OVER子句WITH PARTITION BY和ORDER BY忽略主查询的WHERE子句

按分类标准检索记录

在Postgres中合并相似的表

无法将发票与产品价格相关联

优化Postgres搜索未知长度的子串

使用左外部联接更正列中第+1行的值时重复

使用CTE在SNOWFLAKE中创建临时表

将具有嵌套 XML 的列转换为 SQL 中的表格格式?

MySQL中的递归查询邻接表深度优先?

如何从三个连接表中获取数据,并始终显示第一个表中的数据,以及第三个表中的空值或现有记录?

snowflake中的动态文件名生成

如何在 case 语句中使用聚合?

为重复的项目编号显示正在处理

查找具有相同连接列数据的所有记录

postgreSQL 中的循环表