我试图将txt文件写入到Cloud Composer DAG中的DAGS文件夹中.文件从来没有出现,我认为我的代码有问题,但我try 以xlsx格式保存Pandas 数据帧,执行DAGs文件夹并加载该数据帧.

事实证明,它奏效了.在相同的代码中,我能够编写Pandas 数据帧,然后在相同的DAG运行中读取它,但后来当我查看文件夹时,没有文件.如果我再次运行代码并try 读取它,它会显示该文件不存在.

就像文件只是暂时写入一样.

我还使用了文件夹的完整路径("/home/airflow/gcs/dgs"),因为我正在try 将文件保存到我的composer 所属的DAGS文件夹中,我想我不应该面临这么多麻烦.

有没有人对我如何解决这个问题有什么 idea ?

编辑:

代码片段:

def _crawl_spiders():
    # sets working dir
    os.chdir('/home/airflow/gcs/dags/mypath')


    df = pd.read_excel('./x-path/sheet.xlsx')

    df.to_excel('/home/airflow/gcs/dags/mypath/test.xlsx', index = False)
    b = pd.read_excel('/home/airflow/gcs/dags/mypath/test.xlsx')
    print(f'Success, b columns:{b.columns}')

with DAG(dag_id="crawler", start_date=datetime(2022,7,28),
    schedule_interval='@daily', tags=['muffet', 'crawler']) as dag:

    crawl_spiders = PythonOperator(
    task_id = 'crawl_spiders',
    python_callable  = _crawl_spiders,
    dag = dag)```

推荐答案

您可能会将*.xlsx文件保存到/主页/气流/GCS/数据目录而不是/主页/气流/GCS/DAGS目录.

(或者将数据帧保存到本地*.xlsx并使用Google Cloud Storage API客户端库将其上载到GCS路径.)

文档表明,Cloud Composer环境被设置为将/主页/气流/GCS/DAGS个内容与其对应的GCS路径"单向"同步,即仅从GCS到本地目录,而不是在另一个方向:"Unidirectional synching means that local changes in these folders are overwritten."这就是保存的*.xlsx文件暂时可用但稍后消失的原因.

以下是这些文件的相关摘录.


Data Stored in Cloud Storage页的Folders in the Cloud Storage Bucket部分:

Cloud Composer将工作流(DAG)的源代码及其依赖项存储在云存储中的特定文件夹中,并使用Cloud Storage Fuse将这些文件夹映射到Cloud Composer环境中的Airflow实例.

映射到GCS路径的本地目录如下所示.具体地,/data目录"stores the data that tasks produce and use. This folder is mounted on all worker nodes."

  • /主页/气流/GCS/DAGS
  • /主页/气流/GCS/插件
  • /主页/气流/GCS/数据
  • /主页/气流/GCS/日志(log)

从同一页的Data Synchronization部分:

Cloud Composer通过本地复制单向同步dags/plugins/文件夹.单向同步意味着这些文件夹中的本地更改将被覆盖.

data/logs/文件夹使用云存储保险丝进行双向同步.

Python相关问答推荐

发生异常:TclMessage命令名称无效.!listbox"

django禁止直接分配到多对多集合的前端.使用user.set()

不允许访问非IPM文件夹

在matplotlib中删除子图之间的间隙_mosaic

如果初始groupby找不到满足掩码条件的第一行,我如何更改groupby列,以找到它?

如何创建引用列表并分配值的Systemrame列

如何找出Pandas 图中的连续空值(NaN)?

从源代码显示不同的输出(机器学习)(Python)

在用于Python的Bokeh包中设置按钮的样式

如何过滤组s最大和最小行使用`transform`'

为用户输入的整数查找根/幂整数对的Python练习

Js的查询结果可以在PC Chrome上显示,但不能在Android Chrome、OPERA和EDGE上显示,而两者都可以在Firefox上运行

如何将泛型类类型与函数返回类型结合使用?

上传文件并使用Panda打开时的Flask 问题

多索引数据帧到标准索引DF

try 在单个WITH_COLUMNS_SEQ操作中链接表达式时,使用Polars数据帧时出现ComputeError

有没有一种方法可以根据不同索引集的数组从2D数组的对称子矩阵高效地构造3D数组?

按最大属性值Django对对象进行排序

将标量值作为输入并输出矩阵的函数的积分

极点在没有Groupby的情况下聚合