我试图以最有效的方式使用Spark读取目录中的多个CSV文件.不幸的是,除了单独读取每个文件之外,我还没有找到更好的方法,这非常耗时.

据我所知,读取多个CSV文件的最有效方法是使用*,如下所示:

df = spark.read.format('csv') \
        .option('header', 'true') \
        .load('/path/to/csv/folder/*.csv')

然而,尽管它非常快,但它不按列名执行联合,而是遵循列索引. 例如,如果该目录包含以下两个CSV文件:

1.csv:

A B C
1 2 5
3 4 6

2.csv:

A C
7 8

前面的操作将按如下方式合并它们:

df:

A B C
1 2 5
3 4 6
7 8 NULL

这显然是incorrect,因为最后一行应为7|NULL|8.

好吧,我能够解决这个问题,通过单独读取每个文件,然后在allowMissingColumns参数设置为True的情况下执行unionByName,如下所示:

dfs = []
for filename in list_file_names('/path/to/csv/folder'):
    dfs.append(spark.read.format('csv') \
        .option('header', 'true') \
        .load('/path/to/csv/folder/{filename}')
    )
union_df = dfs[0]
for df in dfs[1:]:
    union_df = union_df.unionByName(df, allowMissingColumns=True)

这与预期的一样工作,但当我单独读取每个文件时,速度要慢得多.对于同一台机器上的hdfs中的6 seconds个小CSV文件,第一种(但错误的)方法大约需要6 seconds个,而第二种方法需要16 seconds个.

So my question is, can I achieve the same result in PySpark by performing only one read operation as in the first method?

推荐答案

我可以通过只执行一次读操作在PySpark中获得相同的结果吗?

遗憾的是,正如您注意到的那样,由于模式合并的限制,您不能一次性使用Spark数据源API.

相反,您可以通过首先读取每个文件的头,将它们按csv类别分组,然后将每个类别的文件合并,优化联合方法.

在纯Pythonwith boto for example中可以获得与文件路径相关联的所有第一行.

然后一次读取一个文件列表可以用comma separated list of path完成.

而在两个步骤中,如果你只有很少的CSV类别,这应该比联合每一个文件要快得多.

Python相关问答推荐

rame中不兼容的d类型

什么相当于pytorch中的numpy累积ufunc

优化pytorch函数以消除for循环

两个pandas的平均值按元素的结果串接元素.为什么?

如何在WSL2中更新Python到最新版本(3.12.2)?

为什么抓取的HTML与浏览器判断的元素不同?

实现自定义QWidgets作为QTimeEdit的弹出窗口

海上重叠直方图

无法连接到Keycloat服务器

Python中的变量每次增加超过1

在www.example.com中使用`package_data`包含不包含__init__. py的非Python文件

如何在PySide/Qt QColumbnView中删除列

Python—为什么我的代码返回一个TypeError

Python日志(log)模块如何在将消息发送到父日志(log)记录器之前向消息添加类实例变量

用两个字符串构建回文

如果服务器设置为不侦听创建,则QWebSocket客户端不连接到QWebSocketServer;如果服务器稍后开始侦听,则不连接

FileNotFoundError:[WinError 2]系统找不到指定的文件:在os.listdir中查找扩展名

我怎样才能让深度测试在OpenGL中使用Python和PyGame呢?

用LAKEF划分实木地板AWS Wrangler

Pandas:新列,从列表中采样,基于列值