I want to generate dynamic tasks from the output of a dynamically mapped task. Each mapped task returns a list, and I'd like to create a separate mapped task instance for each element of that list. Is it possible to expand over the output of a dynamically mapped task so that it results in a sequence of map operations instead of a map followed by a reduce?
What I tried:
In my local environment I use:
Astronomer Runtime 9.6.0 based on Airflow 2.7.3+astro.2
Git Version: .release:9fad9363bb0e7520a991b5efe2c192bb3405b675
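For the experiments I use three tasks that work with a single string as input and a list of strings as output.
1. Expanding a task group that contains an expanded task (mapping over a group with a mapped task):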
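As a rough model of the data flow I'm after, here is a plain-Python sketch with no Airflow involved (the function names only mirror the tasks in the DAGs below). The goal is for third to run once per string produced by any second call, i.e. six times, with no reduce step in between:

```python
def first() -> list[str]:
    # Produces the inputs for the first map stage
    return ["0", "1"]


def second(input: str) -> list[str]:
    # First map stage: each input fans out into three strings
    return [f"{input}_{i}" for i in range(3)]


def third(input: str) -> str:
    # Second map stage: should run once per individual string
    return input


# Desired "map -> map" chain: every element produced by any second() call
# gets its own third() call.
desired = [third(item) for x in first() for item in second(x)]
print(desired)  # ['0_0', '0_1', '0_2', '1_0', '1_1', '1_2']
```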
import datetime
import logging

from airflow.decorators import dag, task, task_group


@dag(schedule_interval=None, start_date=datetime.datetime(2023, 9, 27))
def try_dag3():
    @task
    def first() -> list[str]:
        return ["0", "1"]

    first_task = first()

    @task_group
    def my_group(input: str) -> list[str]:
        @task
        def second(input: str) -> list[str]:
            logging.info(f"input: {input}")
            result = []
            for i in range(3):
                result.append(f"{input}_{i}")
            # ['0_0', '0_1', '0_2']
            # ['1_0', '1_1', '1_2']
            return result

        second_task = second.expand(input=first_task)

        @task
        def third(input: str, input1: str = None):
            logging.info(f"input: {input}, input1: {input1}")
            return input

        third_task = third.expand(input=second_task)

    my_group.expand(input=first_task)


try_dag3()
But it results in NotImplementedError: operator expansion in an expanded task group is not yet supported
2. Expanding over the result of an expanded task (mapping over a mapped task):
import datetime
import logging

from airflow.decorators import dag, task


@dag(start_date=datetime.datetime(2023, 9, 27))
def try_dag1():
    @task
    def first() -> list[str]:
        return ["0", "1"]

    first_task = first()

    @task
    def second(input: str) -> list[str]:
        logging.info(f"source: {input}")
        result = []
        for i in range(3):
            result.append(f"{input}_{i}")
        # ['0_0', '0_1', '0_2']
        # ['1_0', '1_1', '1_2']
        return result

    # this expands fine into two tasks from the list returned by first_task
    second_task = second.expand(input=first_task)

    @task
    def third(input: str):
        logging.info(f"source: {input}")
        return input

    # this doesn't expand per element: there are two mapped tasks, and the input value is a list, not a string
    third_task = third.expand(input=second_task)


try_dag1()
but the result of second is not expanded element by element, and the input of the third task is a list of strings instead:
third[0] task log:
[2024-01-05, 11:40:30 UTC] {try_dag1.py:30} INFO - source: ['0_0', '0_1', '0_2']
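The log above matches a simple model, sketched here in plain Python rather than Airflow code: the collected output of the mapped second task is a sequence of per-instance return values (one list per instance), so expanding third over it yields one instance per list. The flatten function is a hypothetical helper, not an Airflow API; as an assumption, it could be wrapped in a regular @task placed between second and third, at the cost of reintroducing a reduce step:

```python
from itertools import chain


def second(input: str) -> list[str]:
    return [f"{input}_{i}" for i in range(3)]


# Model of the collected return values of the two mapped second() instances
second_outputs = [second(x) for x in ["0", "1"]]

# Expanding third over these gives one instance per *list*, not per string
third_inputs_nested = list(second_outputs)
print(len(third_inputs_nested), third_inputs_nested[0])  # 2 ['0_0', '0_1', '0_2']


def flatten(nested: list[list[str]]) -> list[str]:
    # Hypothetical helper: concatenate the per-instance lists into one list
    return list(chain.from_iterable(nested))


third_inputs_flat = flatten(second_outputs)
print(len(third_inputs_flat), third_inputs_flat[0])  # 6 0_0
```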
3. Expanding an expanded task together with a constant input (to test whether this structure works at all):
import datetime
import logging

from airflow.decorators import dag, task


@dag(start_date=datetime.datetime(2023, 9, 27))
def try_dag0():
    @task
    def first() -> list[str]:
        return ["0", "1"]

    first_task = first()

    @task
    def second(input: str) -> list[str]:
        logging.info(f"input: {input}")
        result = []
        for i in range(3):
            result.append(f"{input}_{i}")
        # ['0_0', '0_1', '0_2']
        # ['1_0', '1_1', '1_2']
        return result

    second_task = second.expand(input=first_task)

    @task
    def third(input: str, input1: str = None):
        logging.info(f"input: {input}, input1: {input1}")
        return input

    third_task = third.expand(input=second_task, input1=["a", "b", "c"])


try_dag0()
It looks like the mapped tasks can be expanded over a constant list passed to input1, but the input value is still a non-expanded list:
third[0] task log:
[2024-01-05, 12:51:39 UTC] {try_dag0.py:33} INFO - input: ['0_0', '0_1', '0_2'], input1: a
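When expand() receives more than one mapped argument, Airflow creates one task instance per combination (a cross product). The plain-Python sketch below models that with itertools.product, assuming the two lists returned by second are the mapped values of input. It gives 2 × 3 = 6 combinations, and each one still pairs a whole list with a single input1 value. The index ordering of product is only assumed to correspond to Airflow's map indexes, although it is consistent with the third[0] log above:

```python
from itertools import product

# Model of the collected outputs of the two mapped second() instances
second_outputs = [["0_0", "0_1", "0_2"], ["1_0", "1_1", "1_2"]]
input1_values = ["a", "b", "c"]

# One combination per (input, input1) pair, mirroring a cross-product expansion
combos = list(product(second_outputs, input1_values))
print(len(combos))  # 6
print(combos[0])    # (['0_0', '0_1', '0_2'], 'a')
```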