因此,我将DAG从1.12.15版升级到2.2.2版,并将python从3.8版降级到3.7版(因为MWAA不支持python 3.8).DAG在之前的设置中工作正常,但在MWAA设置中显示此错误:

Broken DAG: [/usr/local/airflow/dags/google_analytics_import.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1474, in set_downstream
    self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1412, in _set_relatives
    task_object.update_relative(self, not upstream)
AttributeError: 'DAG' object has no attribute 'update_relative'

这是一个似乎正在失败的内置功能:


def set_downstream(
        self,
        task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]],
        edge_modifier: Optional[EdgeModifier] = None,
    ) -> None:
        """
        Set a task or a task list to be directly downstream from the current
        task. Required by TaskMixin.
        """
        self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)

下面是我们试图在DAG中运行的代码:

    for report in reports:
        dag << PythonOperator(
            task_id=f"task_{report}",
            python_callable=process,
            op_kwargs={
                "conn": "snowflake_production",
                "table": report,
            },
            provide_context=True,
        )

我认为从Python3.8到3.7的转换导致了这个问题,但我不确定.

有没有人遇到过类似的问题?

推荐答案

对于气流>=2.0.0不再支持使用位移位(bit shift)运算符将任务分配给DAG.

努力做到:

dag = DAG("my_dag")
dummy = DummyOperator(task_id="dummy")

dag >> dummy

行不通.

只能在运算符之间设置依赖项.

您应该使用上下文管理器:

with DAG("my_dag") as dag:
    dummy = DummyOperator(task_id="dummy")

它已经处理了操作符与DAG对象的关系.

Python相关问答推荐

如何使用关键参数按列对Pandas rame进行排序

当测试字符串100%包含查询字符串时,为什么t fuzzywuzzy s Process.extractBests不给出100%分数?

sys.modulesgo 哪儿了?

Polars -转换为PL后无法计算熵.列表

使用图片生成PDF Django rest框架

从管道将Python应用程序部署到Azure Web应用程序,不包括需求包

使用LineConnection动画1D数据

Pandas 第二小值有条件

如何使用symy打印方程?

连接两个具有不同标题的收件箱

试图找到Python方法来部分填充numpy数组

什么相当于pytorch中的numpy累积ufunc

删除字符串中第一次出现单词后的所有内容

ThreadPoolExecutor和单个线程的超时

Pandas—在数据透视表中占总数的百分比

在Django admin中自动完成相关字段筛选

从Windows Python脚本在WSL上运行Linux应用程序

如何使用OpenGL使球体遵循Python中的八样路径?

统计numpy. ndarray中的项目列表出现次数的最快方法

使用SeleniumBase保存和加载Cookie时出现问题