在python3中使用多处理池时,我遇到了一些困难.我想使用pool进行大数组计算. map 基本上,我有一个3D数组,我需要做10次计算,它会按顺序生成10个输出文件.这个任务可以完成3次,即在输出中,我们得到3*10=30个输出文件(*.txt).为此,我为小数组计算准备了以下脚本(一个示例问题).然而,当我使用此脚本进行大型数组计算或从一系列文件中生成数组时,这段代码(可能是池)捕获内存,并且不会保存任何内存.目标目录中的txt文件.使用命令mpirun python3 sample_prob_func.py运行文件时没有错误消息

import numpy as np
import multiprocessing as mp
from scipy import signal
import matplotlib.pyplot as plt
import contextlib
import os, glob, re
import random
import cmath, math
import time
import pdb

#File Storing path
save_results_to = 'File saving path'

arr_x = [0, 8.49, 0.0, -8.49, -12.0, -8.49, -0.0, 8.49, 12.0]
arr_y = [0, 8.49, 12.0, 8.49, 0.0, -8.49, -12.0, -8.49, -0.0]
N=len(arr_x)

np.random.seed(12345)
total_rows = 5000
arr = np.reshape(np.random.rand(total_rows*N),(total_rows, N))
arr1 = np.reshape(np.random.rand(total_rows*N),(total_rows, N))
arr2 = np.reshape(np.random.rand(total_rows*N),(total_rows, N))

# Finding cross spectral density (CSD)
def my_func1(data):
    # Do something here
    return  array1


t0 = time.time()
my_data1 = my_func1(arr)
my_data2 = my_func1(arr1)
my_data3 = my_func1(arr2)

print('Time required {} seconds to execute CSD--For loop'.format(time.time()-t0))
mydata_list  = [my_data1,my_data3,my_data3]


def my_func2(data2):
    # Do something here
    return from_data2



start_freq = 100
stop_freq  = 110
freq_range= np.around(np.linspace(start_freq,stop_freq,11)/10, decimals=2)
no_of_freq = len(freq_range)

list_arr =[]

def my_func3(csd):
    list_csd=[]
    for fr_count in range(start_freq, stop_freq):
        csd_single = csd[:,:, fr_count]
        list_csd.append(csd_single)
    print('Shape of list is :', np.array(list_csd).shape)
    return list_csd

def parallel_function(BIG_list_data):
    with contextlib.closing(mp.Pool(processes=10)) as pool:
       dft= pool.map(my_func2, BIG_list_data)
       pool.close()
       pool.join()
    data_arr = np.array(dft)
    print('shape of data :', data_arr.shape)
    return data_arr

count_day = 1
count_hour =0
for count in range(3):
    count_hour +=1
    list_arr = my_func3(mydata_list[count])  # Load Numpy files
    print('Array shape is :', np.array(arr).shape)
    t0 = time.time()
    data_dft = parallel_function(list_arr)
    print('The hour number={} data is processing... '.format(count_hour))
    print('Time in parallel:', time.time() - t0)
    for i in range(no_of_freq-1): # (11-1=10)
        jj = freq_range[i]
        #print('The hour_number {} and frequency number {} data is processing... '.format(count_hour, jj))
        dft_1hr_complx = data_dft[i,:,:]
        np.savetxt(save_results_to + f'csd_Day_{count_day}_Hour_{count_hour}_f_{jj}_hz.txt',  dft_1hr_complx.view(float))

推荐答案

正如@JérômeRichard所建议的,要了解作业(job)调度器,您需要定义执行此任务的处理器数量.因此,以下命令可以帮助您:ncpus = int(os.getenv('SLURM_CPUS_PER_TASK', 1))

您需要在python脚本中使用这一行.此外,在parallel_function内部使用with contextlib.closing(mp.Pool(ncpus=10)) as pool:而不是with contextlib.closing(mp.Pool(processes=10)) as pool:.谢谢

Python-3.x相关问答推荐

"安装serial vs安装psyserial header,"""

为什么打印语句在Python多处理脚本中执行两次?

正则表达式匹配并提取括号前的单词

正确的本地react 方式-Django身份验证

我不能使用拆分来分隔数据

在 Python 中比较和排序列之间的值(带有不匹配列)

为什么空列表也能起作用?

基于组/ID从原始数据框中创建两个子数据框

如何转置和 Pandas DataFrame 并命名新列?

Keras 中 Conv2D 层的意外结果

Dask 多阶段资源设置导致 Failed to Serialize 错误

根据另一列值对多个数据框列进行分组

FastAPI - 调用 API 时设置 response_model_exclude

使用一周的特定第一天将每日日期转换为每周

在带有 M1 芯片(基于 ARM 的 Apple Silicon)的 Mac 上安装较早版本的 Python(3.8 之前)失败

通过多个键对字典列表进行分组和聚合

Pruning in Keras

无法解码 Python Web 请求

Python 无法处理以 0 开头的数字字符串.为什么?

TypeError:无法将系列转换为