我有一个名为df的数据帧,其功能是col1、col2、col3.它们的价值观应该结合在一起,并产生结果.每个组合将产生的结果在MAPPING_TABLE中定义.

但是,MAPPING_TABLE有时具有值‘*’.这意味着该功能可以有任何值,它不会影响结果.

这使得连接不可能(?)因为我需要判断在每行的连接中使用哪些特性.

什么才是解决这个问题的好办法呢?

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Example DataFrames
map_data = [('a', 'b', 'c', 'good'), ('a', 'a', '*', 'very good'), 
          ('b', 'd', 'c', 'bad'), ('a', 'b', 'a', 'very good'),
          ('c', 'c', '*', 'very bad'), ('a', 'b', 'b', 'bad')]

columns = ["col1", "col2", 'col3', 'result']

mapping_table = spark.createDataFrame(X, columns)


data =[[('a', 'b', 'c'), ('a', 'a', 'b' ), 
        ('c', 'c', 'a' ), ('c', 'c', 'b' ),
        ('a', 'b', 'b'), ('a', 'a', 'd')
      ]]

columns = ["col1", "col2", 'col3']
df = spark.createDataFrame(data, columns)

推荐答案

map_data转换为case statement:

ressql = 'case '
for m in map_data:
    p = [f"{p[0]} = '{p[1]}'" for p in zip(columns, m[:3]) if p[1] != "*"]
    ressql = ressql + ' when ' + ' and '.join(p) + f" then '{m[3]}'"
ressql = ressql + ' end'

df.withColumn('result', F.expr(ressql)).show()

ressql现在是

case 
  when col1 = 'a' and col2 = 'b' and col3 = 'c' then 'good' 
  when col1 = 'a' and col2 = 'a' then 'very good' 
  when col1 = 'b' and col2 = 'd' and col3 = 'c' then 'bad' 
  when col1 = 'a' and col2 = 'b' and col3 = 'a' then 'very good' 
  when col1 = 'c' and col2 = 'c' then 'very bad' 
  when col1 = 'a' and col2 = 'b' and col3 = 'b' then 'bad' 
end

结果:

+----+----+----+---------+
|col1|col2|col3|   result|
+----+----+----+---------+
|   a|   b|   c|     good|
|   a|   a|   b|very good|
|   c|   c|   a| very bad|
|   c|   c|   b| very bad|
|   a|   b|   b|      bad|
|   a|   a|   d|very good|
+----+----+----+---------+

Python相关问答推荐

在Python中,如何才能/应该使用decorator 来实现函数多态性?

Django序列化器没有验证或保存数据

两极:如何分割一个大 pyramid 并并行保存每个

请从Python访问kivy子部件的功能需要帮助

如何防止Plotly在输出到PDF时减少行中的点数?

Polars:使用列值引用when / then表达中的其他列

Pandas 在最近的日期合并,考虑到破产

Polars LazyFrame在收集后未返回指定的模式顺序

Pystata:从Python并行运行stata实例

将特定列信息移动到当前行下的新行

海运图:调整行和列标签

从numpy数组和参数创建收件箱

处理带有间隙(空)的duckDB上的重复副本并有效填充它们

如何在python polars中停止otherate(),当使用when()表达式时?

Odoo 16使用NTFS使字段只读

driver. find_element无法通过class_name找到元素'""

python中字符串的条件替换

如何禁用FastAPI应用程序的Swagger UI autodoc中的application/json?

* 动态地 * 修饰Python中的递归函数

巨 Python :逆向猜谜游戏