我让一个用例迭代数据帧中的每一行,并将其传递给其他数据帧.下面是相同的代码片段.我希望并行执行此迭代.我不能使用rdd.map(),因为我们不能在Spark Worker中执行Spark驱动程序代码.获得并行性的任何替代方法

for row in in0.collect():
 update_config = self.config.update_from_row_map(row, conf_to_column)
 _inputs = inDFs
 results.append(self.__run__(spark, update_config, *_inputs))

如何使用Spark的并行性实现从in0到inDF的行的并行赋值.为了实现这一点,使用多处理线程池是一种好做法吗

推荐答案

太主观了.通常你不会使用Spark并行来执行应用程序功能.它主要用于采用一个或多个列值并生成一个或多个转换值的转换.

也就是说,你可以使用UDF来使用它,其中UDF将行/列作为输入,并执行转换以外的操作.比如说

import pyspark.sql.functions as F

@F.udf(returnType=IntegerType())
def update_config(**cols):
  row = ','.join(cols)
  # update the config in some DB or whatever...
  return result_of_status_update


in0.alias('in').withColumn('result_of_status_update', update_config('in.*'))

问题是,这将是非常慢的(与驱动程序上的for循环相比,如问题所示),因为不是迭代驱动程序上的行列表并处理它,而是将其分发给worker然后执行操作的开销.


选项包括:

  1. 正如您在驱动程序上提到的"多处理线程池".问题可能是行太多,驱动程序可能无法将其全部保存在内存中.即in0.collect()导致OOM.专业,开销最少,如果行数不太大,速度会很快.
  2. 将工作委托给使用UDF的工作人员,但不要在UDF内部完成所有工作,而是使UDF异步,例如使用REST调用.如果您处理的行数非常多,则可能会比#1运行得更快.问题是更难进行错误处理,并且您必须使UDF实现中使用的任何库都可供工作人员使用,例如在本例中为requests.
import pyspark.sql.functions as F

@F.udf(returnType=IntegerType())
def trigger_config_update_async(**cols):
  import requests
  row = ','.join(cols)
  response = requests.put('http://my-site.com/config', data=row)
  return response.content

in0.alias('in').withColumn('result_of_rest_call', trigger_config_update_async('in.*'))

Python相关问答推荐

如何修复fpdf中的线路出血

使用decorator 重复超载

在后台运行的Python函数

如何在Power Query中按名称和时间总和进行分组

如何处理必须存在于环境中但无法安装的Python项目依赖项?

我必须将Sigmoid函数与r2值的两种类型的数据集(每种6个数据集)进行匹配,然后绘制匹配函数的求导.我会犯错

Pandas 填充条件是另一列

Pandas实际上如何对基于自定义的索引(integer和非integer)执行索引

如何找到满足各组口罩条件的第一行?

从dict的列中分钟

如何列举Pandigital Prime Set

Python—从np.array中 Select 复杂的列子集

如何使Matplotlib标题以图形为中心,而图例框则以图形为中心

mypy无法推断类型参数.List和Iterable的区别

使用特定值作为引用替换数据框行上的值

在Python中使用if else或使用regex将二进制数据如111转换为001""

如何使regex代码只适用于空的目标单元格

网格基于1.Y轴与2.x轴显示在matplotlib中

在Python中计算连续天数

BeautifulSoup:超过24个字符(从a到z)的迭代失败:降低了首次深入了解数据集的复杂性: