我的流程的输入是一个 struct 化数据集,其中一列是指向BLOB的GCS路径,类似于:
df = spark.createDataFrame(
[("gs://my/bucket/image.jpg",), ("gs://my/bucket/image2.jpg",)],
["image_path"]
)
我想从该列中的URI中读取(二进制)数据,并将该数据存储在新列中.类似于:
import pyspark.sql.functions as F
df.withColumn("image_blob", magic_read_function(F.col("image_path")))
有没有一种有效的方法来做到这一点?我基本上是想告诉Spark尽可能多地批量读取数据,但不会拖着或收集给一个工人……如果可能的话.
我考虑过的一些 Select 包括:
- 我可以使用
paths = [row.image_path for row in df.collect()]
获取路径列表,然后读取该列表.这将强制进行洗牌,并将所有路径集中到单个工作进程上,这是我想要避免的. - 我可以创建一个从路径读取的UDF,但随后我会为正在读取的每个路径获得网络开销,而不是将文件批处理到每个工作进程.
- 我可以从父存储桶中读取所有文件,并对URI列(在本例中为
image_path
)执行内部联接.然后,我仍然必须对内部联接进行混洗,并且我必须假设所有输入路径都来自同一个存储桶(这通常是这个过程的情况,但目前没有任何东西可以保证它.如果可能,我希望避免在流程中添加此限制)