我的流程的输入是一个 struct 化数据集,其中一列是指向BLOB的GCS路径,类似于:

df = spark.createDataFrame(
    [("gs://my/bucket/image.jpg",), ("gs://my/bucket/image2.jpg",)], 
    ["image_path"]
)

我想从该列中的URI中读取(二进制)数据,并将该数据存储在新列中.类似于:

import pyspark.sql.functions as F

df.withColumn("image_blob", magic_read_function(F.col("image_path")))

有没有一种有效的方法来做到这一点?我基本上是想告诉Spark尽可能多地批量读取数据,但不会拖着或收集给一个工人……如果可能的话.

我考虑过的一些 Select 包括:

  • 我可以使用paths = [row.image_path for row in df.collect()]获取路径列表,然后读取该列表.这将强制进行洗牌,并将所有路径集中到单个工作进程上,这是我想要避免的.
  • 我可以创建一个从路径读取的UDF,但随后我会为正在读取的每个路径获得网络开销,而不是将文件批处理到每个工作进程.
  • 我可以从父存储桶中读取所有文件,并对URI列(在本例中为image_path)执行内部联接.然后,我仍然必须对内部联接进行混洗,并且我必须假设所有输入路径都来自同一个存储桶(这通常是这个过程的情况,但目前没有任何东西可以保证它.如果可能,我希望避免在流程中添加此限制)

推荐答案

目前,我只使用RDD API中的mapPartitions,并使用GCS python客户端手动执行批处理读取.这是我想出来的:

from pyspark.sql import Row
from google.cloud import storage
from urllib.parse import urlparse
import itertools

            
def _get_bucket(row):
    return urlparse(row.image_path).netloc

def _get_path_beneath_bucket(row):
    return urlparse(row.image_path).path.strip("/")
            
def process_partition(rows):
    client = storage.Client()
    
    # group by bucket, so we can efficiently read one bucket at a time
    bucket_blobs = itertools.groupby(rows, key=_get_bucket)
    
    for bucket_name, rows in bucket_blobs:
        bucket = client.get_bucket(bucket_name)
        # eval iterator, since we'll need it twice
        rows = list(rows)
        
        with client.batch():
            blobs = [bucket.blob(_get_path_beneath_bucket(row)) for row in rows]
            for blob in blobs:
                blob.reload()
        
        for row, blob in zip(rows, blobs):
            yield Row(image_path=row.image_path, image_blob=blob.download_as_bytes())

然后,我使用如下函数:

rdd = df.rdd.mapPartitions(process_partition)
result_df = rdd.toDF()

这不是很理想,部分原因是我不得不离开钨运行时来访问RDD API(我猜这将以一些性能代价结束),部分原因是它有点混乱.

如果你知道更好的方法,我很乐意接受不同的解决方案.:)

Python相关问答推荐

如何判断LazyFrame是否为空?

除了Python之外,可以替代bare?

在Python中使用一行try

了解shuffle在NP.random.Generator.choice()中的作用

遵循轮廓中对象方向的计算线

如何处理嵌套的SON?

在Python和matlab中显示不同 colored颜色 的图像

大Pandas 胚胎中产生组合

如何使用html从excel中提取条件格式规则列表?

不理解Value错误:在Python中使用迭代对象设置时必须具有相等的len键和值

更改键盘按钮进入'

我们可以为Flask模型中的id字段主键设置默认uuid吗

如何使用它?

无法使用DBFS File API路径附加到CSV In Datricks(OSError Errno 95操作不支持)

如何启动下载并在不击中磁盘的情况下呈现响应?

如何禁用FastAPI应用程序的Swagger UI autodoc中的application/json?

如何创建引用列表并分配值的Systemrame列

以异步方式填充Pandas 数据帧

将链中的矩阵乘法应用于多组值

如何根据rame中的列值分别分组值