有没有办法把PySpark命令翻译成SQL并判断它?例如,转向:

spark.table("this_table").filter(F.col("first_col")>100).select("second_col", "third_col")

SELECT second_col, third_col FROM this_table WHERE first_col > 100

回到一天,我认为R库Sparkly有类似的东西,但我似乎找不到这个功能了.

推荐答案

假设在PySpark中没有直接的方法来实现这一点,我能想出的最好方法是:

import pandas as pd
import pandas_to_sql
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

input_data = [
    (100, 2, 3),
    (200, 4, 5),
    (300, 6, 7),
]
pd_df = spark.createDataFrame(input_data, ["first_col", "second_col", "third_col"]).toPandas()

table_name = "this_table"
df = pandas_to_sql.wrap_df(pd_df, table_name)
df[df["first_col"]>100][["second_col", "third_col"]].get_sql_string()

输出:

'SELECT (second_col) AS second_col, (third_col) AS third_col FROM this_table WHERE ((first_col > 100)) '

我知道这并不完全是"转换"PySpark到SQL.但是,如果您熟悉PySpark和Pandas,您可以暂时将其用作您的用例的解决方案.

目前使用的图书馆是pandas_to_sql.它不是生产准备,似乎没有积极维护,所以使用它在您自己的意愿.

除此之外,我不认为有任何简单的方法来做你正在寻找的事情.

否则,如果您只想查看物理计划,而不是精确的SQL,则可以使用explain(),如下所示:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

data = [
    (100, 2, 3),
    (200, 4, 5),
    (300, 6, 7),
]

spark_df = spark.createDataFrame(data, ["first_col", "second_col", "third_col"])
spark_df = spark_df.filter(col("first_col")>100).select("second_col", "third_col")
spark_df.explain()

输出:

== Physical Plan ==
*(1) Project [second_col#1L, third_col#2L]
+- *(1) Filter (isnotnull(first_col#0L) AND (first_col#0L > 100))
   +- *(1) Scan ExistingRDD[first_col#0L,second_col#1L,third_col#2L]

Sql相关问答推荐

创建每小时重置的序列号

替换条件中的单元格值

为什么在postgres中,横向连接比相关子查询快?

如何在postgres函数中插入后返回布尔值?

PostgreSQL:按小时查看调整日期

如何将`now()`作为SQL插入语句的一部分?

如何从Spark SQL的JSON列中提取动态数量的键值对

将一个数组反嵌套到另外两个数组SQL中(Athena/presto)

如何用QuestDB生成蜡烛图?

如何在Postgres中为单值输入多行?

统计PostgreSQL中前10个最大大小表的行数

如何使用jsonn_populate_record()插入到包含IDENTITY列的表中

将二维数组的第一个和第二个元素取消嵌套到两个一维数组中

具有多个表 JOINS 的 STRING_AGG 的替代方法 (SQL Server 2016)

验证某个日期前后的连续代码

PostgreSQL中如何提取以特定字符开头的字符串中的所有单词?

Snowflake中的动态SQL优化

在多个表上递归查找

如何通过子 Select 在一次更新(并行数组)中多次更新相同的行

Athena:从字符串birth_dt列计算年龄