因此,我将DAG从1.12.15版升级到2.2.2版,并将python从3.8版降级到3.7版(因为MWAA不支持python 3.8).DAG在之前的设置中工作正常,但在MWAA设置中显示此错误:
Broken DAG: [/usr/local/airflow/dags/google_analytics_import.py] Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1474, in set_downstream
self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1412, in _set_relatives
task_object.update_relative(self, not upstream)
AttributeError: 'DAG' object has no attribute 'update_relative'
这是一个似乎正在失败的内置功能:
def set_downstream(
self,
task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]],
edge_modifier: Optional[EdgeModifier] = None,
) -> None:
"""
Set a task or a task list to be directly downstream from the current
task. Required by TaskMixin.
"""
self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
下面是我们试图在DAG中运行的代码:
for report in reports:
dag << PythonOperator(
task_id=f"task_{report}",
python_callable=process,
op_kwargs={
"conn": "snowflake_production",
"table": report,
},
provide_context=True,
)
我认为从Python3.8到3.7的转换导致了这个问题,但我不确定.
有没有人遇到过类似的问题?