假设我们有一个大小为M x N的数值数组A,我们将其解释为维度NM个向量.对于三个向量a,b,c,我们想要计算它们形成的Angular 的余弦:

cos(angle(a,b,c)) = np.dot((a-b)/norm(a-b), (c-b)/norm(c-b))

我们想要计算A的三元组的这个量,其中应该有(M choose 2)*(M-2)unique个三元组(根据ac的对称性;如果我说错了,请纠正我).当然,我们可以通过一个三层嵌套的for循环来实现这一点,但我希望这可以以一种矢量化的方式完成.我非常肯定我可以使用一些广播技巧来计算一个包含includes个所需输出的数组,但我希望有人能提供一个计算exactly唯一量的配方,最好不需要额外的计算.谢谢.

edit.代表完备性,朴素的实施.带循环的:

angles = []
for i in range(len(A)):
    for j in range(len(A)):
        for k in range(i+1, len(A)):
            if j not in (i,k):
                d1 = A[i] - A[j]
                d2 = A[k] - A[j]
                ang = np.dot(d1/np.linalg.norm(d1), d2/np.linalg.norm(d2))
                angles.append(ang)

推荐答案

其中应该有(M choose 2)*(M-2)个唯一的三元组(根据a和c的对称性;如果我在这一点上错了,请纠正我)

我想这是对的.我数到了M * ((M-1) choose 2),这相当于.

我希望有人能提供一个配方,准确地计算唯一的量,最好不需要额外的计算.

好,让我们从简单的事情开始-向量化您的循环,assuming我们已经预先生成了索引ijk的array.

def cosang1(A, i, j, k):
    d1 = A[i] - A[j]
    d2 = A[k] - A[j]
    d1_hat = np.linalg.norm(d1, axis=1, keepdims=True)
    d2_hat = np.linalg.norm(d2, axis=1, keepdims=True)
    # someone will almost certainly suggest a better way to do this
    ang = np.einsum("ij, ij -> i", d1/d1_hat, d2/d2_hat)
    return ang

这将问题简化为计算索引数组,假设索引数组只占总计算时间的足够小的一部分.如果不做这种事情,我想不出一种方法来避免重复的计算.

然后,如果我们愿意允许冗余计算,生成指数的最简单方法是np.meshgrid.

def cosang2(A):
    i = np.arange(len(A))
    i, j, k = np.meshgrid(i, i, i)
    i, j, k = i.ravel(), j.ravel(), k.ravel()
    return cosang1(A, i, j, k)

对于Colab上的A个Shape(30,3),Python循环方法花费了160ms,而这个解花费了7ms.

如果我们被允许使用Numba,快速生成独特的指数集是非常容易的.这基本上就是您的代码分解成一个函数:

from numba import jit
# generate unique tuples of indices of vectors
@jit(nopython=True)
def get_ijks(M):
    ijks = []
    for i in range(M):
        for j in range(M):
            for k in range(i+1, M):
                if j not in (i, k):
                    ijks.append((i, j, k))
    return ijks

(当然,我们也可以在您的整个循环中使用Numba.)

与冗余矢量化解决方案相比,这花费的时间不到一半.

可以使用纯NumPy有效地生成索引.起初,我认为事情会很简单,比如:

i = np.arange(M)
j, k = np.triu_indices(M, 1)
i, j, k = np.broadcast_arrays(i, j[:, None], k[:, None])
i, j, k = i.ravel(), j.ravel(), k.ravel()

这并不完全正确,但可以从这个开始,并用超过range(M)的循环修复这些索引(无论如何,比三重嵌套要好!).类似于:

# generates the same set of indices as `get_ijks`,
# but currently a bit slower.
def get_ijks2(M):
    i = np.arange(M)
    j, k = np.triu_indices(M-1, 1)
    i, j, k = np.broadcast_arrays(i[:, None], j, k)
    i, j, k = i.ravel(), j.ravel(), k.ravel()
    for ii in range(M):
        # this can be improved by using slices
        # instead of masks where possible
        mask0 = i == ii
        mask1 = (j >= ii) & mask0
        mask2 = (k >= ii) & mask0
        j[mask1] += 1
        k[mask1 | mask2] += 1
    return j, i, k  # intentionally swapped due to the way I think about this

~~我认为只使用切片和不使用口罩就可以加快速度,但今晚我不能这样做.

Update: as indicated in the comment, the last loop wasn't necessary!

def get_ijks3(M):
    i = np.arange(M)
    j, k = np.triu_indices(M-1, 1)
    i, j, k = np.broadcast_arrays(i[:, None], j, k)
    i, j, k = i.ravel(), j.ravel(), k.ravel()
    mask1 = (j >= i)
    mask2 = (k >= i)
    j[mask1] += 1
    k[mask1 | mask2] += 1
    return j, i, k  # intentionally swapped

这比Numba循环快得多.我真的很惊讶这竟然成功了!


所有代码放在一起,以防您想要运行它:

from numba import jit
import numpy as np
rng = np.random.default_rng(23942342)

M = 30
N = 3
A = rng.random((M, N))

# generate unique tuples of indices of vectors
@jit(nopython=True)
def get_ijks(M):
    ijks = []
    for i in range(M):
        for j in range(M):
            for k in range(i+1, M):
                if j not in (i, k):
                    ijks.append((i, j, k))
    return ijks

# attempt to generate the same integers efficiently
# without Numba
def get_ijks2(M):
    i = np.arange(M)
    j, k = np.triu_indices(M-1, 1)
    i, j, k = np.broadcast_arrays(i[:, None], j, k)
    i, j, k = i.ravel(), j.ravel(), k.ravel()
    for ii in range(M):
        # this probably doesn't need masks
        mask0 = i == ii
        mask1 = (j >= ii) & mask0
        mask2 = (k >= ii) & mask0
        j[mask1] += 1
        k[mask1 | mask2] += 1
    return j, i, k  # intentionally swapped due to the way I think about this

# proposed method 
def cosang1(A, i, j, k):
    d1 = A[i] - A[j]
    d2 = A[k] - A[j]
    d1_hat = np.linalg.norm(d1, axis=1, keepdims=True)
    d2_hat = np.linalg.norm(d2, axis=1, keepdims=True)
    ang = np.einsum("ij, ij -> i", d1/d1_hat, d2/d2_hat)
    return ang

# another naive implementation
def cosang2(A):
    i = np.arange(len(A))
    i, j, k = np.meshgrid(i, i, i)
    i, j, k = i.ravel(), j.ravel(), k.ravel()
    return cosang1(A, i, j, k)

# naive implementation provided by OP
def cosang0(A):
    angles = []
    for i in range(len(A)):
        for j in range(len(A)):
            for k in range(i+1, len(A)):
                if j not in (i,k):
                    d1 = A[i] - A[j]
                    d2 = A[k] - A[j]
                    ang = np.dot(d1/np.linalg.norm(d1), d2/np.linalg.norm(d2))
                    angles.append(ang)
    return angles

%timeit cosang0(A)

%timeit get_ijks(len(A))
ijks = np.asarray(get_ijks(M)).T
%timeit cosang1(A, *ijks)

%timeit cosang2(A)

# 180 ms ± 34.7 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
# 840 µs ± 68.3 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
# 2.19 ms ± 126 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
# <ipython-input-1-d2a3835710f2>:26: RuntimeWarning: invalid value encountered in divide
#   ang = np.einsum("ij, ij -> i", d1/d1_hat, d2/d2_hat)
# 8.13 ms ± 1.78 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)

cosangs0 = cosang0(A)
cosangs1 = cosang1(A, *ijks)
cosangs2 = cosang2(A)
np.testing.assert_allclose(cosangs1, cosangs0)  # passes

%timeit get_ijks2(M)
# 1.73 ms ± 242 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
i, j, k = get_ijks2(M)
cosangs3 = cosang1(A, i, j, k)
np.testing.assert_allclose(np.sort(cosangs3), np.sort(cosangs0))  # passes

%timeit get_ijks3(M)
# 184 µs ± 25.8 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
i, j, k = get_ijks3(M)
cosangs4 = cosang1(A, i, j, k)
np.testing.assert_allclose(np.sort(cosangs4), np.sort(cosangs0))  # passes

Python相关问答推荐

海运图:调整行和列标签

从dict的列中分钟

计算组中唯一值的数量

Python,Fitting into a System of Equations

Python中绕y轴曲线的旋转

如何使用表达式将字符串解压缩到Polars DataFrame中的多个列中?

有没有一种方法可以从python的pussompy比较结果中提取文本?

在Python中动态计算范围

将9个3x3矩阵按特定顺序排列成9x9矩阵

用渐近模计算含符号的矩阵乘法

将scipy. sparse矩阵直接保存为常规txt文件

基于多个数组的多个条件将值添加到numpy数组

Cython无法识别Numpy类型

如何在一组行中找到循环?

如果不使用. to_list()[0],我如何从一个pandas DataFrame中获取一个值?

删除Dataframe中的第一个空白行并重新索引列

在一个数据帧中,我如何才能发现每个行号是否出现在一列列表中?

Numpy`astype(Int)`给出`np.int64`而不是`int`-怎么办?

如何在Django查询集中生成带有值列表的带注释的字段?

如何在微调Whisper模型时更改数据集?