太主观了.通常你不会使用Spark并行来执行应用程序功能.它主要用于采用一个或多个列值并生成一个或多个转换值的转换.
也就是说,你可以使用UDF来使用它,其中UDF将行/列作为输入,并执行转换以外的操作.比如说
import pyspark.sql.functions as F
@F.udf(returnType=IntegerType())
def update_config(**cols):
row = ','.join(cols)
# update the config in some DB or whatever...
return result_of_status_update
in0.alias('in').withColumn('result_of_status_update', update_config('in.*'))
问题是,这将是非常慢的(与驱动程序上的for循环相比,如问题所示),因为不是迭代驱动程序上的行列表并处理它,而是将其分发给worker然后执行操作的开销.
选项包括:
- 正如您在驱动程序上提到的"多处理线程池".问题可能是行太多,驱动程序可能无法将其全部保存在内存中.即
in0.collect()
导致OOM.专业,开销最少,如果行数不太大,速度会很快.
- 将工作委托给使用UDF的工作人员,但不要在UDF内部完成所有工作,而是使UDF异步,例如使用REST调用.如果您处理的行数非常多,则可能会比#1运行得更快.问题是更难进行错误处理,并且您必须使UDF实现中使用的任何库都可供工作人员使用,例如在本例中为
requests
.
import pyspark.sql.functions as F
@F.udf(returnType=IntegerType())
def trigger_config_update_async(**cols):
import requests
row = ','.join(cols)
response = requests.put('http://my-site.com/config', data=row)
return response.content
in0.alias('in').withColumn('result_of_rest_call', trigger_config_update_async('in.*'))