I am trying to use Python's multiprocessing library (pool_starmap) to run the same task in parallel on the same Pandas DataFrame, but with different call parameters, in order to save execution time.
When I run this code on a small subset of the dataframe with 10 jobs, everything works fine. However, when I feed the whole 100,000,000-row dataset to 63 jobs (using a cluster machine with 64 CPU cores for this), the code just... freezes. It is running, but not doing anything (I know this because, roughly once every 10,000 tasks, the code is supposed to print that it is alive).
I searched the internet and found similar questions, but none of the answers applied to my specific case, so here I am.
Minimal Example
I made a minimal, self-contained example that reproduces the problem. For simplicity, say my dataframe has 2 columns: the first is "stores", the other is "price". I want to recover the mean price for each store. Of course, for this particular problem we could simply group the dataframe by stores and aggregate on price, but that is a simplification; let's assume the task can only be done one store at a time (which is my real case). Here is a minimal example:
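(For reference, the direct non-parallel version I am simplifying away is just a one-line groupby; the tiny frame below is illustrative, not my real data:)

```python
import pandas as pd

# Direct (non-parallel) version of the task: one groupby over the whole frame.
# The values here are illustrative placeholders.
df = pd.DataFrame({"stores": [0, 1, 0, 1], "price": [10, 20, 30, 40]})
mean_prices = df.groupby("stores", as_index=False).agg(mean_price=("price", "mean"))
print(mean_prices)
```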
```python
#minimal example
#changes according to SIGHUP and Frank Yellin
import time
import pandas as pd
import random as rd
import multiprocessing as mp
import psutil  #RAM usage

def create_datafile(nrows):
    """
    Create a random pandas dataframe.
    To visualize this rather simple example,
    let's say that we are looking at a pool of 0.1*nrows products across different stores,
    that can have different values of the attribute "price"
    (in the list "stores").
    """
    price = [rd.randint(0, 300) for i in range(nrows)]
    stores = [i % (0.1 * nrows) for i in range(nrows)]
    data = zip(stores, price)
    return pd.DataFrame(data=data, columns=["stores", "price"])

def task(store):
    """
    The task we want to accomplish: compute the mean price
    for a given store in the dataframe.
    """
    global data
    if rd.randint(1, 10000) == 1:
        print('I am alive!')
    product_df = data.groupby('stores', as_index=False).agg(mean_price=("price", 'mean'))
    selected_store = product_df[product_df['stores'] == store]  #select the mean for a given store
    return (store, selected_store['mean_price'])

def pinit(_df):
    global data
    data = _df

def get_ram_usage_pct():
    """
    Obtain the system's current RAM usage.
    Source: https://www.pragmaticlinux.com/2020/12/monitor-cpu-and-ram-usage-in-python-with-psutil/
    :returns: System RAM usage as a percentage.
    :rtype: float
    """
    return psutil.virtual_memory().percent

if __name__ == "__main__":
    nrows = 100000000
    nb_jobs = 63
    print('Creating data...')
    df = create_datafile(nrows)
    print('Data created.')
    print('RAM usage after data creation is {} %'.format(get_ram_usage_pct()))
    stores_list = [i % (0.1 * nrows) for i in range(nrows)]
    dic_mean = {}
    #launch multiprocessing tasks with map_async
    tic = time.time()
    print(f'Max number of jobs: {mp.cpu_count() - 1}')
    print(f'Running: {min(nb_jobs, mp.cpu_count() - 1)} jobs...')
    with mp.Pool(initializer=pinit, initargs=(df,), processes=min(nb_jobs, mp.cpu_count() - 1)) as pool:
        for store, result in pool.map_async(task, stores_list).get():
            dic_mean[store] = result[store]
    toc = time.time()
    print(f'Processed data in {round((toc - tic) / 60, 1)} minutes (rounded to 0.1).')
    #print(dic_mean)
    #dic_mean now contains all the means computed by each process.
```
I am using Python 3.9.2.
If you launch this code with:
- nrows = 10000 and nb_jobs = 10, you should not encounter any problem.
- However, with nrows = 100000000 and nb_jobs = 63, the issue I described should occur.
I am quite new to Python multiprocessing, so any hint is welcome! Thanks in advance.