我的spark 码最近导致闸板泄漏.例如,在运行任何脚本之前,当我运行top时,我可以看到251 GB的总内存和230 GB的空闲+已用内存.

当我将Spark作业(job)运行到spark-submit时,无论作业(job)是否完成(异常结束),空闲+已用内存都比开始时低得多.以下是我的代码示例之一:

    from pyspark.sql import SparkSession

    def read_df(spark, jdbc_url, table_name, jdbc_properties ):
        df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=jdbc_properties)
        return df
    
    
    def write_df(result, table_name, jdbc_properties):
        result = result.repartition(50)
        result.write.format('jdbc').options(
            url=jdbc_properties['jdbc_url'],
            driver="org.postgresql.Driver",
            user=jdbc_properties["user"],
            password=jdbc_properties["password"],
            dbtable=table_name,
            mode="overwrite"
        ).save()


    if __name__ == '__main__':
        spark = SparkSession \
            .builder \
            .appName("Python Spark SQL basic example") \
            .config("spark.driver.extraClassPath", "postgresql-42.5.2.jar").config("spark.executor.extraClassPath","postgresql-42.5.2.jar") \
            .config("spark.local.dir", "/shared/hm31") \
            .config("spark.master", "local[*]") \
            .getOrCreate()
    
    
    
        spark.sparkContext.setLogLevel("WARN")
    
        parquet_path = '/shared/hossein_hm31/embeddings_parquets'
    
        try:
    
            unique_nodes = read_df(spark, jdbc_url, 'hm31.unique_nodes_cert', jdbc_properties)
            df = spark.read.parquet(parquet_path)
    
    
            unique_nodes.createOrReplaceTempView("unique_nodes")
            df.createOrReplaceTempView("all_embeddings")
    
            sql_query = """
                select u.node_id, a.embedding from unique_nodes u inner join all_embeddings a on u.pmid = a.pmid
            """
            result =  spark.sql(sql_query)
    
      
    
            print("num", result.count())
            result.repartition(10).write.parquet('/shared/parquets_embeddings/')
    
            write_df(result, 'hm31.uncleaned_embeddings_cert', jdbc_properties)
    
    
    
    
            spark.catalog.clearCache()
            unique_nodes.unpersist()
            df.unpersist()
            result.unpersist()
    
            spark.stop()
            exit(0)
    
    
    
    
        except:
            print('Error')
            spark.catalog.clearCache()
            unique_nodes.unpersist()
            df.unpersist()
            spark.stop()
            exit(0)
    
    
        print('Error')
        spark.catalog.clearCache()
        unique_nodes.unpersist()
        df.unpersist()
        spark.stop()
        exit(0)

在那里我试图删除缓存的数据帧.这种RAM泄漏需要重新启动服务器,这很不舒服.

这是我运行的命令:

spark-submit --master local[50] --driver-class-path ./postgresql-42.5.2.jar --jars ./postgresql-42.5.2.jar --driver-memory 200g --conf "spark.local.dir=./logs" calculate_similarities.py

这是最高的输出,您可以看到空闲+已用内存比总和少得多,在我运行Spark工作之前,它过go 大约是230.这些作业(job)是按内存使用情况排序的,您可以看到,在Spark以异常结束后,没有运行内存密集型作业(job).

我要补充的是,这台机器本身没有spark .它有Java 11,我只是通过导入它的包来运行Pyspark.

enter image description here

谢谢

附注:在Postgres上,unique_nodes大约是0.5 GB.df = spark. read.parquet(parquet_path)可以读取38个镶木地板文件,每个文件约3 GB.加入后,result的容量约为8 GB.

推荐答案

这里没有"RAM泄漏".你误解了top显示的内容:

  • total是内存总量(没什么好奇怪的)
  • free是未用于任何目的的内存量
  • used是内核当前分配的数量,例如,由于来自应用程序的请求
  • free+used的总和是not total,因为还有buff/cache.这是当前用于"次要"目的的内存量,特别是缓存磁盘上的数据时,内核知道它在内存中已经有一个完全相同的副本;只要不存在used的内存压力,内核就会try 保持它的buff/cache.
  • avail是应用程序可以随时使用的值,大约是free+buff/cache的总和

您的top屏幕截图显示了分配给buff/cache的大量内存,这些数据很可能是以前准备好的数据,内核会保留这些数据以备将来需要;这里没有"泄漏",因为如果应用程序的需要到达,内核将收回这些缓存的内存页.还请注意,avail的数字仍然在234 GB左右,这几乎与您对free+used的预期完全相同--但没有考虑buff/cache.

Python相关问答推荐

尽管进程输出错误消息,subProcess.check_call的CalledProcess错误.stderr为无

根据多列和一些条件创建新列

Snap 7- read_Area用于类似地址的变量

Python中使用Delivercio进行多个请求

Python:记录而不是在文件中写入询问在多文件项目中记录的最佳实践

为什么dict(id=1,**{id:2})有时会引发KeyMessage:id而不是TypMessage?

具有多个选项的计数_匹配

rame中不兼容的d类型

抓取rotowire MLB球员新闻并使用Python形成表格

用NumPy优化a[i] = a[i-1]*b[i] + c[i]的迭代计算

关于Python异步编程的问题和使用await/await def关键字

如何设置视频语言时上传到YouTube与Python API客户端

Pandas计数符合某些条件的特定列的数量

利用Selenium和Beautiful Soup实现Web抓取JavaScript表

无论输入分辨率如何,稳定扩散管道始终输出512 * 512张图像

polars:有效的方法来应用函数过滤列的字符串

无法在Spyder上的Pandas中将本地CSV转换为数据帧

如何将一组组合框重置回无 Select tkinter?

Python 3试图访问在线程调用中实例化的类的对象

如何合并具有相同元素的 torch 矩阵的行?