Question

I would like to use the multiprocessing module (multiprocessing.Pool.starmap()) for feature engineering.

The Anaconda version is 4.3.30 and the Python version is 3.6 (64-bit).

Code:

from multiprocessing import Pool, cpu_count
from itertools import repeat    
p = Pool(8)
is_train_seq = [True]*len(historyCutoffs)+[False]
config_zip = zip(historyCutoffs, repeat(train_scala), repeat(test), repeat(ts), ul_parts_path, repeat(members), is_train_seq)
p.starmap(multiprocess_FE, config_zip)

Error Message:

Traceback (most recent call last):
  File "main_1210_FE_scala_multiprocessing.py", line 705, in <module>
    print('----Pool starmap start----')
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 274, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
    put(task)
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
    header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647

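Extra Information

  • historyCutoffs is a list of integers
  • train_scala is a DataFrame (377 MB)
  • test is a DataFrame (15 MB)
  • ts is a DataFrame (547 MB)
  • ul_parts_path is a list of file paths (strings)
  • is_train_seq is a list of booleans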

Extra Code: Method multiprocess_FE

import gzip

import pandas as pd

# enrich_by_features() is defined elsewhere in the script


def multiprocess_FE(historyCutoff, train_scala, test, ts, ul_part_path, members, is_train):
    train_dict = {}
    ts_dict = {}
    msno_dict = {}
    ul_dict = {}
    if is_train:
        train_dict[historyCutoff] = train_scala[train_scala.historyCutoff == historyCutoff]
    else:
        train_dict[historyCutoff] = test
    msno_dict[historyCutoff] = set(train_dict[historyCutoff].msno)
    print('length of msno is {:d} in cutoff {:d}'.format(len(msno_dict[historyCutoff]), historyCutoff))
    ts_dict[historyCutoff] = ts[(ts.transaction_date <= historyCutoff) & (ts.msno.isin(msno_dict[historyCutoff]))]
    print('length of transaction is {:d} in cutoff {:d}'.format(len(ts_dict[historyCutoff]), historyCutoff))
    with gzip.open(ul_part_path, mode="rt") as f:
        ul_part = pd.read_csv(f)  ##.sample(frac=0.01, replace=False)
    ul_dict[historyCutoff] = ul_part[ul_part.msno.isin(msno_dict[historyCutoff])]
    train_dict[historyCutoff] = enrich_by_features(historyCutoff, train_dict[historyCutoff], ts_dict[historyCutoff], ul_dict[historyCutoff], members, is_train)

Recommended Answer

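The communication protocol between processes uses pickling, and the pickled data is prefixed with the size of the pickled data. For your method, all arguments together are pickled as one object.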
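To make that framing concrete, here is a minimal sketch, assuming a simplified model of what multiprocessing.connection.Connection does on POSIX: pickle the whole argument tuple, prefix it with a 4-byte signed big-endian length, then read it back. The helpers frame() and unframe() and the sample argument values are hypothetical; the real logic lives in Connection._send_bytes() and Connection._recv_bytes().

```python
import io
import pickle
import struct


def frame(obj):
    """Pickle obj and prefix it with its size as a 4-byte signed int ("!i")."""
    payload = pickle.dumps(obj)
    # struct.pack raises struct.error here if len(payload) > 2**31 - 1
    return struct.pack("!i", len(payload)) + payload


def unframe(stream):
    """Read one length-prefixed pickle back from a binary stream."""
    size, = struct.unpack("!i", stream.read(4))
    return pickle.loads(stream.read(size))


# All arguments of one call travel together as a single pickled object.
args = (20170131, "part_0.csv.gz", True)  # hypothetical values
print(unframe(io.BytesIO(frame(args))))  # (20170131, 'part_0.csv.gz', True)
```

With three DataFrames inside the tuple, the payload after the 4-byte header is the entire pickled set of DataFrames.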

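You produced an object that, when pickled, is larger than fits in the i struct format (a four-byte signed integer, so at most 2**31 - 1 bytes, just under 2 GiB), which breaks the assumptions the code has made.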
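The limit can be reproduced without any DataFrames. This small sketch (the header_fits() helper is hypothetical) shows that struct.pack("!i", n) accepts sizes up to 2**31 - 1 and raises the same struct.error seen in the traceback one byte later:

```python
import struct

LIMIT = 2**31 - 1  # largest value a 4-byte signed int can hold (just under 2 GiB)


def header_fits(n):
    """Return True if a payload of n bytes can get a "!i" length header."""
    try:
        struct.pack("!i", n)
        return True
    except struct.error:
        return False


print(header_fits(LIMIT))      # True
print(header_fits(LIMIT + 1))  # False: struct.error, as in the traceback
```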

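You could delegate the reading of your DataFrames to the child processes instead, sending across only the metadata needed to load each DataFrame. Their combined size is nearing 1 GB, far too much data to share over a pipe between your processes.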
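A minimal sketch of that approach, assuming each worker can load its own input from disk. The standard-library csv module stands in for pd.read_csv so the example runs without pandas; count_transactions(), write_sample_csv() and the transaction_date column are hypothetical. Each task pickles only an int and a path string.

```python
import csv
import os
import tempfile
from multiprocessing import Pool


def count_transactions(history_cutoff, csv_path):
    """Worker: load the data inside the child instead of receiving it pickled.

    Only history_cutoff (an int) and csv_path (a str) cross the pipe.
    """
    with open(csv_path, newline="") as f:
        rows = csv.DictReader(f)
        return sum(1 for row in rows if int(row["transaction_date"]) <= history_cutoff)


def write_sample_csv(path):
    """Create a tiny hypothetical input file for the demo."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["msno", "transaction_date"])
        writer.writerows([["a", 20170101], ["b", 20170215], ["c", 20170301]])


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "ts.csv")
        write_sample_csv(path)
        cutoffs = [20170131, 20170228, 20170331]
        with Pool(2) as pool:
            # Each task is (cutoff, path): a few bytes instead of ~1 GB.
            print(pool.starmap(count_transactions, [(c, path) for c in cutoffs]))  # [1, 2, 3]
```

In the original code the equivalent would be passing file paths for train_scala, test and ts alongside ul_part_path, and reading them inside multiprocess_FE.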

Quoting from the Programming guidelines section:

Better to inherit than pickle/unpickle

When using the spawn or forkserver start methods many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.

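If you are not running on Windows and are not using the spawn or forkserver start methods (fork is the default on Linux in Python 3.6), you can load your DataFrames as globals before starting your subprocesses; the children then "inherit" the data through the operating system's normal copy-on-write memory page sharing.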

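Note that as of Python 3.8 this limit has been raised to an unsigned long long (8 bytes) on non-Windows systems, so the length header can now describe messages of up to 2**64 - 1 bytes (roughly 16 EiB). See this commit, and Python issues #35152 and #17560.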
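The 3.8 scheme keeps the old 4-byte header for small messages and, for larger ones, sends a 4-byte -1 marker followed by an 8-byte unsigned ("!Q") length. This is the same header scheme the patch in this answer backports. A sketch of just the header logic, with hypothetical helper names encode_header() and decode_header():

```python
import io
import struct

SMALL_MAX = 0x7fffffff  # largest size that still fits the legacy "!i" header


def encode_header(n):
    """Return the length header for an n-byte message (Python 3.8+ scheme)."""
    if n > SMALL_MAX:
        # -1 marker tells the reader that an 8-byte unsigned length follows
        return struct.pack("!i", -1) + struct.pack("!Q", n)
    return struct.pack("!i", n)  # wire-compatible with 3.7 and older


def decode_header(stream):
    """Read a length header written by encode_header from a binary stream."""
    size, = struct.unpack("!i", stream.read(4))
    if size == -1:
        size, = struct.unpack("!Q", stream.read(8))
    return size


for n in (100, 2**31, 2**40):
    header = encode_header(n)
    print(n, len(header), decode_header(io.BytesIO(header)))
```

Small messages keep the 4-byte header, so a 3.8 sender stays readable by an older receiver as long as each message is under 2 GiB.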

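If you can't upgrade, can't make use of resource inheritance, and are not running on Windows, use this patch. Call patch_mp_connection_bpo_17560() in the parent process before creating the Pool, so that the forked workers inherit the patched methods: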

import functools
import logging
import struct
import sys

logger = logging.getLogger()


def patch_mp_connection_bpo_17560():
    """Apply PR-10305 / bpo-17560 connection send/receive max size update

    See the original issue at https://bugs.python.org/issue17560 and 
    https://github.com/python/cpython/pull/10305 for the pull request.

    This only supports Python versions 3.3 - 3.7, this function
    does nothing for Python versions outside of that range.

    """
    patchname = "Multiprocessing connection patch for bpo-17560"
    if not (3, 3) < sys.version_info < (3, 8):
        logger.info(
            patchname + " not applied, not an applicable Python version: %s",
            sys.version
        )
        return

    from multiprocessing.connection import Connection

    orig_send_bytes = Connection._send_bytes
    orig_recv_bytes = Connection._recv_bytes
    if (
        orig_send_bytes.__code__.co_filename == __file__
        and orig_recv_bytes.__code__.co_filename == __file__
    ):
        logger.info(patchname + " already applied, skipping")
        return

    @functools.wraps(orig_send_bytes)
    def send_bytes(self, buf):
        n = len(buf)
        if n > 0x7fffffff:
            pre_header = struct.pack("!i", -1)
            header = struct.pack("!Q", n)
            self._send(pre_header)
            self._send(header)
            self._send(buf)
        else:
            orig_send_bytes(self, buf)

    @functools.wraps(orig_recv_bytes)
    def recv_bytes(self, maxsize=None):
        buf = self._recv(4)
        size, = struct.unpack("!i", buf.getvalue())
        if size == -1:
            buf = self._recv(8)
            size, = struct.unpack("!Q", buf.getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size)

    Connection._send_bytes = send_bytes
    Connection._recv_bytes = recv_bytes

    logger.info(patchname + " applied")
