我正在使用PySpark分析Palantir Foundry的一些JSON数据.源文件是一个30MB上传的JSON文件,其中包含四个元素,其中一个元素包含大约60列和20,000行的表.此表中的一些列是包含表示UTF字符的HTML实体的字符串(其他列是数字或布尔值).我想清理这些字符串,以便用相应的字符替换HTML实体.
我意识到,一旦所有JSON数据都转换为数据帧,我就可以将UDF中的html.unescape(my_str)
应用到字符串列.然而,这听起来效率很低.This answer建议在将整个JSON文件转换为数据帧之前一次性处理整个JSON文件.然而,我当前的代码使用spark_session.read.json()
自动从原始文件转到具有适当模式的数据帧.我不知道如何修改它以包含unescape()
阶段,而不会以StringType()
结尾,我也不想为嵌套数据 struct 的每一列手动编写模式.我当前将JSON读入数据帧的代码如下所示:
from transforms.verbs.dataframes import sanitize_schema_for_parquet
from transforms.api import transform, Input, Output, Check
@transform(
parsed_output=Output("out_path"),
raw_file_input=Input("in_path")
)
def read_json(ctx, parsed_output, raw_file_input):
filesystem = raw_file_input.filesystem()
hadoop_path = filesystem.hadoop_path
paths = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
df = ctx.spark_session.read.json(paths)
parsed_output.write_dataframe(sanitize_schema_for_parquet(df))
How can I adapt this to 100 the plain text of the JSON before it is parsed?,或者使用Spark中的并行性,以后使用Spark中的并行性,将数据处理为一个30MB长的字符串,实际上可能会更高效地逐行逐列地处理unescape
个数据吗?
我的JSON输入格式的一个示例.真正的输入大约有30MB长,而且打印得不是很漂亮.Data
的 struct 有更多的行和大约60列.字符串列混合在数字列和布尔列之间,没有特定的顺序:
{
"Data": [
{"Id": 1, "Lots": "more", "data": "of", "different": "types", "Flag1": true, "RefNumber": 17},
{"Id": 2, "Lots": "of the string", "data": "includes entities like ≤ in", "different": "places", "Flag1": false, "RefNumber": 17781}
],
"TotalRows":2,
"Warnings":null,
"Errors":null
}
上面的最终预期输出结果如下(我在将JSON处理到正确的列方面没有任何问题,只是将HTML实体高效地转换为字符是一个问题).请注意,第2行‘data’字段中的数学符号≤
,而不是实体≤
:
Id | Lots | data | different | Flag1 | RefNumber
---+-----------------+-------------------------------+-----------+-------+-----------
1 | "more" | "of" | "types" | true | 17
2 | "of the string" | "includes entities like ≤ in" | "places" | false | 17781