I accidentally noticed some strange behavior in Spark. Basically, it can execute a where on a column that does not exist in the DataFrame:

from pyspark.sql.functions import col

print(spark.version)

df = spark.read.format("csv").option("header", True).load("abfss://some_abfs_path/df.csv")
print(type(df), len(df.columns), df.count())

c = df.columns[0]  # A column name before renaming
df = df.select(*[col(x).alias(f"{x}_new") for x in df.columns])  # Add suffix to column names

print(c in df.columns)

try:
    df.select(c)
except Exception:
    print("SO THIS DOESN'T WORK, WHICH MAKES SENSE.")

# BUT WHY DOES THIS WORK:
print(df.where(col(c).isNotNull()).count())
# IT'S USING c AS f"{c}_new"
print(df.where(col(f"{c}_new").isNotNull()).count())

Output:

3.1.2
<class 'pyspark.sql.dataframe.DataFrame'> 102 1226791
False
SO THIS DOESN'T WORK, WHICH MAKES SENSE.
1226791
1226791

As you can see, the strange part is that even though column c no longer exists in df after the renaming, it can still be used in the where function.

My hunch is that under the hood PySpark resolves the where before the select renaming. But that would be poor design, and it wouldn't explain why both the old and the new column names work.
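For anyone who wants to reproduce this without a storage account, here is a minimal self-contained sketch of the same behavior, assuming a local SparkSession (the column names and data are made up):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Two rows, one with a null id, so the filtered count differs from the total.
df = spark.createDataFrame([(1, "a"), (None, "b")], ["id", "val"])
df = df.select(*[col(x).alias(f"{x}_new") for x in df.columns])

print("id" in df.columns)                           # False
print(df.where(col("id").isNotNull()).count())      # 1 -- the old name still resolves
print(df.where(col("id_new").isNotNull()).count())  # 1 -- and so does the new one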

Appreciate any insights, thanks.

I'm running this on Azure Databricks.

Recommended answer

When in doubt, use df.explain() to figure out what is actually happening under the hood. This confirms your intuition:

Spark context available as 'sc' (master = local[*], app id = local-1709748307134).
SparkSession available as 'spark'.
>>> df = spark.read.option("header", True).option("inferSchema", True).csv("taxi.csv")
>>> c = df.columns[0]
>>> from pyspark.sql.functions import *
>>> df = df.select(*[col(x).alias(f"{x}_new") for x in df.columns]) 
>>> df.explain()
== Physical Plan ==
*(1) Project [VendorID#17 AS VendorID_new#51, tpep_pickup_datetime#18 AS tpep_pickup_datetime_new#52, tpep_dropoff_datetime#19 AS tpep_dropoff_datetime_new#53, passenger_count#20 AS passenger_count_new#54, trip_distance#21 AS trip_distance_new#55, RatecodeID#22 AS RatecodeID_new#56, store_and_fwd_flag#23 AS store_and_fwd_flag_new#57, PULocationID#24 AS PULocationID_new#58, DOLocationID#25 AS DOLocationID_new#59, payment_type#26 AS payment_type_new#60, fare_amount#27 AS fare_amount_new#61, extra#28 AS extra_new#62, mta_tax#29 AS mta_tax_new#63, tip_amount#30 AS tip_amount_new#64, tolls_amount#31 AS tolls_amount_new#65, improvement_surcharge#32 AS improvement_surcharge_new#66, total_amount#33 AS total_amount_new#67]
+- FileScan csv [VendorID#17,tpep_pickup_datetime#18,tpep_dropoff_datetime#19,passenger_count#20,trip_distance#21,RatecodeID#22,store_and_fwd_flag#23,PULocationID#24,DOLocationID#25,payment_type#26,fare_amount#27,extra#28,mta_tax#29,tip_amount#30,tolls_amount#31,improvement_surcharge#32,total_amount#33] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/charlie/taxi.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<VendorID:int,tpep_pickup_datetime:string,tpep_dropoff_datetime:string,passenger_count:int,...


>>> df = df.where(col(c).isNotNull())
>>> df.explain()
== Physical Plan ==
*(1) Project [VendorID#17 AS VendorID_new#51, tpep_pickup_datetime#18 AS tpep_pickup_datetime_new#52, tpep_dropoff_datetime#19 AS tpep_dropoff_datetime_new#53, passenger_count#20 AS passenger_count_new#54, trip_distance#21 AS trip_distance_new#55, RatecodeID#22 AS RatecodeID_new#56, store_and_fwd_flag#23 AS store_and_fwd_flag_new#57, PULocationID#24 AS PULocationID_new#58, DOLocationID#25 AS DOLocationID_new#59, payment_type#26 AS payment_type_new#60, fare_amount#27 AS fare_amount_new#61, extra#28 AS extra_new#62, mta_tax#29 AS mta_tax_new#63, tip_amount#30 AS tip_amount_new#64, tolls_amount#31 AS tolls_amount_new#65, improvement_surcharge#32 AS improvement_surcharge_new#66, total_amount#33 AS total_amount_new#67]
+- *(1) Filter isnotnull(VendorID#17)
   +- FileScan csv [VendorID#17,tpep_pickup_datetime#18,tpep_dropoff_datetime#19,passenger_count#20,trip_distance#21,RatecodeID#22,store_and_fwd_flag#23,PULocationID#24,DOLocationID#25,payment_type#26,fare_amount#27,extra#28,mta_tax#29,tip_amount#30,tolls_amount#31,improvement_surcharge#32,total_amount#33] Batched: false, DataFilters: [isnotnull(VendorID#17)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/charlie/taxi.csv], PartitionFilters: [], PushedFilters: [IsNotNull(VendorID)], ReadSchema: struct<VendorID:int,tpep_pickup_datetime:string,tpep_dropoff_datetime:string,passenger_count:int,...

Reading from the bottom up: FileScan reads the data, Filter discards the unwanted rows, and Project applies the aliases. For Spark this is a sensible way to build its DAG (discard data as eagerly as possible, so you don't waste time operating on it), but as you noticed, it can lead to unexpected behavior. If you want to avoid this, use df.checkpoint() to materialize the DataFrame before the df.where() statement; that will give you the expected error when you try to reference the old column name:

>>> from pyspark.sql.functions import *
>>> spark.sparkContext.setCheckpointDir("file:/tmp/")
>>> df = spark.read.option("header", True).option("inferSchema", True).csv("taxi.csv")
>>> c = df.columns[0]
>>> df = df.select(*[col(x).alias(f"{x}_new") for x in df.columns]) 
>>> df = df.checkpoint()
>>> df.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[VendorID_new#51,tpep_pickup_datetime_new#52,tpep_dropoff_datetime_new#53,passenger_count_new#54,trip_distance_new#55,RatecodeID_new#56,store_and_fwd_flag_new#57,PULocationID_new#58,DOLocationID_new#59,payment_type_new#60,fare_amount_new#61,extra_new#62,mta_tax_new#63,tip_amount_new#64,tolls_amount_new#65,improvement_surcharge_new#66,total_amount_new#67]


>>> df = df.where(col(c).isNotNull())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/homebrew/opt/apache-spark/libexec/python/pyspark/sql/dataframe.py", line 3325, in filter
    jdf = self._jdf.filter(condition._jc)
  File "/opt/homebrew/opt/apache-spark/libexec/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/homebrew/opt/apache-spark/libexec/python/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `VendorID` cannot be resolved. Did you mean one of the following? [`VendorID_new`, `extra_new`, `RatecodeID_new`, `mta_tax_new`, `DOLocationID_new`].;
'Filter isnotnull('VendorID)
+- LogicalRDD [VendorID_new#51, tpep_pickup_datetime_new#52, tpep_dropoff_datetime_new#53, passenger_count_new#54, trip_distance_new#55, RatecodeID_new#56, store_and_fwd_flag_new#57, PULocationID_new#58, DOLocationID_new#59, payment_type_new#60, fare_amount_new#61, extra_new#62, mta_tax_new#63, tip_amount_new#64, tolls_amount_new#65, improvement_surcharge_new#66, total_amount_new#67], false

>>> 
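If configuring a checkpoint directory is a hassle, df.localCheckpoint() should cut the lineage the same way by checkpointing to executor-local storage instead (note it is less reliable than a regular checkpoint if executors are lost). A minimal sketch, assuming the same taxi.csv:

from pyspark.sql.functions import col

df = spark.read.option("header", True).option("inferSchema", True).csv("taxi.csv")
c = df.columns[0]
df = df.select(*[col(x).alias(f"{x}_new") for x in df.columns])

# localCheckpoint() truncates the logical plan without needing a checkpoint dir,
# so the old attribute is no longer reachable through the lineage.
df = df.localCheckpoint()

df.where(col(c).isNotNull())  # should now raise the same AnalysisException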
