我不熟悉 Airflow,正在处理一个用例:需要读取传递给 Airflow DAG 的输入 JSON 配置,并根据读取到的配置构造一个字符串,该字符串将用作我们在 GCP Dataproc 中创建的集群的名称。
例如:
{
我希望集群名称为 "my cluster for data engg usecase"。
from datetime import datetime
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow import models
from airflow.utils.dates import days_ago
# Connection / placement settings for the cluster-creation task.
CONN_ID = 'blah'
# NOTE(review): PROJECT_ID is defined but not passed to any operator in the
# code shown here — confirm whether the create-cluster operator needs it.
PROJECT_ID = 'xyz'
REGION = 'us-east4'

# `dag_run` does not exist while the scheduler parses this file, so reading
# `dag_run.conf[...]` at module level fails with NameError. Instead, keep the
# name as a Jinja template: Airflow renders it at task run time using the JSON
# passed when the DAG is triggered (e.g. `airflow dags trigger --conf '{...}'`).
# This only works when the value is given to an operator argument listed in
# that operator's `template_fields`.
# The conf parts are joined with hyphens because Dataproc cluster names may
# only contain lowercase letters, digits and hyphens (no spaces).
CLUSTER_NAME = (
    "my-cluster-for-"
    "{{ dag_run.conf['x'] }}-"
    "{{ dag_run.conf['y'] }}-"
    "{{ dag_run.conf['z'] }}"
)
# Single-task DAG that creates a cluster whose name is built from CLUSTER_NAME.
with models.DAG(
    'simple-python-dag',
    start_date=days_ago(1),
    schedule_interval=None,  # no schedule: runs only when triggered externally
) as dag:
    # TODO(review): XyzCreateClusterOperator is not imported anywhere in this
    # file; add the import for the create-cluster operator actually used.
    # NOTE(review): `dag_run` is only available at task run time, so values
    # from `dag_run.conf` reach this operator only through Jinja templating of
    # an argument listed in its `template_fields` — confirm `cluster_name` is
    # one for XyzCreateClusterOperator.
    create_cluster_spark = XyzCreateClusterOperator(
        task_id='create_cluster_spark',
        cluster_name=CLUSTER_NAME,
        location=REGION,
        gcp_conn_id=CONN_ID,
    )