从DAG,我可以使用op_argsop_kwargs将变量值作为参数传递给python脚本.在我的应用程序中,Dag正在导入我的脚本,我的python操作符如下所示

PythonOperator(
    task_id='xxxxxx',
    python_callable=my_script.main,
    op_args=[bucket_name, prefix, source_blob_name, dest_bucket_name],
    dag=dag,
    trigger_rule='all_success'
)

我在气流中声明了我的变量.我可以在这里调用我的值bucket_name=Variable.get('bucket_name'),我想在Python脚本中将bucket_name的值传递给我的变量,可以吗?

推荐答案

在Python操作符op_args中,op_kwargstemplates_dicttemplated fields.

所以你可以做:

PythonOperator(
    ...,
    op_args=['{{ var.value.bucket_name }}'],
    python_callable=my_script.main
)

那么您的Python可调用函数将是:

def main(*op_args):
    bucket_name = op_args[0]

您也可以使用op_kwargs/templates_dict:

PythonOperator(
    ...,
    templates_dict={'bucket_name', '{{ var.value.bucket_name }}'},
    python_callable=my_script.main
)

那么您的Python可调用函数将是:

def main(bucket_name, **context):
    ...

但这两种方法都没有必要.

你可以这样做:

from airflow.models.variable import Variable
def main(**context):
    bucket_name = Variable.get('bucket_name')

这是完全安全的,因为只有在执行PythonOperator时才调用main.

Python相关问答推荐

获取2个字节之间的异或

为什么图像结果翻转了90度?

在Python中,如何初始化集合列表脚本的输出

Python-Polars:如何用两个值的平均值填充NA?

如何对行使用分段/部分.diff()或.pct_change()?

两极:如何分割一个大 pyramid 并并行保存每个

Polars Select 多个元素产品

Matplotlib轮廓线值似乎不对劲

由于瓶颈,Python代码执行太慢-寻求性能优化

三个给定的坐标可以是矩形的点吗

如何计算两极打印机中 * 所有列 * 的出现次数?

什么相当于pytorch中的numpy累积ufunc

PMMLPipeline._ fit()需要2到3个位置参数,但给出了4个位置参数

在线条上绘制表面

NumPy中条件嵌套for循环的向量化

Python列表不会在条件while循环中正确随机化'

无法连接到Keycloat服务器

计算天数

如果初始groupby找不到满足掩码条件的第一行,我如何更改groupby列,以找到它?

未调用自定义JSON编码器