我有一个气流DAG,它有两个任务,我只想在满足特定条件的情况下运行它们.如果不满足这些条件,我希望跳过它们.
DAG如下所示:
@dag(
dag_id="id_123",
schedule=DAG_SCHEDULE,
start_date=days_ago(0),
catchup=False,
default_args={
"retries": 0,
},
)
def dag_runner():
@task(task_id="get_data_src_a")
def get_data_src_a() -> list:
# return data from src_a
@task(task_id="get_data_src_b")
def get_data_src_b() -> list:
# return data from src_b
@task(task_id="find_uniq_users")
def find_uniq_users(users_from_a, users_from_b) -> list:
# return users in src_a but not in src_b
@task(task_id="do_something_with_users")
def do_something_with_users(uniq_users):
# do something with unique users
users_from_a = get_data_src_a()
users_from_b = get_data_src_b()
uniq_users = find_uniq_users(users_from_a,users_from_b)
do_something_with_users(uniq_users)
上面的代码可以工作,但我想对其进行改进.如果users_from_b
是一个空列表,我不想让find_uniq_users
运行.
如果uniq_users
是空的,我也不希望do_something_with_users
运行.
我try 了以下方法,但似乎不起作用:
if users_from_b:
uniq_users = find_uniq_users(users_from_a, users_from_b)
else:
uniq_users = users_from_a
if uniq_users:
do_something_with_users(uniq_users)
有谁能告诉我我应该做什么吗?