我的spark 码最近导致闸板泄漏.例如,在运行任何脚本之前,当我运行top时,我可以看到251 GB的总内存和230 GB的空闲+已用内存.
当我将Spark作业(job)运行到spark-submit
时,无论作业(job)是否完成(异常结束),空闲+已用内存都比开始时低得多.以下是我的代码示例之一:
from pyspark.sql import SparkSession
def read_df(spark, jdbc_url, table_name, jdbc_properties ):
df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=jdbc_properties)
return df
def write_df(result, table_name, jdbc_properties):
result = result.repartition(50)
result.write.format('jdbc').options(
url=jdbc_properties['jdbc_url'],
driver="org.postgresql.Driver",
user=jdbc_properties["user"],
password=jdbc_properties["password"],
dbtable=table_name,
mode="overwrite"
).save()
if __name__ == '__main__':
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.driver.extraClassPath", "postgresql-42.5.2.jar").config("spark.executor.extraClassPath","postgresql-42.5.2.jar") \
.config("spark.local.dir", "/shared/hm31") \
.config("spark.master", "local[*]") \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
parquet_path = '/shared/hossein_hm31/embeddings_parquets'
try:
unique_nodes = read_df(spark, jdbc_url, 'hm31.unique_nodes_cert', jdbc_properties)
df = spark.read.parquet(parquet_path)
unique_nodes.createOrReplaceTempView("unique_nodes")
df.createOrReplaceTempView("all_embeddings")
sql_query = """
select u.node_id, a.embedding from unique_nodes u inner join all_embeddings a on u.pmid = a.pmid
"""
result = spark.sql(sql_query)
print("num", result.count())
result.repartition(10).write.parquet('/shared/parquets_embeddings/')
write_df(result, 'hm31.uncleaned_embeddings_cert', jdbc_properties)
spark.catalog.clearCache()
unique_nodes.unpersist()
df.unpersist()
result.unpersist()
spark.stop()
exit(0)
except:
print('Error')
spark.catalog.clearCache()
unique_nodes.unpersist()
df.unpersist()
spark.stop()
exit(0)
print('Error')
spark.catalog.clearCache()
unique_nodes.unpersist()
df.unpersist()
spark.stop()
exit(0)
在那里我试图删除缓存的数据帧.这种RAM泄漏需要重新启动服务器,这很不舒服.
这是我运行的命令:
spark-submit --master local[50] --driver-class-path ./postgresql-42.5.2.jar --jars ./postgresql-42.5.2.jar --driver-memory 200g --conf "spark.local.dir=./logs" calculate_similarities.py
这是最高的输出,您可以看到空闲+已用内存比总和少得多,在我运行Spark工作之前,它过go 大约是230.这些作业(job)是按内存使用情况排序的,您可以看到,在Spark以异常结束后,没有运行内存密集型作业(job).
我要补充的是,这台机器本身没有spark .它有Java 11,我只是通过导入它的包来运行Pyspark.
谢谢
附注:在Postgres上,unique_nodes
大约是0.5 GB.df = spark. read.parquet(parquet_path)
可以读取38个镶木地板文件,每个文件约3 GB.加入后,result
的容量约为8 GB.