我正在使用PySpark分析Palantir Foundry的一些JSON数据.源文件是一个30MB上传的JSON文件,其中包含四个元素,其中一个元素包含大约60列和20,000行的表.此表中的一些列是包含表示UTF字符的HTML实体的字符串(其他列是数字或布尔值).我想清理这些字符串,以便用相应的字符替换HTML实体.

我意识到,一旦所有JSON数据都转换为数据帧,我就可以将UDF中的html.unescape(my_str)应用到字符串列.然而,这听起来效率很低.This answer建议在将整个JSON文件转换为数据帧之前一次性处理整个JSON文件.然而,我当前的代码使用spark_session.read.json()自动从原始文件转到具有适当模式的数据帧.我不知道如何修改它以包含unescape()阶段,而不会以StringType()结尾,我也不想为嵌套数据 struct 的每一列手动编写模式.我当前将JSON读入数据帧的代码如下所示:

from transforms.verbs.dataframes import sanitize_schema_for_parquet
from transforms.api import transform, Input, Output, Check

@transform(
    parsed_output=Output("out_path"),
    raw_file_input=Input("in_path")
)
def read_json(ctx, parsed_output, raw_file_input):
    filesystem = raw_file_input.filesystem()
    hadoop_path = filesystem.hadoop_path
    paths = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]

    df = ctx.spark_session.read.json(paths)

    parsed_output.write_dataframe(sanitize_schema_for_parquet(df))

How can I adapt this to 100 the plain text of the JSON before it is parsed?,或者使用Spark中的并行性,以后使用Spark中的并行性,将数据处理为一个30MB长的字符串,实际上可能会更高效地逐行逐列地处理unescape个数据吗?

我的JSON输入格式的一个示例.真正的输入大约有30MB长,而且打印得不是很漂亮.Data的 struct 有更多的行和大约60列.字符串列混合在数字列和布尔列之间,没有特定的顺序:

{
    "Data": [
        {"Id": 1, "Lots": "more", "data": "of", "different": "types", "Flag1": true, "RefNumber": 17},
        {"Id": 2, "Lots": "of the string", "data": "includes entities like ≤ in", "different": "places", "Flag1": false, "RefNumber": 17781}
    ],
    "TotalRows":2,
    "Warnings":null,
    "Errors":null
}

上面的最终预期输出结果如下(我在将JSON处理到正确的列方面没有任何问题,只是将HTML实体高效地转换为字符是一个问题).请注意,第2行‘data’字段中的数学符号,而不是实体≤:

Id | Lots            | data                          | different | Flag1 | RefNumber
---+-----------------+-------------------------------+-----------+-------+-----------
 1 | "more"          | "of"                          | "types"   | true  |        17
 2 | "of the string" | "includes entities like ≤ in" | "places"  | false |     17781

推荐答案

一般而言,您链接的答案(How to decode HTML entities in Spark?)在这里是正确的方法.为了实现您想要的(让Spark自动为您推断模式的便利性),以下是我将如何在Foundry中实现这一点.

  1. 作为第一步,做一些类似这样的事情(注意:我实际上并没有运行这段代码,所以可能有一两个小错误,但希望它说明了这个 idea )
@transform(
    output_json=Output("/NAMESPACE/PROJECT/datasets/preprocess/escaped_json"),
    input_json=Input("/NAMESPACE/PROJECT/datasets/preprocess/raw_json"),
)
def compute_function(output_json, input_json):
    input_fs = input_json.filesystem()
    output_fs = output_json.filesystem()

    def process_file(file_status):
        with input_fs.open(file_status.path) as input_fp:
            json_data = json.load(input_fp)
            clean_up_json_in_place(json_data)
            with output_fs.open(file_status.path, "w") as output_fp:
                json.dump(json_data, output_fp)

    input_fs.files().foreach(process_file)
  1. 第二步,做你现在正在做的事情,但是把输入设为/NAMESPACE/PROJECT/datasets/preprocess/escaped_json

Json相关问答推荐

2020-12年草案中的Json列举模式

为什么terraform不缩小这个策略JSON?'

如何使用Aeson解码带有Unicode字符的JSON文件?

由于无效的UTF-8开始字节0xa0,JSON被拒绝,但编码似乎有效

如何对未知/变量键的字典进行编码?

Powershell解析JSON文件中的键或值

Bicep脚本中如何设置弹性池的维护窗口?

将环境变量值从 yaml 传递到 json

使用 jq 将消息转换为数组

我如何将 JSON 格式与 flutter 一起使用?帮助使用 Gamebanana api

使用 SwiftUI 在 API 调用中解码嵌套 JSON 响应时遇到问题

Flutter:在本地文件 json 中搜索特殊的阿拉伯字符

JSON 模式实际用于什么目的?

UTF-8 字符编码之战 json_encode()

获取json中某个键的索引

如何找出实际安装了哪个版本的 bower 包?

在 Bash 中访问 JSON 对象 - 关联数组/列表/另一个模型

应该使用什么标头将 GZIP 压缩 JSON 从 Android 客户端发送到服务器?

JSON 到 JSON 转换器

Gson 将一组数据对象转换为 json - Android