我试图以最有效的方式使用Spark读取目录中的多个CSV文件.不幸的是,除了单独读取每个文件之外,我还没有找到更好的方法,这非常耗时.
据我所知,读取多个CSV文件的最有效方法是使用*
,如下所示:
df = spark.read.format('csv') \
.option('header', 'true') \
.load('/path/to/csv/folder/*.csv')
然而,尽管它非常快,但它不按列名执行联合,而是遵循列索引. 例如,如果该目录包含以下两个CSV文件:
1.csv
:
A | B | C |
---|---|---|
1 | 2 | 5 |
3 | 4 | 6 |
2.csv
:
A | C |
---|---|
7 | 8 |
前面的操作将按如下方式合并它们:
df
:
A | B | C |
---|---|---|
1 | 2 | 5 |
3 | 4 | 6 |
7 | 8 | NULL |
这显然是incorrect,因为最后一行应为7|NULL|8
.
好吧,我能够解决这个问题,通过单独读取每个文件,然后在allowMissingColumns
参数设置为True
的情况下执行unionByName
,如下所示:
dfs = []
for filename in list_file_names('/path/to/csv/folder'):
dfs.append(spark.read.format('csv') \
.option('header', 'true') \
.load('/path/to/csv/folder/{filename}')
)
union_df = dfs[0]
for df in dfs[1:]:
union_df = union_df.unionByName(df, allowMissingColumns=True)
这与预期的一样工作,但当我单独读取每个文件时,速度要慢得多.对于同一台机器上的hdfs中的6 seconds个小CSV文件,第一种(但错误的)方法大约需要6 seconds个,而第二种方法需要16 seconds个.
So my question is, can I achieve the same result in PySpark by performing only one read operation as in the first method?个