我试图在numpy数组中找到重复的行.以下代码复制了我的数组的 struct ,该数组每行有n行、m列和nz个非零条目:

import numpy as np
import random
import datetime


def create_mat(n, m, nz):
    sample_mat = np.zeros((n, m), dtype='uint8')
    random.seed(42)
    for row in range(0, n):
        counter = 0
        while counter < nz:
            random_col = random.randrange(0, m-1, 1)
            if sample_mat[row, random_col] == 0:
                sample_mat[row, random_col] = 1
                counter += 1
    test = np.all(np.sum(sample_mat, axis=1) == nz)
    print(f'All rows have {nz} elements: {test}')
    return sample_mat

我试图优化的代码如下:

if __name__ == '__main__':
    threshold = 2
    mat = create_mat(1800000, 108, 8)

    print(f'Time: {datetime.datetime.now()}')
    duplicate_rows, _, duplicate_counts = np.unique(mat, axis=0, return_counts=True, return_index=True)
    duplicate_indices = [int(x) for x in np.argwhere(duplicate_counts >= threshold)]
    print(f'Time: {datetime.datetime.now()}')

    print(f'Duplicate rows: {len(duplicate_rows)} Sample inds: {duplicate_indices[0:5]} Sample counts: {duplicate_counts[0:5]}')
    print(f'Sample rows:')
    print(duplicate_rows[0:5])

我的输出如下:

All rows have 8 elements: True
Time: 2022-06-29 12:08:07.320834
Time: 2022-06-29 12:08:23.281633
Duplicate rows: 1799994 Sample inds: [508991, 553136, 930379, 1128637, 1290356] Sample counts: [1 1 1 1 1]
Sample rows:
[[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 1 1 0 0 1 1 0 0 0 1 0 0 0 0 0 0 1 0 1 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 1 1 0 0 0 0 0 1 1 1 1 0 0 0 0 1 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 0 0 0 1 0 1 0 0 1 0 0 0 1 0 1 0 1 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 1 0 1 1 0 0 0 0 1 1 0 0 0 0 0 0 1 1 0 1 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 1 0 0 0 1 0 1 1 0 0 0 0 0 1 0 0 0 0 1 0]]

我考虑过使用NUBA,但挑战是它不使用轴参数.类似地,转换为列表和利用集合也是一种 Select ,但随后通过循环执行重复计数似乎"不符合逻辑".

考虑到我需要多次运行此代码(因为我正在修改numpy数组,然后需要重新搜索重复项),时间至关重要.我也try 对这一步使用多处理,但np.unique似乎被阻塞了(即,即使我try 运行多个版本的unique,我最终也会限制一个线程以6%的CPU容量运行,而其他线程则处于空闲状态).

推荐答案

步骤1:钻头包装

由于矩阵只包含二进制值,因此可以使用aggressively pack the bits into uint64 values来执行更高效的排序.以下是NUBA实现:

import numpy as np
import numba as nb

@nb.njit('(uint8[:,::1],)', parallel=True)
def pack_bits(mat):
    n, m = mat.shape
    res = np.zeros((n, (m+63)//64), np.uint64)
    for i in nb.prange(n):
        for bj in range(0, m, 64):
            val = np.uint64(0)
            if bj + 64 <= m:
                # Fast case
                for j in range(64):
                    val += np.uint64(mat[i, bj+j]) << (63 - j)
            else:
                # Slow case (boundary)
                for j in range(m - bj):
                    val += np.uint64(mat[i, bj+j]) << (63 - j)
            res[i, bj//64] = val
    return res

@nb.njit('(uint64[:,::1], int_)', parallel=True)
def unpack_bits(mat, m):
    n = mat.shape[0]
    assert mat.shape[1] == (m+63)//64
    res = np.zeros((n, m), np.uint64)
    for i in nb.prange(n):
        for bj in range(0, m, 64):
            val = np.uint64(mat[i, bj//64])
            if bj + 64 <= m:
                # Fast case
                for j in range(64):
                    res[i, bj+j] = np.uint8((val >> (63 - j)) & 1)
            else:
                # Slow case (boundary)
                for j in range(m - bj):
                    res[i, bj+j] = np.uint8((val >> (63 - j)) & 1)
    return res

np.unique函数可以在更小的压缩数组上调用,就像在初始代码中一样(除了得到的排序数组是压缩数组,需要解包).因为您不需要索引,所以最好不要计算它.因此,可以移除return_index=True.此外,只能解包所需的值(解包比打包要贵一些,因为编写一个大矩阵比读取一个现有矩阵要贵).

if __name__ == '__main__':
    threshold = 2
    n, m = 1800000, 108
    mat = create_mat(n, m, 8)

    print(f'Time: {datetime.datetime.now()}')
    packed_mat = pack_bits(mat)
    duplicate_packed_rows, duplicate_counts = np.unique(packed_mat, axis=0, return_counts=True)
    duplicate_indices = [int(x) for x in np.argwhere(duplicate_counts >= threshold)]
    print(f'Time: {datetime.datetime.now()}')

    print(f'Duplicate rows: {len(duplicate_rows)} Sample inds: {duplicate_indices[0:5]} Sample counts: {duplicate_counts[0:5]}')
    print(f'Sample rows:')
    print(unpack_bits(duplicate_packed_rows[0:5], m))

Step 2: np.unique optimizations

np.unique调用是次优的,因为它执行多个昂贵的内部排序步骤.在你的specific case中并不是所有这些都需要,有些步骤可以优化.

更有效的实现包括在第一步对最后一列进行排序,然后对前一列进行排序,依此类推,直到第一列的排序与基数排序类似.注意,最后一列可以使用非稳定算法进行排序(通常更快),但其他列需要一个稳定的算法.这种方法仍然是次优的,因为argsort个调用都很慢,而且当前的实现还没有使用多线程.不幸的是,Numpy还没有证明任何对2D数组的行进行排序的有效方法.虽然可以在Nuba中重新实现这一点,但这很麻烦,有点棘手,而且容易出现错误.更不用说,与原生C/C++代码相比,Numba引入了一些开销.排序后,可以跟踪和统计唯一/重复行.以下是一个实现:

def sort_lines(mat):
    n, m = mat.shape

    for i in range(m):
        kind = 'stable' if i > 0 else None
        mat = mat[np.argsort(mat[:,m-1-i], kind=kind)]

    return mat

@nb.njit('(uint64[:,::1],)', parallel=True)
def find_duplicates(sorted_mat):
    n, m = sorted_mat.shape
    assert m >= 0

    isUnique = np.zeros(n, np.bool_)
    uniqueCount = 1
    if n > 0:
        isUnique[0] = True
    for i in nb.prange(1, n):
        isUniqueVal = False
        for j in range(m):
            isUniqueVal |= sorted_mat[i, j] != sorted_mat[i-1, j]
        isUnique[i] = isUniqueVal
        uniqueCount += isUniqueVal

    uniqueValues = np.empty((uniqueCount, m), np.uint64)
    duplicateCounts = np.zeros(len(uniqueValues), np.uint64)

    cursor = 0
    for i in range(n):
        cursor += isUnique[i]
        for j in range(m):
            uniqueValues[cursor-1, j] = sorted_mat[i, j]
        duplicateCounts[cursor-1] += 1

    return uniqueValues, duplicateCounts

之前的np.unique次呼叫可以替换为find_duplicates(sort_lines(packed_mat))次.

Step 3: GPU-based np.unique

虽然使用Nuba和Numpy在CPU上实现快速算法对行进行排序并不容易,但只要在GPU上使用CuPy即可,前提是Nvidia GPU可用并且安装了CUDA(以及CuPy).该解决方案的优点是简单且效率显著提高.下面是一个示例:

import cupy as cp

def cupy_sort_lines(mat):
    cupy_mat = cp.array(mat)
    return cupy_mat[cp.lexsort(cupy_mat.T[::-1,:])].get()

之前的sort_lines次呼叫可以替换为cupy_sort_lines次.


后果

以下是我使用6核i5-9600KF CPU和Nvidia 1660 Super GPU的机器上的计时:

Initial version:        15.541 s
Optimized packing:       0.982 s
Optimized np.unique:     0.634 s
GPU-based sorting:       0.143 s   (require a Nvidia GPU)

因此,基于CPU的优化版本约为25 times faster,基于GPU的优化版本为109 times faster.请注意,在所有版本中,排序都需要花费大量时间.此外,请注意,解包不包括在基准测试中(如所提供的代码中所示).只要只有几行被解包,而不是整个数组(在我的机器上大约需要200毫秒),那么这需要的时间可以忽略不计.最后一个操作可以进一步优化,但代价是实现更加复杂.

Python相关问答推荐

Pandas 都是(),但有一个门槛

avxspan与pandas period_range

我对我应该做什么以及我如何做感到困惑'

迭代嵌套字典的值

如何在图中标记平均点?

try 检索blob名称列表时出现错误填充错误""

在极中解析带有数字和SI前缀的字符串

跳过嵌套JSON中的级别并转换为Pandas Rame

干燥化与列姆化的比较

为什么在FastAPI中创建与数据库的连接时需要使用生成器?

Odoo16:模板中使用的docs变量在哪里定义?

将一个双框爆炸到另一个双框的范围内

多个矩阵的张量积

来自Airflow Connection的额外参数

将字节序列解码为Unicode字符串

TypeError:';Locator';对象无法在PlayWriter中使用.first()调用

我怎样才能让深度测试在OpenGL中使用Python和PyGame呢?

Pandas:使列中的列表大小与另一列中的列表大小相同

对当前的鼹鼠进行编码,并且我的按键获得了注册

在不降低分辨率的情况下绘制一组数据点的最外轮廓