我不熟悉airflow,正在处理一个用例,需要读取传递给airflow dag的输入json配置,并根据读取的配置构造一个字符串,该字符串将用作我们在GCP data proc中创建的集群的名称.

前任:

{

我希望集群名称为"my cluster for data engg用例"

from datetime import datetime
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow import models
from airflow.utils.dates import days_ago

CONN_ID = 'blah'
PROJECT_ID = 'xyz'
REGION = 'us-east4'
CLUSTER_NAME = "my-cluster-for-"+dag_run.conf['x']+dag_run.conf['y']+dag_run.conf['z']
with models.DAG('simple-python-dag', start_date=days_ago(1), schedule_interval=None) as dag:


    create_cluster_spark = XyzCreateClusterOperator(task_id='create_cluster_spark',
                                                    cluster_name=CLUSTER_NAME,
                                                    location=REGION,
                                                    gcp_conn_id=CONN_ID)

    create_cluster_spark

推荐答案

dag_run仅在活动DAG运行中可用(即,当您触发DAG并运行时)-指定dag_run.conf作为对象将不起作用,因为气流调度器将解析您的DAG(默认情况下每30秒一次),并且不存在此类对象.

访问dag_run有两种方式:

  1. 使用模板系统
CLUSTER_NAME = "my-cluster-for-{{ dag_run.conf['x'] }}{{ dag_run.conf['y'] }}{{ dag_run.conf['z'] }}"

仅当cluster_name字段是XyzCreateClusterOperator运算符中的模板字段时,此操作才有效.您可以通过查看文档字符串或操作员的气流documentation来判断字段是否已模板化.

如果XyzCreateClusterOperator是第三方Provider ,您可以查看Provider here的文档

您可以在模板here中看到所有可用项(dag_run是使用此模板语法可以访问的许多内容之一).

  1. 您可以在python操作符中访问dag_run
import pendulum
from airflow.decorators import dag, task

@dag(
    schedule_interval=None,
    start_date=pendulum.datetime(2022, 4, 18),
    catchup=False,
)
def example_dag():
    @task()
    def dag_run_example_task(**context):
        context["dag_run"].conf.get("x")
        context["dag_run"].conf.get("y")
        context["dag_run"].conf.get("z")

如果cluster_name不是模板化字段,则可以使用Python操作符包装操作符并替换为:

import pendulum
from airflow.decorators import dag, task

@dag(
    schedule_interval=None,
    start_date=pendulum.datetime(2022, 4, 18),
    catchup=False,
)
def example_dag():
    @task()
    def dag_run_example_task(**context):
        cluster_name = "my-cluster-for-"+context["dag_run"].conf.get("x")+context["dag_run"].conf.get("y")+context["dag_run"].conf.get("z")

        create_cluster_spark = XyzCreateClusterOperator(
            task_id='create_cluster_spark',
            cluster_name=cluster_name,
            location=REGION,
            gcp_conn_id=CONN_ID,
        )

        create_cluster_spark.execute(context=context)

模板here中可用的相同对象在传递给PythonOperator的context对象中可用(还有一些其他对象).

Python相关问答推荐

无法获得指数曲线_fit来处理日期

计算每月过go x年的平均值

有什么方法可以修复奇怪的y轴Python matplotlib图吗?

Python:MultiIndex Dataframe到类似json的字典列表

每个组每第n行就有Pandas

使用Python C API重新启动Python解释器

Polars Select 多个元素产品

如何编写一个正规表达式来查找序列中具有2个或更多相同辅音的所有单词

使用Ubuntu、Python和Weasyprint的Docker文件-venv的问题

拆分pandas列并创建包含这些拆分值计数的新列

在应用循环中间保存pandas DataFrame

如何通过多2多字段过滤查询集

使用polars .滤镜进行切片速度比pandas .loc慢

Polars比较了两个预设-有没有方法在第一次不匹配时立即失败

Pandas 有条件轮班操作

django禁止直接分配到多对多集合的前端.使用user.set()

对象的`__call__`方法的setattr在Python中不起作用'

如何使用scipy的curve_fit与约束,其中拟合的曲线总是在观测值之下?

在两极中过滤

如何创建引用列表并分配值的Systemrame列