I am trying to use Python's multiprocessing library (pool.starmap) to run the same task on the same Pandas DataFrame in parallel, but with different call parameters, in order to reduce execution time.

When I run this code on a small portion of the dataframe with 10 jobs, everything works fine. However, when I feed the whole 100,000,000-row dataset to 63 jobs (using a cluster machine with 64 CPU cores), the code simply freezes: it is still running, but not doing anything (I know this because the code is supposed to print that it is alive roughly once every 10,000 tasks).

I searched the internet and found similar questions, but none of the answers applied to my specific case, so here I am.

Minimal Example

I made a minimal, self-contained example that reproduces the problem. Suppose, for simplicity, that my dataframe has 2 columns: the first is "stores", the other is "price". I want to retrieve the mean price for each store. Of course, for this particular problem we could simply group the dataframe by stores and aggregate on price (for comparison, a direct one-liner is sketched right after the example), but that is a simplification; let's assume the task can only be done for one store at a time (which is my actual case). Here is a minimal example:

#minimal example
#changes according to SIGHUP and Frank Yellin

import time
import pandas as pd
import random as rd
import multiprocessing as mp

import psutil #RAM usage

def create_datafile(nrows):
    """
    create a random pandas dataframe file
    To visualize this rather simple example,
    let's say that we are looking at a pool of 0.1*nrows products across different stores,
    that can have different values of the attribute "price"
    (in the list "stores").
    """
    
    price = [rd.randint(0, 300) for i in range(nrows)]
    stores = [i % int(0.1 * nrows) for i in range(nrows)] #integer store ids (0.1*nrows is a float, so cast before the modulo)

    data = zip(stores, price)
 
    return pd.DataFrame(data=data, columns=["stores", "price"])

            
def task(store):
    """
    the task we want to accomplish: compute mean price
    for each store in the dataframe.
    """
    global data

    if rd.randint(1,10000)==1: #signal liveness roughly once every 10 000 tasks
        print('I am alive!')

    product_df = data.groupby('stores', as_index = False).agg(mean_price = ("price", 'mean'))

    selected_store = product_df[product_df['stores'] == store] #select the mean for a given store

    return (store, selected_store['mean_price'].iloc[0]) #return the mean as a scalar

def pinit(_df):
    global data 
    data = _df

def get_ram_usage_pct(): 
    #source: https://www.pragmaticlinux.com/2020/12/monitor-cpu-and-ram-usage-in-python-with-psutil/
    """
    Obtains the system's current RAM usage.
    :returns: System RAM usage as a percentage.
    :rtype: float
    """
    return psutil.virtual_memory().percent


if __name__ == "__main__":
    ##
    nrows=100000000
    nb_jobs= 63

    print('Creating data...')
    df = create_datafile(nrows)
    print('Data created.')

    print('RAM usage after data creation is {} %'.format(get_ram_usage_pct()))


    stores_list = [i % int(0.1 * nrows) for i in range(nrows)] #one entry per row, so each store id appears 10 times

    dic_mean={}
    #launch multiprocessing tasks with map_async
    tic=time.time()
    print(f'Max number of jobs: {mp.cpu_count() - 1}')
    print(f'Running: {min(nb_jobs, mp.cpu_count() - 1)} jobs...')
    with mp.Pool(initializer=pinit, initargs=(df,), processes=min(nb_jobs, mp.cpu_count() - 1)) as pool:
        for store,result in pool.map_async(task, stores_list).get():
            dic_mean[store] = result
    toc=time.time()
    print(f'Processed data in {round((toc-tic)/60,1)} minutes (rounded to 0.1).') 

    #print(dic_mean)
    #dic_mean now contains all the means computed by the worker processes.
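As mentioned above, without the one-store-at-a-time constraint the whole computation collapses into a single groupby. For comparison, here is a minimal sketch of that direct, single-process approach (the function name is just for illustration):

import pandas as pd

def mean_price_per_store(df):
    #group once by store and take the mean price per store
    return df.groupby('stores')['price'].mean().to_dict()

#dic_mean = mean_price_per_store(df) #same result as the multiprocessing loop above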

I am using Python 3.9.2.

If you launch this code with:

  • nrows = 10000 and nb_jobs = 10, you should not run into any problems.
  • However, with nrows = 100000000 and nb_jobs = 63, the problem I described should occur.

I am quite new to Python multiprocessing, so any tips are welcome! Thanks in advance.

Accepted Answer

Okay, thanks to @SIGHUP and @Frank Yellin I was able to track down the problem, so I am sharing it here in case anyone runs into something similar.

Python seems unable to print anything when there are too many concurrent processes running.

A way to check that your program is actually working is, for example, to write to a text file instead. Once there are too many processes, print statements will NOT appear in the Python console.

I do not know whether these print statements freeze the whole program or whether it keeps running in the background. In any case, I recommend removing all print statements from worker code to avoid surprises.
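In short, a heartbeat appended to a file is a safer liveness signal than printing from a worker process. A minimal sketch of the idea, assuming a path such as "alive.txt" (the corrected code below uses the same pattern; the function name is just for illustration):

import os

def heartbeat(alive_file):
    #append to a file instead of printing; unlike print, this does not
    #depend on the parent process reading the workers' stdout
    with open(alive_file, 'a') as f:
        f.write(f"pid {os.getpid()} is alive\n")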

Here is a version of the code from my example that works (be careful: with 100 000 000 rows or more it will take a long time):

#minimal example
#changes according to SIGHUP and Frank Yellin

import time
import pandas as pd
import random as rd
import multiprocessing as mp

import psutil #RAM usage

import sys

def create_datafile(nrows):
    """
    create a random pandas dataframe file
    To visualize this rather simple example,
    let's say that we are looking at a pool of 0.1*nrows products across different stores,
    that can have different values of the attribute "price"
    (in the list "stores").
    """
    
    price = [rd.randint(0, 300) for i in range(nrows)]
    stores = [i % int(0.1 * nrows) for i in range(nrows)] #integer store ids (0.1*nrows is a float, so cast before the modulo)

    data = zip(stores, price)
 
    return pd.DataFrame(data=data, columns=["stores", "price"])

            
def task(store):
    """
    the task we want to accomplish: compute mean price
    for each store in the dataframe.
    """
    global data
    global alive_file

    #print('I am alive!',flush=True) DO NOT put a print statement here

    with open(alive_file, 'a') as f: #write to a file instead of printing
        f.write("I am alive!\n")

    product_df = data.groupby('stores', as_index = False).agg(mean_price = ("price", 'mean'))

    selected_store = product_df[product_df['stores'] == store] #select the mean for a given store

    return (store, selected_store['mean_price'].iloc[0]) #return the mean as a scalar

def pinit(_df, _alive_file):
    global data 
    global alive_file
    data = _df
    alive_file = _alive_file

def get_ram_usage_pct():
    #source: https://www.pragmaticlinux.com/2020/12/monitor-cpu-and-ram-usage-in-python-with-psutil/
    """
    Obtains the system's current RAM usage.
    :returns: System RAM usage as a percentage.
    :rtype: float
    """
    return psutil.virtual_memory().percent


if __name__ == "__main__":
    ##
    nrows= int(sys.argv[1]) #number of rows in dataframe
    nb_jobs= int(sys.argv[2]) #number of jobs


    print('Creating data...')
    tic=time.time()
    df = create_datafile(nrows)
    toc=time.time()
    print(f'Data created. Took {round((toc-tic)/60,1)} minutes (rounded to 0.1)')

    print('RAM usage after data creation is {} %'.format(get_ram_usage_pct()))

    #print(df)

    #create parameters for multiprocessing task
    stores_list = [(i % int(0.1 * nrows),) for i in range(nrows)] #one single-element tuple per task, as required by starmap
    #dics_stores=[{} for _ in stores_list]
    #parameters = [(df, stores_list[i]) for i in range(nrows)]

    dic_mean={}
    #launch multiprocessing tasks with starmap
    tic=time.time()
    print(f'Max number of jobs: {mp.cpu_count() - 1}')
    print(f'Running: {min(nb_jobs, mp.cpu_count() - 1)} jobs...')
    with mp.Pool(initializer=pinit, initargs=(df, "alive.txt"), processes=min(nb_jobs, mp.cpu_count() - 1)) as pool:
        for store,result in pool.starmap_async(task, stores_list).get():
            dic_mean[store] = result
    toc=time.time()
    print(f'Processed data in {round((toc-tic)/60,1)} minutes (rounded to 0.1).') 

    #print(dic_mean)
    #dic_mean now contains all the means computed by the worker processes.
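With the two command-line arguments above (nrows and nb_jobs), the script can be launched like this, assuming it is saved as, say, minimal_example.py (the file name is just an illustration):

python minimal_example.py 10000 10
python minimal_example.py 100000000 63

The first call is a quick sanity check; the second reproduces the full-scale run.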

Thanks to everyone who took the time to look into my question and improve my code, which helped me identify the real problem.
