我正在try 优化python程序的性能,我认为我已经将这段代码确定为瓶颈:

  for i in range(len(green_list)):
    rgb_list = []

    for j in range(len(green_list[i])):
      rgb_list.append('%02x%02x%02x' % (red_list[i][j], green_list[i][j], blue_list[i][j]))

    write_file(str(i), rgb_list)

其中,red_listgreen_listblue_list是具有如下值的numpy数组:

红色列表=[[1、2、3、4、5]、[51、52、53、54、55]]

绿色列表=[[6、7、8、9、10]、[56、57、58、59、60]]

blue\u列表=[[11、12、13、14、15]、[61、62、63、64、65]]

在每次执行结束时,用于rgb\U的内部列表包含十六进制值:

rgb\U列表=['01060b','02070c','03080d','04090e','050a01']

现在,我不清楚如何利用numpy数组的潜力,但我认为有一种方法可以优化这两个嵌套循环.有什么建议吗?

推荐答案

我假设您的代码的基本特征可以总结在以下生成器中:

import numpy as np


def as_str_OP(r_arr, g_arr, b_arr):
    n, m = r_arr.shape
    rgbs = []
    for i in range(n):
        rgb = []
        for j in range(m):
            rgb.append('%02x%02x%02x' % (r_arr[i, j], g_arr[i, j], b_arr[i, j]))
        yield rgb

可在for循环中使用,例如写入磁盘:

for x in as_str_OP(r_arr, g_arr, b_arr):
    write_to_disk(x)

生成器本身可以用Python矢量化的核心计算编写,也可以用Numba友好的方式编写.

这会导致大幅加速,尤其是随着输入大小的增长(尤其是第二维度).

以下是NumPy矢量化版本:

def as_str_np(r_arr, g_arr, b_arr):
    l = 3
    n, m = r_arr.shape
    rgbs = []
    for i in range(n):
        rgb = np.empty((m, 2 * l), dtype=np.uint32)
        r0, r1 = divmod(r_arr[i, :], 16)
        g0, g1 = divmod(g_arr[i, :], 16)
        b0, b1 = divmod(b_arr[i, :], 16)
        rgb[:, 0] = hex_to_ascii(r0)
        rgb[:, 1] = hex_to_ascii(r1)
        rgb[:, 2] = hex_to_ascii(g0)
        rgb[:, 3] = hex_to_ascii(g1)
        rgb[:, 4] = hex_to_ascii(b0)
        rgb[:, 5] = hex_to_ascii(b1)
        yield rgb.view(f'<U{2 * l}').reshape(m).tolist()

以及Nuba加速版:

import numba as nb


@nb.njit
def hex_to_ascii(x):
    ascii_num_offset = 48  # ord(b'0') == 48
    ascii_alp_offset = 87  # ord(b'a') == 97, (num of non-alpha digits) == 10
    return x + (ascii_num_offset if x < 10 else ascii_alp_offset)


@nb.njit
def _to_hex_2d(x):
    a, b = divmod(x, 16)
    return hex_to_ascii(a), hex_to_ascii(b)


@nb.njit
def _as_str_nb(r_arr, g_arr, b_arr):
    l = 3
    n, m = r_arr.shape
    for i in range(n):
        rgb = np.empty((m, 2 * l), dtype=np.uint32)
        for j in range(m):
            rgb[j, 0:2] = _to_hex_2d(r_arr[i, j])
            rgb[j, 2:4] = _to_hex_2d(g_arr[i, j])
            rgb[j, 4:6] = _to_hex_2d(b_arr[i, j])
        yield rgb


def as_str_nb(r_arr, g_arr, b_arr):
    l = 3
    n, m = r_arr.shape
    for x in _as_str_nb(r_arr, g_arr, b_arr):
        yield x.view(f'<U{2 * l}').reshape(m).tolist()

这基本上需要手动将正确转换为十六进制ASCII字符的数字写入正确类型的数组,然后将其转换为所需的输出.

请注意,如果生成器能够处理NumPy数组本身,则可以避免使用最终numpy.ndarray.tolist(),从而节省一些潜在的大量时间,例如:

def as_str_nba(r_arr, g_arr, b_arr):
    l = 3
    n, m = r_arr.shape
    for x in _as_str_nb(r_arr, g_arr, b_arr):
        yield x.view(f'<U{2 * l}').reshape(m)

Overcoming IO-bound bottleneck

但是,如果您是IO受限的,则应修改代码以分块写入,例如使用itertools recipes中的grouper配方:

from itertools import zip_longest


def grouper(iterable, n, *, incomplete='fill', fillvalue=None):
    "Collect data into non-overlapping fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
    # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
    # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
    args = [iter(iterable)] * n
    if incomplete == 'fill':
        return zip_longest(*args, fillvalue=fillvalue)
    if incomplete == 'strict':
        return zip(*args, strict=True)
    if incomplete == 'ignore':
        return zip(*args)
    else:
        raise ValueError('Expected fill, strict, or ignore')

使用方式如下:

group_size = 3
for x in grouper(as_str_OP(r_arr, g_arr, b_arr), group_size):
    write_many_to_disk(x)

Testing out the output

可以很容易地生成一些虚拟输入(r_arr基本上是red_list,等等):

def gen_color(n, m):
    return np.random.randint(0, 2 ** 8, (n, m))


N, M = 10, 3
r_arr = gen_color(N, M)
g_arr = gen_color(N, M)
b_arr = gen_color(N, M)

并通过消耗发电机产生list:

res_OP = list(as_str_OP(r_arr, g_arr, b_arr))
res_np = list(as_str_np(r_arr, g_arr, b_arr))
res_nb = list(as_str_nb(r_arr, g_arr, b_arr))
res_nba = list(as_str_nba(r_arr, g_arr, b_arr))
print(np.array(res_OP))
# [['1f6984' '916d98' 'f9d779']
#  ['65f895' 'ded23e' '332fdc']
#  ['b9e059' 'ce8676' 'cb75e9']
#  ['bca0fc' '3289a9' 'cc3d3a']
#  ['6bb0be' '07134a' 'c3cf05']
#  ['152d5c' 'bac081' 'c59a08']
#  ['97efcc' '4c31c0' '957693']
#  ['15247e' 'af8f0a' 'ffb89a']
#  ['161333' '8f41ce' '187b01']
#  ['d811ae' '730b17' 'd2e269']]
print(res_OP == res_np)
# True
print(res_OP == res_nb)
# True
print(res_OP == [x.tolist() for x in res_nba])
# True

最终通过一些分组:

k = 3
res_OP = list(grouper(as_str_OP(r_arr, g_arr, b_arr), k))
res_np = list(grouper(as_str_np(r_arr, g_arr, b_arr), k))
res_nb = list(grouper(as_str_nb(r_arr, g_arr, b_arr), k))
res_nba = list(grouper(as_str_nba(r_arr, g_arr, b_arr), k))
print(np.array(res_OP))
# [[list(['1f6984', '916d98', 'f9d779'])
#   list(['65f895', 'ded23e', '332fdc'])
#   list(['b9e059', 'ce8676', 'cb75e9'])]
#  [list(['bca0fc', '3289a9', 'cc3d3a'])
#   list(['6bb0be', '07134a', 'c3cf05'])
#   list(['152d5c', 'bac081', 'c59a08'])]
#  [list(['97efcc', '4c31c0', '957693'])
#   list(['15247e', 'af8f0a', 'ffb89a'])
#   list(['161333', '8f41ce', '187b01'])]
#  [list(['d811ae', '730b17', 'd2e269']) None None]]
print(res_OP == res_np)
# True
print(res_OP == res_nb)
# True
print(res_OP == [tuple(y.tolist() if y is not None else y for y in x) for x in res_nba])
# True

Benchmarks

为了让您了解我们可能讨论的数字,让我们在更大的输入上使用%timeit:

N, M = 1000, 1000
r_arr = gen_color(N, M)
g_arr = gen_color(N, M)
b_arr = gen_color(N, M)


%timeit -n 1 -r 1 list(as_str_OP(r_arr, g_arr, b_arr))
# 1 loop, best of 1: 1.1 s per loop
%timeit -n 4 -r 4 list(as_str_np(r_arr, g_arr, b_arr))
# 4 loops, best of 4: 279 ms per loop
%timeit -n 4 -r 4 list(as_str_nb(r_arr, g_arr, b_arr))
# 1 loop, best of 1: 96.5 ms per loop
%timeit -n 4 -r 4 list(as_str_nba(r_arr, g_arr, b_arr))
# 4 loops, best of 4: 10.4 ms per loop

要模拟磁盘写入,我们可以使用以下使用者:

import time
import math


def consumer(gen, timeout_sec=0.001, weight=1):
    result = []
    for x in gen:
        result.append(x)
        time.sleep(timeout_sec * weight)
    return result

其中,使用time.sleep()调用模拟磁盘写入,超时取决于对象大小的对数:

N, M = 1000, 1000
r_arr = gen_color(N, M)
g_arr = gen_color(N, M)
b_arr = gen_color(N, M)


%timeit -n 1 -r 1 consumer(as_str_OP(r_arr, g_arr, b_arr), weight=math.log2(2))
# 1 loop, best of 1: 2.37 s per loop
%timeit -n 1 -r 1 consumer(as_str_np(r_arr, g_arr, b_arr), weight=math.log2(2))
# 1 loop, best of 1: 1.48 s per loop
%timeit -n 1 -r 1 consumer(as_str_nb(r_arr, g_arr, b_arr), weight=math.log2(2))
# 1 loop, best of 1: 1.27 s per loop
%timeit -n 1 -r 1 consumer(as_str_nba(r_arr, g_arr, b_arr), weight=math.log2(2))
# 1 loop, best of 1: 1.13 s per loop

k = 100
%timeit -n 1 -r 1 consumer(grouper(as_str_OP(r_arr, g_arr, b_arr), k), weight=math.log2(1 + k))
# 1 loop, best of 1: 1.17 s per loop
%timeit -n 1 -r 1 consumer(grouper(as_str_np(r_arr, g_arr, b_arr), k), weight=math.log2(1 + k))
# 1 loop, best of 1: 368 ms per loop
%timeit -n 1 -r 1 consumer(grouper(as_str_nb(r_arr, g_arr, b_arr), k), weight=math.log2(1 + k))
# 1 loop, best of 1: 173 ms per loop
%timeit -n 1 -r 1 consumer(grouper(as_str_nba(r_arr, g_arr, b_arr), k), weight=math.log2(1 + k))
# 1 loop, best of 1: 87.4 ms per loop

忽略磁盘写入模拟,NumPy矢量化方法在测试输入大小方面快约4倍,而Numba加速方法则快约10倍至numpy.ndarray.tolist()倍,这取决于是否存在到list() with numpy.ndarray.tolist()的潜在无用转换.

simulated磁盘写入方面,速度更快的版本或多或少都是等效的,并且在没有分组的情况下效率明显降低,从而导致大约2倍的速度提升.

同样,这是在给定输入和测试条件下进行的.

Python相关问答推荐

将图像拖到另一个图像

梯度下降:简化要素集的运行时间比原始要素集长

mypy无法推断类型参数.List和Iterable的区别

如何在PySide/Qt QColumbnView中删除列

LocaleError:模块keras._' tf_keras. keras没有属性__internal_'''

寻找Regex模式返回与我当前函数类似的结果

GPT python SDK引入了大量开销/错误超时

如何反转一个框架中列的值?

浏览超过10k页获取数据,解析:欧洲搜索服务:从欧盟站点收集机会的微小刮刀&

如何在Python中从html页面中提取html链接?

我如何处理超类和子类的情况

如何在Polars中创建条件增量列?

某些值的数值幂和**之间的差异

我如何为测试函数的参数化提供fixture 生成的数据?如果我可以的话,还有其他 Select 吗?

Python:在cmd中添加参数时的语法

了解如何让库认识到我具有所需的依赖项

如何通过函数的强式路径动态导入函数?

日志(log)轴上的自定义刻度出现意外的次要刻度标记行为

在伪子进程中模拟标准输出.打开

捕获脚本和退出代码的多行输出