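I'm trying to use SQLModel to insert records into a database, where the data looks like the following. A House object has a color and many locations. Each location can in turn be associated with many houses. The input is: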

[
    {
        "color": "red",
        "locations": [
            {"type": "country", "name": "Netherlands"},
            {"type": "municipality", "name": "Amsterdam"},
        ],
    },
    {
        "color": "green",
        "locations": [
            {"type": "country", "name": "Netherlands"},
            {"type": "municipality", "name": "Amsterdam"},
        ],
    },
]

Here is a reproducible example of what I'm trying to do:

import asyncio
from typing import List

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlmodel import Field, Relationship, SQLModel, UniqueConstraint
from sqlmodel.ext.asyncio.session import AsyncSession

DATABASE_URL = "sqlite+aiosqlite:///./database.db"


engine = create_async_engine(DATABASE_URL, echo=True, future=True)


async def init_db() -> None:
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)


SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


class HouseLocationLink(SQLModel, table=True):
    house_id: int = Field(foreign_key="house.id", nullable=False, primary_key=True)
    location_id: int = Field(
        foreign_key="location.id", nullable=False, primary_key=True
    )


class Location(SQLModel, table=True):
    id: int = Field(primary_key=True)
    type: str  # country, county, municipality, district, city, area, street, etc
    name: str  # Amsterdam, Germany, My Street, etc

    houses: List["House"] = Relationship(
        back_populates="locations",
        link_model=HouseLocationLink,
    )

    __table_args__ = (UniqueConstraint("type", "name"),)


class House(SQLModel, table=True):
    id: int = Field(primary_key=True)
    color: str = Field()
    locations: List["Location"] = Relationship(
        back_populates="houses",
        link_model=HouseLocationLink,
    )
    # other fields...


data = [
    {
        "color": "red",
        "locations": [
            {"type": "country", "name": "Netherlands"},
            {"type": "municipality", "name": "Amsterdam"},
        ],
    },
    {
        "color": "green",
        "locations": [
            {"type": "country", "name": "Netherlands"},
            {"type": "municipality", "name": "Amsterdam"},
        ],
    },
]


async def add_houses(payload) -> List[House]:
    result = []
    async with SessionLocal() as session:
        for item in payload:
            locations = []
            for location in item["locations"]:
                locations.append(Location(**location))
            house = House(color=item["color"], locations=locations)
            result.append(house)
        session.add_all(result)
        await session.commit()
    # return the created houses, as the annotated return type promises
    return result


asyncio.run(init_db())
asyncio.run(add_houses(data))

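The problem is that when I run this code, it tries to insert duplicate Location objects along with the House objects. I'd love to use a relationship here, because it makes accessing house.locations very easy.

However, I haven't been able to figure out how to stop it from trying to insert duplicate locations. Ideally, I'd have a mapper function that performs a get_or_create on each location.

The closest thing I've seen to achieving this is SQLAlchemy's association proxy. But it looks like SQLModel doesn't support that.

Does anyone have a way to achieve this? If you know how to do this with SQLAlchemy instead of SQLModel, I'd be interested to see your solution. I haven't started this project yet, so I might as well use SQLAlchemy if it makes my life easier.

I've also tried tweaking sa_relationship_kwargs, for example: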

sa_relationship_kwargs={
    "lazy": "selectin",
    "cascade": "none",
    "viewonly": "true",
}

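But that prevents the association entries from being added to the HouseLocationLink table.

Any pointers would be much appreciated, even if it means changing my approach completely.

Thanks!

Recommended answer

I'm writing this solution because you mentioned you're open to using SQLAlchemy. As you said, you need the association proxy, but you also need the "Unique Objects" recipe. I adapted that recipe to work with async queries (instead of sync ones) and to match my personal preferences, all without significantly changing the logic.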
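Before the full listing, the core of the "unique object" idea can be sketched without any database at all: a cache keyed by (type, name) hands back the same instance every time the same key is requested. The `LocationRegistry` class and its `created` counter below are hypothetical names used only for this illustration; they are not part of SQLAlchemy or SQLModel.

```python
from dataclasses import dataclass


@dataclass(eq=False)  # compare by identity, similar to ORM instances
class Location:
    type: str
    name: str


class LocationRegistry:
    """Hypothetical in-memory registry; illustration only."""

    def __init__(self):
        self._cache = {}
        self.created = 0  # number of distinct Location objects built

    def as_unique(self, type, name):
        key = (type, name)
        obj = self._cache.get(key)
        if obj is None:
            # first time this key is seen: build and remember the object
            obj = Location(type=type, name=name)
            self._cache[key] = obj
            self.created += 1
        return obj


payload = [
    {
        "color": "red",
        "locations": [
            {"type": "country", "name": "Netherlands"},
            {"type": "municipality", "name": "Amsterdam"},
        ],
    },
    {
        "color": "green",
        "locations": [
            {"type": "country", "name": "Netherlands"},
            {"type": "municipality", "name": "Amsterdam"},
        ],
    },
]

registry = LocationRegistry()
houses = [
    {
        "color": item["color"],
        "locations": [registry.as_unique(**loc) for loc in item["locations"]],
    }
    for item in payload
]
```

Both houses end up referencing the very same two Location instances, which is what keeps an ORM session from scheduling a second INSERT for them. The SQLAlchemy listing that follows adds a database lookup on a cache miss, so rows that already exist are reused as well.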

import asyncio
from sqlalchemy import UniqueConstraint, ForeignKey, select, text, func
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped, relationship
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.associationproxy import AssociationProxy, association_proxy

class Base(DeclarativeBase):
    pass

class UniqueMixin:
    cache = {}

    @classmethod
    async def as_unique(cls, session: AsyncSession, *args, **kwargs):
        key = cls, cls.unique_hash(*args, **kwargs)
        if key in cls.cache:
            return cls.cache[key]
        with session.no_autoflush:
            statement = select(cls).where(cls.unique_filter(*args, **kwargs)).limit(1)
            obj = (await session.scalars(statement)).first()
            if obj is None:
                obj = cls(*args, **kwargs)
                session.add(obj)
        cls.cache[key] = obj
        return obj

    @classmethod
    def unique_hash(cls, *args, **kwargs):
        raise NotImplementedError("Implement this in subclass")

    @classmethod
    def unique_filter(cls, *args, **kwargs):
        raise NotImplementedError("Implement this in subclass")

class Location(UniqueMixin, Base):
    __tablename__ = "location"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column()
    type: Mapped[str] = mapped_column()
    house_associations: Mapped[list["HouseLocationLink"]] = relationship(back_populates="location")
    # the attribute must be named __table_args__ for SQLAlchemy to pick it up
    __table_args__ = (UniqueConstraint("type", "name"),)

    @classmethod
    def unique_hash(cls, name, type):
        # this is the key for the dict
        return type, name

    @classmethod
    def unique_filter(cls, name, type):
        # this is how you want to establish the uniqueness
        # the result of this filter will be the value in the dict
        return (cls.type == type) & (cls.name == name)

class House(Base):
    __tablename__ = "house"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column()
    location_associations: Mapped[list["HouseLocationLink"]] = relationship(back_populates="house")
    locations: AssociationProxy[list[Location]] = association_proxy(
        "location_associations",
        "location",
        # you need this so you can directly add ``Location`` objects to ``House``
        creator=lambda location: HouseLocationLink(location=location),
    )

class HouseLocationLink(Base):
    __tablename__ = "houselocationlink"
    house_id: Mapped[int] = mapped_column(ForeignKey(House.id), primary_key=True)
    location_id: Mapped[int] = mapped_column(ForeignKey(Location.id), primary_key=True)
    location: Mapped[Location] = relationship(back_populates="house_associations")
    house: Mapped[House] = relationship(back_populates="location_associations")

engine = create_async_engine("sqlite+aiosqlite:///test.sqlite")

async def main():
    data = [
        {
            "name": "red",
            "locations": [
                {"type": "country", "name": "Netherlands"},
                {"type": "municipality", "name": "Amsterdam"},
            ],
        },
        {
            "name": "green",
            "locations": [
                {"type": "country", "name": "Netherlands"},
                {"type": "municipality", "name": "Amsterdam"},
            ],
        },
    ]

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    async with AsyncSession(engine) as session, session.begin():
        for item in data:
            house = House(
                name=item["name"],
                locations=[await Location.as_unique(session, **location) for location in item["locations"]]
            )
            session.add(house)

    async with AsyncSession(engine) as session:
        # COUNT(*) per table; select_from() names the table without selecting its columns
        statement = select(func.count()).select_from(Location)
        assert await session.scalar(statement) == 2

        statement = select(func.count()).select_from(House)
        assert await session.scalar(statement) == 2

        statement = select(func.count()).select_from(HouseLocationLink)
        assert await session.scalar(statement) == 4


asyncio.run(main())

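You'll notice that the assertions pass, no unique constraint is violated, and nothing is inserted more than once. I've left some inline comments that point out the "key" aspects of this code. If you run this code multiple times, you'll notice that only new House objects and their corresponding HouseLocationLink objects are added; no new Location objects are added (so the House and HouseLocationLink count assertions only hold on the first run against a fresh database). Thanks to the cache, only one query is made per key.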
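The re-run behaviour described above can also be checked at the plain SQL level. The following is a minimal sketch using the standard-library `sqlite3` module rather than SQLAlchemy, assuming the same three table names as the listing; the helpers `get_or_create_location`, `add_house`, and `count` are hypothetical and exist only for this illustration. It relies on SQLite's `INSERT OR IGNORE` together with the unique constraint on (type, name).

```python
import sqlite3


def get_or_create_location(conn, type_, name):
    """Return the id of the (type, name) row, inserting it only if missing."""
    conn.execute(
        "INSERT OR IGNORE INTO location (type, name) VALUES (?, ?)",
        (type_, name),
    )
    row = conn.execute(
        "SELECT id FROM location WHERE type = ? AND name = ?",
        (type_, name),
    ).fetchone()
    return row[0]


def add_house(conn, name, locations):
    """Insert one house and link it to existing or newly created locations."""
    house_id = conn.execute(
        "INSERT INTO house (name) VALUES (?)", (name,)
    ).lastrowid
    for loc in locations:
        location_id = get_or_create_location(conn, loc["type"], loc["name"])
        conn.execute(
            "INSERT INTO houselocationlink (house_id, location_id) VALUES (?, ?)",
            (house_id, location_id),
        )
    return house_id


def count(conn, table):
    # table is always one of the fixed names below, never user input
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE location (
        id INTEGER PRIMARY KEY,
        type TEXT NOT NULL,
        name TEXT NOT NULL,
        UNIQUE (type, name)
    );
    CREATE TABLE house (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
    CREATE TABLE houselocationlink (
        house_id INTEGER NOT NULL REFERENCES house (id),
        location_id INTEGER NOT NULL REFERENCES location (id),
        PRIMARY KEY (house_id, location_id)
    );
    """
)

locations = [
    {"type": "country", "name": "Netherlands"},
    {"type": "municipality", "name": "Amsterdam"},
]

for _ in range(2):  # simulate running the import twice
    for house_name in ("red", "green"):
        add_house(conn, house_name, locations)
conn.commit()
```

After the two passes, the location table still holds two rows, while house holds four and houselocationlink holds eight. This matches the answer's claim that repeated runs add only houses and link rows.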
