我在try 让这个发挥作用时遇到了很多麻烦.

我正在开发一个API,其中有活动(在模型中称为"会话"),这些活动具有两个时间相关属性,DID(活动本身的初始日期时间)和DURATION(活动本身的持续时间).

创建新活动时,我想判断将处理新活动的员工是否会与另一个活动重叠.

我使用Python(fastAPI)作为BE框架并使用mySQL数据库.目前我采取了以下方法:

def get_overlapping_staffSession(db: Session, session: schemas.SessionCreate) -> list[models.StaffSession]:
    staff_ids = []

    new_session_start = session.date
    new_session_end = (session.date + timedelta(hours=session.duration))

    for staff in session.staff:
        staff_ids.append(staff.id)

    overlapping_sessions = (
        db.query(models.StaffSession)
        .join(models.Session, models.StaffSession.id_session == models.Session.id)
        .filter(
            and_(
                models.StaffSession.id_staff.in_(staff_ids),
                models.Session.date <= new_session_end,
                models.Session.date + (models.Session.duration * timedelta(hours=1)) >= new_session_start
            )
        )
    ).all()

    return overlapping_sessions

这应该返回一个具有模型StaffSec的数组,该模型在新时间范围内并且拥有将在新活动中工作的员工.

值session.date属于类型datetime,模型会话中的列date属于类型datetime. 我在网上看到过不同的例子,使用它没有任何问题.

为了确保它有效,我try 在mySQL中实现相同的查询,以下内容使用与用于测试上面代码的值相同的值(id和时间范围).

SELECT 
    staffsession.*
FROM 
    staffsession
JOIN 
    session ON staffsession.id_session = session.id
WHERE 
    staffsession.id_staff IN (12)
    AND session.date <= '2024-04-15T16:30:00+01:00'
    AND session.date + INTERVAL session.duration SECOND >= '2024-04-15T13:30:00+01:00';

在执行此查询时,它在mySQL中工作得很好,在API中,它返回包含id为staff_ids的所有StaffMessage,但不按日期进行过滤.

我们非常欢迎您提供任何帮助,并感谢您抽出时间.

推荐答案

根据INTERVAL类型的SQLAchemy docs:

datetime. timedela()对象的类型.

Interval类型处理datetime. timeta对象.在PostgreSQL和Oracle中,使用原生INTERVAL类型;对于其他类型,该值存储为相对于"纪元"(1970年1月1日)的日期.

请注意,Interval类型目前在不本地支持Interval类型的平台上不提供日期算术运算.此类操作通常需要转换运算式的两侧(例如,首先将两侧转换为整纪元值),这目前是一个手动过程(例如通过express. full).

因此,使用间隔的日期时间算术在SQL上不直接支持.我们可以通过在一些示例日期上执行问题中的查询的间隔算术来验证这一点:

import sqlalchemy as sa

engine = sa.create_engine(...)
sessions = sa.Table(...)

# (create the table, insert some dates...)

QUERY = """
SELECT 
    date + INTERVAL duration SECOND
FROM 
    sessions
ORDER BY id
"""

with engine.connect() as conn:
    rows = conn.execute(sa.text(QUERY))
    for row in rows:
        print(row)

输出

(datetime.datetime(2024, 4, 15, 17, 30),)
(datetime.datetime(2024, 4, 15, 17, 0),)
(datetime.datetime(2024, 4, 15, 13, 0),)

但SQLAchemy查询

with engine.connect() as conn:
    q = sa.select(
        sessions.c.date + (sessions.c.duration * dt.timedelta(hours=1))
    ).order_by(sessions.c.id)
    rows = conn.execute(q)
    for row in rows:
        print(row)

输出

(20240422255000.0,)
(20240422252000.0,)
(20240422212000.0,)

至少有两种方法可以解决这个问题.我们可以将所有元素转换为UNIX时间戳并返回,或者我们可以使用sqlalchemy.text来插入所需的SQL.

with engine.connect() as conn:
    q = sa.select(
        sa.func.from_unixtime(
            sa.func.unix_timestamp(sessions.c.date) + (sessions.c.duration)
        ),
        sessions.c.date + sa.text('INTERVAL sessions.duration SECOND')
        
    ).order_by(sessions.c.id)
    rows = conn.execute(q)
    for row in rows:
        print(row)

给我们

(datetime.datetime(2024, 4, 15, 17, 30), datetime.datetime(2024, 4, 15, 17, 30))
(datetime.datetime(2024, 4, 15, 17, 0), datetime.datetime(2024, 4, 15, 17, 0))
(datetime.datetime(2024, 4, 15, 13, 0), datetime.datetime(2024, 4, 15, 13, 0))

上面的示例使用SQLAlchemy的核心语法. 等效的ORM语法是

overlapping_sessions = (
    db.query(models.StaffSession)
    .join(models.Session, models.StaffSession.id_session == models.Session.id)
    .filter(
        and_(
            models.StaffSession.id_staff.in_(staff_ids),
            models.Session.date <= new_session_end,
            sa.func.from_unixtime(sa.func.unix_timestamp(models.Sessions.date) + (model.Session.duration)) >= new_session_start
        )
    )
).all()

Python相关问答推荐

Pydantic:如何将对象列表表示为dict(将列表序列化为dict)

DuckDB将蜂巢分区插入拼花文件

如何使用矩阵在sklearn中同时对每个列执行matthews_corrcoef?

Python daskValue错误:无法识别的区块管理器dask -必须是以下之一:[]

DataFrame groupby函数从列返回数组而不是值

如何使用symy打印方程?

输出中带有南的亚麻神经网络

如何让这个星型模式在Python中只使用一个for循环?

字符串合并语法在哪里记录

使用Python和文件进行模糊输出

处理具有多个独立头的CSV文件

计算空值

如何在Great Table中处理inf和nans

提取数组每行的非零元素

当HTTP 201响应包含 Big Data 的POST请求时,应该是什么?  

合并相似列表

PySpark:如何最有效地读取不同列位置的多个CSV文件

极点替换值大于组内另一个极点数据帧的最大值

SpaCy:Regex模式在基于规则的匹配器中不起作用

用来自另一个数据框的列特定标量划分Polars数据框中的每一列,