I want to read a JSON file. Right now I am using the following logic, which is not dynamic:

df = spark.read.option("multiline", True).json(loc)
df = df.select("data.*", "event.*", "resource_id", "resource_kind", "resource_uri")

I would have to write column.* many times, because the file is deeply nested and contains multiple StructType fields.

Its schema looks like this:

root
 |-- data: struct (nullable = true)
 |    |-- accounts: struct (nullable = true)
 |    |    |-- accounting_reference_date: struct (nullable = true)
 |    |    |    |-- day: string (nullable = true)
 |    |    |    |-- month: string (nullable = true)
 |    |    |-- last_accounts: struct (nullable = true)
 |    |    |    |-- made_up_to: string (nullable = true)
 |    |    |    |-- period_end_on: string (nullable = true)
 |    |    |    |-- period_start_on: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- next_accounts: struct (nullable = true)
 |    |    |    |-- due_on: string (nullable = true)
 |    |    |    |-- overdue: boolean (nullable = true)
 |    |    |    |-- period_end_on: string (nullable = true)
 |    |    |    |-- period_start_on: string (nullable = true)
 |    |    |-- next_due: string (nullable = true)
 |    |    |-- next_made_up_to: string (nullable = true)
 |    |    |-- overdue: boolean (nullable = true)
 |    |-- can_file: boolean (nullable = true)
 |    |-- company_name: string (nullable = true)
 |    |-- company_number: string (nullable = true)
 |    |-- company_status: string (nullable = true)
 |    |-- confirmation_statement: struct (nullable = true)
 |    |    |-- last_made_up_to: string (nullable = true)
 |    |    |-- next_due: string (nullable = true)
 |    |    |-- next_made_up_to: string (nullable = true)
 |    |    |-- overdue: boolean (nullable = true)
 |    |-- date_of_creation: string (nullable = true)
 |    |-- etag: string (nullable = true)
 |    |-- has_charges: boolean (nullable = true)
 |    |-- is_community_interest_company: boolean (nullable = true)
 |    |-- jurisdiction: string (nullable = true)
 |    |-- last_full_members_list_date: string (nullable = true)
 |    |-- links: struct (nullable = true)
 |    |    |-- charges: string (nullable = true)
 |    |    |-- filing_history: string (nullable = true)
 |    |    |-- officers: string (nullable = true)
 |    |    |-- persons_with_significant_control: string (nullable = true)
 |    |    |-- persons_with_significant_control_statements: string (nullable = true)
 |    |    |-- registers: string (nullable = true)
 |    |    |-- self: string (nullable = true)
 |    |-- previous_company_names: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- ceased_on: string (nullable = true)
 |    |    |    |-- effective_from: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |-- registered_office_address: struct (nullable = true)
 |    |    |-- address_line_1: string (nullable = true)
 |    |    |-- address_line_2: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- locality: string (nullable = true)
 |    |    |-- po_box: string (nullable = true)
 |    |    |-- postal_code: string (nullable = true)
 |    |    |-- region: string (nullable = true)
 |    |-- registered_office_is_in_dispute: boolean (nullable = true)
 |    |-- sic_codes: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- subtype: string (nullable = true)
 |    |-- type: string (nullable = true)
 |-- event: struct (nullable = true)
 |    |-- published_at: string (nullable = true)
 |    |-- timepoint: long (nullable = true)
 |    |-- type: string (nullable = true)
 |-- resource_id: string (nullable = true)
 |-- resource_kind: string (nullable = true)
 |-- resource_uri: string (nullable = true)

Since a few fields share the same name, I need to capture field names starting from the root.

For example, the field period_start_on exists in both last_accounts and next_accounts, so I need to set the column names as follows:

data.accounts.last_accounts.period_start_on

data.accounts.next_accounts.period_start_on

I don't think the approach I am taking is efficient. Can you suggest a good way to read this JSON? Also, how can we identify two fields that have the same name?

Thanks

Recommended answer

First, you can read this thread, which will help you rename the fields:

Rename nested field in spark dataframe

After that, you can follow an approach that uses a few functions to dynamically explode the columns, handling both struct fields and array fields:

from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import col, explode_outer

def find_array_fields(dataframe):
    # Names of the top-level columns whose type is ArrayType
    array_fields = []
    for field in dataframe.schema.fields:
        if isinstance(field.dataType, ArrayType):
            array_fields.append(field.name)
    return array_fields

def find_struct_fields(dataframe):
    # Names of the top-level columns whose type is StructType
    struct_fields = []
    for field in dataframe.schema.fields:
        if isinstance(field.dataType, StructType):
            struct_fields.append(field.name)
    return struct_fields

def explode_array_fields(dataframe, array_fields: list):
    # One row per array element; explode_outer (unlike explode)
    # keeps rows whose array is null or empty
    for field in array_fields:
        dataframe = dataframe.withColumn(field, explode_outer(field))
    return dataframe

def explode_struct_fields(dataframe, struct_fields: list):
    # Promote each child of a struct to a top-level column named
    # <parent>_<child>, then drop the original struct column
    for field in struct_fields:
        columns_fields = dataframe.select(f"{field}.*").columns
        for c in columns_fields:
            dataframe = dataframe.withColumn(f"{field}_{c}", col(f"{field}.{c}"))
        dataframe = dataframe.drop(field)
    return dataframe

def explode_all(dataframe):
    # Keep flattening one level at a time until no array or struct
    # columns remain at the top level
    while len(find_array_fields(dataframe)) > 0 or len(find_struct_fields(dataframe)) > 0:
        array_fields = find_array_fields(dataframe)
        if len(array_fields) > 0:
            dataframe = explode_array_fields(dataframe, array_fields)
        struct_fields = find_struct_fields(dataframe)
        if len(struct_fields) > 0:
            dataframe = explode_struct_fields(dataframe, struct_fields)
    return dataframe
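Note that explode_struct_fields prefixes every child column with its parent's name, so after the loop finishes, a deeply nested field such as data.accounts.last_accounts.period_start_on ends up as data_accounts_last_accounts_period_start_on. Because the full path is baked into each name, the two period_start_on fields from last_accounts and next_accounts stay distinct.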

After declaring these functions, you can call them like this:

path = "/your_folder/"
df1 = spark.read.format("your_format").load(path)
df2 = explode_all(df1)
df2.display()
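Alternatively, if you want to keep the nested structure and simply select every leaf with its fully qualified name, as the question asks, a minimal sketch along these lines walks df1.schema recursively and also reports which leaf names occur more than once. flatten_schema and the surrounding variable names are illustrative, not part of the PySpark API:

from collections import Counter
from pyspark.sql.types import StructType
from pyspark.sql.functions import col

def flatten_schema(schema, prefix=None):
    # Recursively collect the full dotted path of every leaf field;
    # array columns are kept as-is (explode them separately if needed)
    paths = []
    for field in schema.fields:
        path = f"{prefix}.{field.name}" if prefix else field.name
        if isinstance(field.dataType, StructType):
            paths += flatten_schema(field.dataType, path)
        else:
            paths.append(path)
    return paths

paths = flatten_schema(df1.schema)

# Leaf names that appear under more than one parent, e.g. period_start_on
# under both last_accounts and next_accounts
leaf_counts = Counter(p.split(".")[-1] for p in paths)
duplicated = sorted(p for p in paths if leaf_counts[p.split(".")[-1]] > 1)

# Select every leaf, aliasing it with its full path so duplicates stay unique
flat_df = df1.select([col(p).alias(p.replace(".", "_")) for p in paths])

With the schema above, duplicated would contain, among others, data.accounts.last_accounts.period_start_on and data.accounts.next_accounts.period_start_on, which is exactly the pair the question mentions.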
