我需要替换大型镶木地板文件的多列(100s-1000s列)的值.我用的是火种.

我有一个使用replace的工作实现,它使用较少的列数,但当列数在replace的数量级时,甚至需要很长时间才能从我所看到的(>每列3-4s)生成spark 计划.因此,我正在寻找一种更快的实现.

value_label_map = {"col1": {"val1": "new_val1"}, "col2": {"val2": "new_val2"}}
for k, v in value_label_map.items():
    print(f"replacing {k}")
    columns_to_replace.append(k)
    df = df.replace(to_replace=v, subset=k)

我try 了另一种方法,但我找不到一种方法来访问pysppark Column对象的值来查找词典.

替代实施

def replace_values(col, value_map):
    if value_map:
        return when(col.isin(list(value_map.keys())),value_label_map[col]).otherwise(col)
    else:
        return col

df = spark.read.parquet("some-path")
updated_cols = [replace_values(df[col_name], value_labels.get(col_name)).alias(col_name) for col_name in df_values_renamed.columns]

这样做的问题是,我不能使用Column对象查找value_labels.

推荐答案

你可以试着把所有的东西都装进select英镑的包里.因为replace是基于when条语句,所以让我们直接使用它们:

def replace_from_dict(col_name, dict):
    """for each (k,v) item in dict, replace value k from col_name by value v."""
    res = None
    for k, v in dict.items():
        if res is None:
            res = F.when(F.col(col_name) == k, F.lit(v))
        else:
            res = res.when(F.col(col_name) == k, F.lit(v))
    return res.otherwise(F.col(col_name)).alias(col_name)

def replace_or_not(col_name):
    """generate a column replacement if need be, keeping the column otherwise"""
    if col_name in value_label_map:
        return replace_from_dict(col_name, value_label_map[col_name])
    else:
        return col_name

result = df.select(*[replace_or_not(c) for c in df.columns])

Python相关问答推荐

使用Python和PRNG(不是梅森龙卷风)有效地生成伪随机浮点数在[0,1)中均匀?

inspect_asm不给出输出

使用Beautiful Soup获取第二个srcset属性

从包含数字和单词的文件中读取和获取数据集

Chatgpt API不断返回错误:404未能从API获取响应

' osmnx.shortest_track '返回有效源 node 和目标 node 的'无'

组/群集按字符串中的子字符串或子字符串中的字符串轮询数据框

实现自定义QWidgets作为QTimeEdit的弹出窗口

创建可序列化数据模型的最佳方法

Pandas Loc Select 到NaN和值列表

多处理队列在与Forking http.server一起使用时随机跳过项目

用砂箱开发Web统计分析

Python Pandas获取层次路径直到顶层管理

如何在TensorFlow中分类多个类

如何在PySide/Qt QColumbnView中删除列

干燥化与列姆化的比较

循环浏览每个客户记录,以获取他们来自的第一个/最后一个渠道

在Docker容器(Alpine)上运行的Python应用程序中读取. accdb数据库

处理Gekko的非最优解

用两个字符串构建回文