Why can't I overwrite a file using the code below?

Some background: I've noticed that when I want to create a .csv file and then append to it on each pass, even though I pass 'a' (append) as the write mode, somehow I can create the file but cannot append to it.

files = dbutils.fs.ls("/mnt/lake/RAW/test/billion-row-ingestion-time/table/")
parquet_file_list = [each.path for each in files if each.name!='_delta_log/']

for each in parquet_file_list:
    i=0
    df = spark.read.parquet(each).toPandas()
    df.to_csv('/dbfs/FileStore/raw/billion-row-ingestion-time/b.csv', mode='a')
    print("interation: ", i+1)

Output

OSError: [Errno 95] Operation not supported
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/csvs.py:261, in CSVFormatter.save(self)
    251 self.writer = csvlib.writer(
    252     handles.handle,
    253     lineterminator=self.line_terminator,
   (...)
    258     quotechar=self.quotechar,
    259 )
--> 261 self._save()

File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/csvs.py:266, in CSVFormatter._save(self)
    265     self._save_header()
--> 266 self._save_body()

File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/csvs.py:304, in CSVFormatter._save_body(self)
    303     break
--> 304 self._save_chunk(start_i, end_i)

File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/csvs.py:315, in CSVFormatter._save_chunk(self, start_i, end_i)
    314 ix = self.data_index[slicer]._format_native_types(**self._number_format)
--> 315 libwriters.write_csv_rows(
    316     data,
    317     ix,
    318     self.nlevels,
    319     self.cols,
    320     self.writer,
    321 )

File /databricks/python/lib/python3.10/site-packages/pandas/_libs/writers.pyx:55, in pandas._libs.writers.write_csv_rows()

OSError: [Errno 95] Operation not supported

During handling of the above exception, another exception occurred:

OSError                                   Traceback (most recent call last)
File <command-1964363491723333>, line 4
      2 i=0
      3 df = spark.read.parquet(each).toPandas()
----> 4 df.to_csv('/dbfs/FileStore/raw/billion-row-ingestion-time/b.csv', mode='a')
      5 print("interation: ", i+1)

File /databricks/python/lib/python3.10/site-packages/pandas/core/generic.py:3551, in NDFrame.to_csv(self, path_or_buf, sep, na_rep, float_format, columns, header, index, index_label, mode, encoding, compression, quoting, quotechar, line_terminator, chunksize, date_format, doublequote, escapechar, decimal, errors, storage_options)
   3540 df = self if isinstance(self, ABCDataFrame) else self.to_frame()
   3542 formatter = DataFrameFormatter(
   3543     frame=df,
   3544     header=header,
   (...)
   3548     decimal=decimal,
   3549 )
-> 3551 return DataFrameRenderer(formatter).to_csv(
   3552     path_or_buf,
   3553     line_terminator=line_terminator,
   3554     sep=sep,
   3555     encoding=encoding,
   3556     errors=errors,
   3557     compression=compression,
   3558     quoting=quoting,
   3559     columns=columns,
   3560     index_label=index_label,
   3561     mode=mode,
   3562     chunksize=chunksize,
   3563     quotechar=quotechar,
   3564     date_format=date_format,
   3565     doublequote=doublequote,
   3566     escapechar=escapechar,
   3567     storage_options=storage_options,
   3568 )

File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/format.py:1180, in DataFrameRenderer.to_csv(self, path_or_buf, encoding, sep, columns, index_label, mode, compression, quoting, quotechar, line_terminator, chunksize, date_format, doublequote, escapechar, errors, storage_options)
   1159     created_buffer = False
   1161 csv_formatter = CSVFormatter(
   1162     path_or_buf=path_or_buf,
   1163     line_terminator=line_terminator,
   (...)
   1178     formatter=self.fmt,
   1179 )
-> 1180 csv_formatter.save()
   1182 if created_buffer:
   1183     assert isinstance(path_or_buf, StringIO)

File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/csvs.py:241, in CSVFormatter.save(self)
    237 """
    238 Create the writer & save.
    239 """
    240 # apply compression and byte/text conversion
--> 241 with get_handle(
    242     self.filepath_or_buffer,
    243     self.mode,
    244     encoding=self.encoding,
    245     errors=self.errors,
    246     compression=self.compression,
    247     storage_options=self.storage_options,
    248 ) as handles:
    249 
    250     # Note: self.encoding is irrelevant here
    251     self.writer = csvlib.writer(
    252         handles.handle,
    253         lineterminator=self.line_terminator,
   (...)
    258         quotechar=self.quotechar,
    259     )
    261     self._save()

File /databricks/python/lib/python3.10/site-packages/pandas/io/common.py:124, in IOHandles.__exit__(self, *args)
    123 def __exit__(self, *args: Any) -> None:
--> 124     self.close()

File /databricks/python/lib/python3.10/site-packages/pandas/io/common.py:116, in IOHandles.close(self)
    114     self.created_handles.remove(self.handle)
    115 for handle in self.created_handles:
--> 116     handle.close()
    117 self.created_handles = []
    118 self.is_wrapped = False

OSError: [Errno 95] Operation not supported

Accepted answer

Replace:

df.to_csv('/dbfs/FileStore/raw/billion-row-ingestion-time/b.csv', mode='a')

with:

df.to_csv('/tmp/on/your/disk/b.csv', mode='a')

Then, after the loop, upload it (note that `dbutils.fs` paths default to the `dbfs:` scheme, so the local source needs a `file:` prefix and the destination should not repeat the `/dbfs` FUSE-mount prefix):

dbutils.fs.cp('file:/tmp/on/your/disk/b.csv',
              'dbfs:/FileStore/raw/billion-row-ingestion-time/b.csv')

The problem is that you are trying to append data to a file stored on an external storage mount point, and Databricks does not support that (the underlying storage mounted into DBFS does not support appends). The solution is therefore to run the append locally, then upload the entire file to the Databricks file system once it is complete.
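The local-append step can be sketched with plain pandas as follows. This is a minimal illustration with made-up sample frames standing in for the `spark.read.parquet(...).toPandas()` chunks from the question; it also writes the CSV header only on the first pass, since `mode='a'` alone would repeat the header for every chunk, and it fixes the loop counter that the original code reset to 0 on each iteration:

```python
import os
import tempfile

import pandas as pd

# Local scratch file on the driver's disk (appending works here,
# unlike on a DBFS mount backed by object storage).
local_path = os.path.join(tempfile.mkdtemp(), "b.csv")

# Stand-ins for the per-file DataFrames; in the question each chunk
# comes from spark.read.parquet(each).toPandas().
chunks = [
    pd.DataFrame({"id": [1, 2], "value": ["a", "b"]}),
    pd.DataFrame({"id": [3, 4], "value": ["c", "d"]}),
]

for i, df in enumerate(chunks):
    # Write the header only when the file does not exist yet,
    # so the combined CSV carries a single header row.
    df.to_csv(local_path, mode="a",
              header=not os.path.exists(local_path), index=False)
    print("iteration:", i + 1)

# On Databricks, the finished file would then be copied up in one shot:
# dbutils.fs.cp(f"file:{local_path}",
#               "dbfs:/FileStore/raw/billion-row-ingestion-time/b.csv")

combined = pd.read_csv(local_path)
print(len(combined))  # all chunk rows, one header
```

Deferring the upload to a single `dbutils.fs.cp` after the loop means the object store only ever sees one whole-file write, which is the operation it actually supports.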

