I want to generate dynamic tasks from the output of a dynamically mapped task. Each mapped task returns a list, and I'd like to create a separate mapped task for each element of that list, so the process will look like this: (image: Airflow dynamic task tree)

Is it possible to expand over the output of a dynamically mapped task so that the result is a sequence of map operations instead of a map followed by a reduce?

What I tried:

In my local environment, I'm using:

Astronomer Runtime 9.6.0 based on Airflow 2.7.3+astro.2
Git Version: .release:9fad9363bb0e7520a991b5efe2c192bb3405b675

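For the experiments, I used three tasks, with a single string as input and a list of strings as output.

1. Expanding a task group that contains expanded tasks (mapping over a group with mapped tasks):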

import datetime
import logging

from airflow.decorators import dag, task, task_group

@dag(schedule_interval=None, start_date=datetime.datetime(2023, 9, 27))
def try_dag3():

    @task
    def first() -> list[str]:
        return ["0", "1"]

    first_task = first()

    @task_group
    def my_group(input: str) -> list[str]:
    
        @task
        def second(input: str) -> list[str]:
            logging.info(f"input: {input}")
            result = []
            for i in range(3):
                result.append(f"{input}_{i}")

            # ['0_0', '0_1', '0_2']
            # ['1_0', '1_1', '1_2']
            return result

        second_task = second.expand(input=first_task)

        @task
        def third(input: str, input1: str = None):
            logging.info(f"input: {input}, input1: {input1}")
            return input

        third_task = third.expand(input=second_task)
        
    my_group.expand(input=first_task)

try_dag3()

But it results in NotImplementedError: operator expansion in an expanded task group is not yet supported

2. Expanding over the result of an expanded task (mapping over a mapped task):

import datetime
import logging

from airflow.decorators import dag, task

@dag(start_date=datetime.datetime(2023, 9, 27))
def try_dag1():

    @task
    def first() -> list[str]:
        return ["0", "1"]

    first_task = first()

    @task
    def second(input: str) -> list[str]:
        logging.info(f"source: {input}")
        result = []
        for i in range(3):
            result.append(f"{input}_{i}")

        # ['0_0', '0_1', '0_2']
        # ['1_0', '1_1', '1_2']
        return result

    # this expands fine into two tasks from the list returned by first_task
    second_task = second.expand(input=first_task)

    @task
    def third(input: str):
        logging.info(f"source: {input}")
        return input

    # this doesn't expand - there are two mapped tasks, and the input value is a list, not a string
    third_task = third.expand(input=second_task)


try_dag1()

But the result of the second task is not expanded, and the third task receives a list of strings as its input instead: (image: dag1 graph)

third[0] task log: [2024-01-05, 11:40:30 UTC] {try_dag1.py:30} INFO - source: ['0_0', '0_1', '0_2']

3. Expanding an expanded task together with a constant input (to test whether the structure is feasible):

import datetime
import logging

from airflow.decorators import dag, task

@dag(start_date=datetime.datetime(2023, 9, 27))
def try_dag0():

    @task
    def first() -> list[str]:
        return ["0", "1"]

    first_task = first()

    @task
    def second(input: str) -> list[str]:
        logging.info(f"input: {input}")
        result = []
        for i in range(3):
            result.append(f"{input}_{i}")

        # ['0_0', '0_1', '0_2']
        # ['1_0', '1_1', '1_2']
        return result

    second_task = second.expand(input=first_task)

    @task
    def third(input: str, input1: str = None):
        logging.info(f"input: {input}, input1: {input1}")
        return input

    third_task = third.expand(input=second_task, input1=["a", "b", "c"])


try_dag0()

It looks like the mapped tasks can be expanded over a constant list passed to input1, but the input value is still a non-expanded list: (image: dag0 graph)

third[0] task log: [2024-01-05, 12:51:39 UTC] {try_dag0.py:33} INFO - input: ['0_0', '0_1', '0_2'], input1: a

Recommended answer

You need to add a task that collects and flattens the results of second.

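# These definitions and calls belong inside a @dag-decorated function.
from itertools import chain

from airflow.decorators import task

@task
def first() -> list[str]:
    return ['1', '2']

@task
def second(input: str) -> list[str]:
    return [f"{input}_{i}" for i in ['1', '2', '3']]

# Receives the results of all mapped `second` instances and flattens them into one list.
@task
def second_collect(input: list[list[str]]) -> list[str]:
    return list(chain.from_iterable(input))

@task
def third(input: str) -> str:
    return f"Result: {input}!"

sc = second_collect(second.expand(input=first()))
third.expand(input=sc)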


The result of second_collect is ['1_1', '1_2', '1_3', '2_1', '2_2', '2_3'] (the flattened result of the mapped tasks).

The results of the third mapped tasks are:

  • Result: 1_1!
  • Result: 1_2!
  • ...
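The flattening step itself is plain Python, so it can be checked without a running Airflow instance. The sketch below is only an illustration: it reuses the function bodies from the answer as ordinary functions (no @task decorator), and the names mapped_second, flattened, and results are made up for this example. It assumes that second_collect receives one list per mapped second instance, which matches the logs in the question.

```python
from itertools import chain


def second(value: str) -> list[str]:
    # Same logic as the `second` task, written as a plain function.
    return [f"{value}_{i}" for i in ["1", "2", "3"]]


def second_collect(nested: list[list[str]]) -> list[str]:
    # Flatten a list of lists into a single list, preserving order.
    return list(chain.from_iterable(nested))


def third(value: str) -> str:
    return f"Result: {value}!"


# One list per mapped `second` instance (hypothetical stand-in for the mapped outputs).
mapped_second = [second(v) for v in ["1", "2"]]

# Without flattening, each element is itself a list, which is what `third` saw in the question.
flattened = second_collect(mapped_second)

# After flattening, each element is a single string, so `third` maps once per string.
results = [third(v) for v in flattened]

print(flattened)
print(results)
```

Because second_collect is a regular (non-mapped) task, it acts as a small reduce step between the two map stages; the third task then expands over its single flat list.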
