我有一个"特殊的"CSV文件,格式如下:

A;ItemText;1;2
B;1;1.23,99
B;2;9.52,100
C;false

我想把这些数据转换成简单的模型.

目前,我将FieldInfo类划分为子类:

class CSVFieldInfo(FieldInfo):
    
    def __init__(self, **kwargs: Any):

        self.position = kwargs["position"]
        
        if not isinstance(self.position, int):
            raise ValueError("Position should be integer, got {}".format(type(self.position)))

        super().__init__()

def CSVField(position: int):
    return CSVFieldInfo(position=position)

此外,我还将BaseModel子类化:

class CSVBaseModel(BaseModel):
    
    @classmethod
    def from_string(cls, string: str, sep: str=";"):
        
        # no double definitions
        l = [x.field_info.position for x in cls.__fields__.values()]
        if not len(set(l)) == len(l):
            raise ValueError("At least one position is defined twice")
        
        # here i am stuck on how to populate the model correctly (including nested models)

然后,模型布局如下:

class CSVTypeA(CSVBaseModel):
    record_type: Literal["A"] = CSVField(position=0)
    record_text: str = CSVField(position=1)
    num: int = CSVField(position=2)

class CSVFile(CSVBaseModel):
    a: CSVTypeA

csv_string = \
"""A;ItemText;1;2
B;1;1.23,99
B;2;9.52,100
C;false"""

CSVFile.from_string(csv_string)

如何将正确的CSV-Line自动分配给正确的模型(通过字段"RECORD_TYPE"进行区分),从而填充PYDANIC模型"CSVFile"?

推荐答案

主要的问题是,如果不实际查看记录中的第一个字段,就无法知道记录是否与which种类型兼容.

因为您没有键-值对,所以您的记录基本上只是没有名称的字段列表,所以我们必须在判断之后确定正确的字段名称,我们处理的是哪种记录类型.

这意味着我们需要重新实现皮丹蒂克discriminated unions背后的一些魔力.幸运的是,我们可以利用ModelField在其sub_fields_mapping属性中保存discriminator key -> sub-field的字典这一事实.

因此,我们仍然可以利用皮丹蒂克提供的一些内置机制,正确地定义我们受到歧视的联盟.

但首先我们需要定义一些(示例性的)记录类型:

记录类型.py

from typing import Literal, Union

from pydantic import BaseModel


class CSVLine(BaseModel):
    record_type: str

    def some_method(self) -> None:
        print(self)


class CSVTypeA(CSVLine):
    record_type: Literal["A"]
    record_text: str
    num: int
    another_num: int


class CSVTypeB(CSVLine):
    record_type: Literal["B"]
    num_foo: int
    num_floaty: float
    num_bar: int


class CSVTypeC(CSVLine):
    record_type: Literal["C"]
    spam: bool


CSVType = Union[CSVTypeA, CSVTypeB, CSVTypeC]

接下来,我们定义模型将实际的CSV文件表示为custom root type,这将是我们区别对待的记录类型联合的list.

我们还将定义自己的类属性__csv_separator__来保存要用来拆分记录的字符串.

为了使处理该模型的实例更容易、更直观,我们还将定义/覆盖一些用于项访问、字符串表示等的定制方法.

最后,我们需要实现将CSV文件的各行解析为定制validator中适当记录类型的实例的整个魔术.

Csv_Model.py

from collections.abc import Iterator
from typing import Annotated, Any, ClassVar

from pydantic import BaseModel, Field, validator
from pydantic.fields import ModelField

from .record_types import CSVType


class CSVFile(BaseModel):
    __csv_separator__: ClassVar[str] = ";"
    __root__: list[Annotated[CSVType, Field(discriminator="record_type")]]

    def __iter__(self) -> Iterator[CSVType]:  # type: ignore[override]
        yield from self.__root__

    def __getitem__(self, item: int) -> CSVType:
        return self.__root__[item]

    def __str__(self) -> str:
        return str(self.__root__)

    def __repr__(self) -> str:
        return repr(self.__root__)

    @validator("__root__", pre=True, each_item=True)
    def dict_from_string(cls, v: Any, field: ModelField) -> Any:
        if not isinstance(v, str):
            return v  # let default Pydantic validation take over
        record_fields = v.strip().split(cls.__csv_separator__)
        discriminator_key = record_fields[0]
        assert field.sub_fields_mapping is not None
        try:  # Determine the model to validate against
            type_ = field.sub_fields_mapping[discriminator_key].type_
        except KeyError:
            raise ValueError(f"{discriminator_key} is not a valid key")
        assert issubclass(type_, BaseModel)
        field_names = type_.__fields__.keys()
        return dict(zip(field_names, record_fields))

这应该就是我们需要的全部.

要创建一个CSVFile的实例,我们只需要任何可迭代的字符串(CSV文件中的行).与所有定制根类型一样,我们可以通过使用__root__关键字参数调用__init__方法,或者通过将字符串的可迭代传递给parse_obj方法来初始化它.

演示

csv_string = """
A;ItemText;1;2
B;1;1.23;99
B;2;9.52;100
C;false
""".strip()

obj = CSVFile.parse_obj(csv_string.split("\n"))
print(obj[0])
obj[3].some_method()
print(obj.json(indent=4))

输出:

record_type='A' record_text='ItemText' num=1 another_num=2
record_type='C' spam=False
[
    {
        "record_type": "A",
        "record_text": "ItemText",
        "num": 1,
        "another_num": 2
    },
    {
        "record_type": "B",
        "num_foo": 1,
        "num_floaty": 1.23,
        "num_bar": 99
    },
    {
        "record_type": "B",
        "num_foo": 2,
        "num_floaty": 9.52,
        "num_bar": 100
    },
    {
        "record_type": "C",
        "spam": false
    }
]

附注:我们需要类变量__csv_separator__的原因是验证器是一个类方法,它需要知道要使用的分隔符.当然,您可以编写一个单独的方法(就像您在最初的POST中try 的那样),并将分隔符作为参数传递,然后临时变异类变量并调用parse_obj.但我认为,仅全局更改分隔符或在子类中有 Select 地更改分隔符可能会更容易.

此外,我认为没有理由显式指定位置,只要记录类型模型的字段(其定义顺序为and)与CSV记录的实际字段匹配即可.

Python相关问答推荐

Python tkinter关闭第一个窗口,同时打开第二个窗口

从流程获取定期更新

这些变量是否相等,因为它们引用相同的实例,尽管它们看起来应该具有不同的值?

保留包含pandas pandras中文本的列

在Transformer中使用LabelEncoding的ML模型管道

收件箱转换错误- polars.exceptions. ComputeHelp- pandera(0.19.0b3)带有polars

如何将带有逗号分隔的数字的字符串解析为int Array?

如何使用上下文管理器创建类的实例?

使用pandas、matplotlib和Yearbox绘制时显示错误的年份

Locust请求中的Python和参数

Pandas 第二小值有条件

Pydantic 2.7.0模型接受字符串日期时间或无

将整组数组拆分为最小值与最大值之和的子数组

将两只Pandas rame乘以指数

如何找到满足各组口罩条件的第一行?

如何记录脚本输出

基于字符串匹配条件合并两个帧

DataFrames与NaN的条件乘法

使用groupby方法移除公共子字符串

Flask Jinja2如果语句总是计算为false&