在将我的数据湖装载到Databricks之后,我try 使用*.json个后缀将所有JSON文件加载到一个数据帧中,但它不起作用:

df = spark.read.option("recursiveFileLookup", "true") \
    .json("/mnt/adls_gen/prod/**/*.json")

执行上述代码时出现以下错误

[PATH_NOT_FOUND] Path does not exist: dbfs:/mnt/adls_gen/prod/**/*.json.

如果删除文件扩展名,操作将成功:

df = spark.read.option("recursiveFileLookup", "true") \
    .json("/mnt/adls_gen/prod/**/*")

...但它也在读取其他文件,例如扩展名为*.json_old*.txt的文件.

我不熟悉在此场景中使用的任何替代选项.是否有其他方法可用于按文件扩展名进行筛选?我在数据湖中的文件有各种扩展名,所以我正在寻找一种适应这种多样性的解决方案.

ApacheSpark版本是3.4.1(Scala 2.12).

推荐答案

我认为这就是在其他通用文件源选项中使用Path Glob Filter的目的:

Path Glob Filter
pathGlobFilter is used to only include files with file names matching the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. It does not change the behavior of partition discovery.

df_filtered = spark.read
  .option("header","true")
  .option("recursiveFileLookup","true")
  .csv("s3a://mybucket/testdata/csvs", pathGlobFilter="*.csv")

from pyspark.sql.functions import *
df_filtered.select(input_file_name()).distinct().show(truncate=False)
+-------------------------------------+
|input_file_name()                    |
+-------------------------------------+
|s3a://mybucket/testdata/csvs/c000.csv|
|s3a://mybucket/testdata/csvs/c001.csv|
|         :         :          :      |
+-------------------------------------+

Python相关问答推荐

当多个值具有相同模式时返回空

Pandas 滚动最接近的价值

通过Selenium从页面获取所有H2元素

如何请求使用Python将文件下载到带有登录名的门户网站?

如何在solve()之后获得症状上的等式的值

如何在WSL2中更新Python到最新版本(3.12.2)?

如何禁用FastAPI应用程序的Swagger UI autodoc中的application/json?

如何从列表框中 Select 而不出错?

* 动态地 * 修饰Python中的递归函数

未调用自定义JSON编码器

为什么常规操作不以其就地对应操作为基础?

使用Openpyxl从Excel中的折线图更改图表样式

如何从pandas DataFrame中获取. groupby()和. agg()之后的子列?

Polars map_使用多处理对UDF进行批处理

无法在Spyder上的Pandas中将本地CSV转换为数据帧

如何在FastAPI中替换Pydantic的constr,以便在BaseModel之外使用?'

对于标准的原始类型注释,从键入`和`从www.example.com `?

从列表中分离数据的最佳方式

如何在表单中添加管理员风格的输入(PDF)

普洛特利express 发布的人口普查数据失败