I deployed airflow on kubernetes using the official helm chart. I'm using KubernetesExecutor and git-sync.
I am using a seperate docker image for my webserver and my workers - each DAG gets its own docker image. I am running into DAG import errors at the airflow home page. E.g. if one of my DAGs is using pandas then I'll get

Broken DAG: [/opt/airflow/dags/repo/dags/airflow_demo/ieso.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/opt/airflow/dags/repo/dags/project1/dag1.py", line 7, in <module>
    from pandas import read_parquet
ModuleNotFoundError: No module named 'pandas'

I dont have pandas installed on the webserver or scheduler docker images, because if I understand it correctly you shouldn't install the individual dependencies on these. I am getting the same error when running airflow dags list-import-errors on the scheduler pod. I do have pandas installed on the worker pod, but it doesn't get run, because the DAG cannot be discovered through these errors.
How do I make airflow discover this DAG without installing pandas to either scheduler or webserver? I know installing it on both will fix this, however I am not interested in doing it this way.

推荐答案

它很好地精心设计了一个由OP的 comments 😜列出的答案

在 comments 中,@user430953人提供了this link份给Airflow的文档,其中写道:

影响DAG加载时间的一个重要因素可能被Python开发人员忽视,那就是顶级导入可能会花费大量时间,并且可能会产生大量开销,例如,通过将它们转换为Python可调用程序中的本地导入,可以很容易地避免这一点.

这是有意义的:调度器将监视DAG文件夹,并且它将try 通过执行它在文件夹中找到的.py个文件来加载类DAG的实例,就好像它们是常规的"可导入"的Python模块(erm...嗯...因为他们是),然后"钓鱼"实体(物体?实例?)从模块的变量中取出.更具体地说,这会发生在herehere之间(非常宽泛地说,只是如果你好奇的话)

与任何常规Python模块一样,Python解释器将在导入时"运行"代码,因此将try 任何顶级导入.如果模块导入了调度器的Python解释器没有的包,则会抛出ImportError.

既然行动说...

我确实在工人舱里安装了Pandas

...这意味着实际工作将是可行的.我们只需要避免调度程序运行此导入.

最快的方法是执行相对导入,因此只有在执行实际工作时才导入pandas模块(毫不奇怪,在worker台机器中...它们确实安装了Pandas )

因此,从以下方面来看:

import pandas as pd

with DAG(...) as dag:
    @task
    def some_pandas_work(**context):
        # ...

敬..

with DAG(...) as dag:
    @task
    def some_pandas_work(**context):
        import pandas as pd
        # ...

...应该能奏效

Python相关问答推荐

用gekko解决的ADE方程系统突然不再工作,错误消息异常:@错误:模型文件未找到.& &

LAB中的增强数组

如何调整spaCy token 化器,以便在德国模型中将数字拆分为行末端的点

Pandas 在最近的日期合并,考虑到破产

Python上的Instagram API:缺少client_id参数"

如何列举Pandigital Prime Set

我们可以为Flask模型中的id字段主键设置默认uuid吗

如何使用它?

当独立的网络调用不应该互相阻塞时,'

如果条件不满足,我如何获得掩码的第一个索引并获得None?

如何使regex代码只适用于空的目标单元格

以逻辑方式获取自己的pyproject.toml依赖项

为什么调用函数的值和次数不同,递归在代码中是如何工作的?

为什么我的sundaram筛这么低效

并行编程:同步进程

为什么t sns.barplot图例不显示所有值?'

使用SeleniumBase保存和加载Cookie时出现问题

read_csv分隔符正在创建无关的空列

将数字数组添加到Pandas DataFrame的单元格依赖于初始化

启动线程时,Python键盘模块冻结/不工作