我有一个气流DAG,它有两个任务,我只想在满足特定条件的情况下运行它们.如果不满足这些条件,我希望跳过它们.

DAG如下所示:

@dag(
    dag_id="id_123",
    schedule=DAG_SCHEDULE,
    start_date=days_ago(0),
    catchup=False,
    default_args={
        "retries": 0,
    },
)
def dag_runner():
    @task(task_id="get_data_src_a")
    def get_data_src_a() -> list:
        # return data from src_a

    @task(task_id="get_data_src_b")
    def get_data_src_b() -> list:
        # return data from src_b

    @task(task_id="find_uniq_users")
    def find_uniq_users(users_from_a, users_from_b) -> list:
        # return users in src_a but not in src_b

    @task(task_id="do_something_with_users")
    def do_something_with_users(uniq_users):
        # do something with unique users

    users_from_a = get_data_src_a()
    users_from_b = get_data_src_b()
    uniq_users = find_uniq_users(users_from_a,users_from_b)
    do_something_with_users(uniq_users)

上面的代码可以工作,但我想对其进行改进.如果users_from_b是一个空列表,我不想让find_uniq_users运行.

如果uniq_users是空的,我也不希望do_something_with_users运行.

我try 了以下方法,但似乎不起作用:

if users_from_b:
   uniq_users = find_uniq_users(users_from_a, users_from_b)
else:
   uniq_users = users_from_a
if uniq_users:
   do_something_with_users(uniq_users)

有谁能告诉我我应该做什么吗?

推荐答案

有一个short_circuit操作员/装饰者,可以跳过所有有条件的下游任务,看起来你可以在你的情况下使用它.

def dag_runner():
    [...]

    @task.short_circuit(task_id="should_do_something_with_users")
    def should_do_something_with_users(uniq_users):
        return uniq_users

    users_from_a = get_data_src_a()
    users_from_b = get_data_src_b()
    uniq_users = find_uniq_users(users_from_a,users_from_b)
    should_do_something_with_users(uniq_users) >> do_something_with_users(uniq_users)

Python相关问答推荐

Matplotlib轮廓线值似乎不对劲

Python -Polars库中的滚动索引?

Python中是否有方法从公共域检索搜索结果

如何根据另一列值用字典中的值替换列值

如何自动抓取以下CSV

为什么符号没有按顺序添加?

Excel图表-使用openpyxl更改水平轴与Y轴相交的位置(Python)

对所有子图应用相同的轴格式

Django REST Framework:无法正确地将值注释到多对多模型,不断得到错误字段名称字段对模型无效'<><>

Python+线程\TrocessPoolExecutor

未知依赖项pin—1阻止conda安装""

删除marplotlib条形图上的底边

SQLAlchemy bindparam在mssql上失败(但在mysql上工作)

使用BeautifulSoup抓取所有链接

Pandas—堆栈多索引头,但不包括第一列

如何在FastAPI中替换Pydantic的constr,以便在BaseModel之外使用?'

在Python中控制列表中的数据步长

如何获得3D点的平移和旋转,给定的点已经旋转?

多个矩阵的张量积

如何从数据框列中提取特定部分并将该值填充到其他列中?