我假设您的代码的基本特征可以总结在以下生成器中:
import numpy as np
def as_str_OP(r_arr, g_arr, b_arr):
n, m = r_arr.shape
rgbs = []
for i in range(n):
rgb = []
for j in range(m):
rgb.append('%02x%02x%02x' % (r_arr[i, j], g_arr[i, j], b_arr[i, j]))
yield rgb
可在for
循环中使用,例如写入磁盘:
for x in as_str_OP(r_arr, g_arr, b_arr):
write_to_disk(x)
生成器本身可以用Python矢量化的核心计算编写,也可以用Numba友好的方式编写.
这会导致大幅加速,尤其是随着输入大小的增长(尤其是第二维度).
以下是NumPy矢量化版本:
def as_str_np(r_arr, g_arr, b_arr):
l = 3
n, m = r_arr.shape
rgbs = []
for i in range(n):
rgb = np.empty((m, 2 * l), dtype=np.uint32)
r0, r1 = divmod(r_arr[i, :], 16)
g0, g1 = divmod(g_arr[i, :], 16)
b0, b1 = divmod(b_arr[i, :], 16)
rgb[:, 0] = hex_to_ascii(r0)
rgb[:, 1] = hex_to_ascii(r1)
rgb[:, 2] = hex_to_ascii(g0)
rgb[:, 3] = hex_to_ascii(g1)
rgb[:, 4] = hex_to_ascii(b0)
rgb[:, 5] = hex_to_ascii(b1)
yield rgb.view(f'<U{2 * l}').reshape(m).tolist()
以及Nuba加速版:
import numba as nb
@nb.njit
def hex_to_ascii(x):
ascii_num_offset = 48 # ord(b'0') == 48
ascii_alp_offset = 87 # ord(b'a') == 97, (num of non-alpha digits) == 10
return x + (ascii_num_offset if x < 10 else ascii_alp_offset)
@nb.njit
def _to_hex_2d(x):
a, b = divmod(x, 16)
return hex_to_ascii(a), hex_to_ascii(b)
@nb.njit
def _as_str_nb(r_arr, g_arr, b_arr):
l = 3
n, m = r_arr.shape
for i in range(n):
rgb = np.empty((m, 2 * l), dtype=np.uint32)
for j in range(m):
rgb[j, 0:2] = _to_hex_2d(r_arr[i, j])
rgb[j, 2:4] = _to_hex_2d(g_arr[i, j])
rgb[j, 4:6] = _to_hex_2d(b_arr[i, j])
yield rgb
def as_str_nb(r_arr, g_arr, b_arr):
l = 3
n, m = r_arr.shape
for x in _as_str_nb(r_arr, g_arr, b_arr):
yield x.view(f'<U{2 * l}').reshape(m).tolist()
这基本上需要手动将正确转换为十六进制ASCII字符的数字写入正确类型的数组,然后将其转换为所需的输出.
请注意,如果生成器能够处理NumPy数组本身,则可以避免使用最终numpy.ndarray.tolist()
,从而节省一些潜在的大量时间,例如:
def as_str_nba(r_arr, g_arr, b_arr):
l = 3
n, m = r_arr.shape
for x in _as_str_nb(r_arr, g_arr, b_arr):
yield x.view(f'<U{2 * l}').reshape(m)
Overcoming IO-bound bottleneck
但是,如果您是IO受限的,则应修改代码以分块写入,例如使用itertools
recipes中的grouper
配方:
from itertools import zip_longest
def grouper(iterable, n, *, incomplete='fill', fillvalue=None):
"Collect data into non-overlapping fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
# grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
# grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
args = [iter(iterable)] * n
if incomplete == 'fill':
return zip_longest(*args, fillvalue=fillvalue)
if incomplete == 'strict':
return zip(*args, strict=True)
if incomplete == 'ignore':
return zip(*args)
else:
raise ValueError('Expected fill, strict, or ignore')
使用方式如下:
group_size = 3
for x in grouper(as_str_OP(r_arr, g_arr, b_arr), group_size):
write_many_to_disk(x)
Testing out the output
可以很容易地生成一些虚拟输入(r_arr
基本上是red_list
,等等):
def gen_color(n, m):
return np.random.randint(0, 2 ** 8, (n, m))
N, M = 10, 3
r_arr = gen_color(N, M)
g_arr = gen_color(N, M)
b_arr = gen_color(N, M)
并通过消耗发电机产生list
:
res_OP = list(as_str_OP(r_arr, g_arr, b_arr))
res_np = list(as_str_np(r_arr, g_arr, b_arr))
res_nb = list(as_str_nb(r_arr, g_arr, b_arr))
res_nba = list(as_str_nba(r_arr, g_arr, b_arr))
print(np.array(res_OP))
# [['1f6984' '916d98' 'f9d779']
# ['65f895' 'ded23e' '332fdc']
# ['b9e059' 'ce8676' 'cb75e9']
# ['bca0fc' '3289a9' 'cc3d3a']
# ['6bb0be' '07134a' 'c3cf05']
# ['152d5c' 'bac081' 'c59a08']
# ['97efcc' '4c31c0' '957693']
# ['15247e' 'af8f0a' 'ffb89a']
# ['161333' '8f41ce' '187b01']
# ['d811ae' '730b17' 'd2e269']]
print(res_OP == res_np)
# True
print(res_OP == res_nb)
# True
print(res_OP == [x.tolist() for x in res_nba])
# True
最终通过一些分组:
k = 3
res_OP = list(grouper(as_str_OP(r_arr, g_arr, b_arr), k))
res_np = list(grouper(as_str_np(r_arr, g_arr, b_arr), k))
res_nb = list(grouper(as_str_nb(r_arr, g_arr, b_arr), k))
res_nba = list(grouper(as_str_nba(r_arr, g_arr, b_arr), k))
print(np.array(res_OP))
# [[list(['1f6984', '916d98', 'f9d779'])
# list(['65f895', 'ded23e', '332fdc'])
# list(['b9e059', 'ce8676', 'cb75e9'])]
# [list(['bca0fc', '3289a9', 'cc3d3a'])
# list(['6bb0be', '07134a', 'c3cf05'])
# list(['152d5c', 'bac081', 'c59a08'])]
# [list(['97efcc', '4c31c0', '957693'])
# list(['15247e', 'af8f0a', 'ffb89a'])
# list(['161333', '8f41ce', '187b01'])]
# [list(['d811ae', '730b17', 'd2e269']) None None]]
print(res_OP == res_np)
# True
print(res_OP == res_nb)
# True
print(res_OP == [tuple(y.tolist() if y is not None else y for y in x) for x in res_nba])
# True
Benchmarks
为了让您了解我们可能讨论的数字,让我们在更大的输入上使用%timeit
:
N, M = 1000, 1000
r_arr = gen_color(N, M)
g_arr = gen_color(N, M)
b_arr = gen_color(N, M)
%timeit -n 1 -r 1 list(as_str_OP(r_arr, g_arr, b_arr))
# 1 loop, best of 1: 1.1 s per loop
%timeit -n 4 -r 4 list(as_str_np(r_arr, g_arr, b_arr))
# 4 loops, best of 4: 279 ms per loop
%timeit -n 4 -r 4 list(as_str_nb(r_arr, g_arr, b_arr))
# 1 loop, best of 1: 96.5 ms per loop
%timeit -n 4 -r 4 list(as_str_nba(r_arr, g_arr, b_arr))
# 4 loops, best of 4: 10.4 ms per loop
要模拟磁盘写入,我们可以使用以下使用者:
import time
import math
def consumer(gen, timeout_sec=0.001, weight=1):
result = []
for x in gen:
result.append(x)
time.sleep(timeout_sec * weight)
return result
其中,使用time.sleep()
调用模拟磁盘写入,超时取决于对象大小的对数:
N, M = 1000, 1000
r_arr = gen_color(N, M)
g_arr = gen_color(N, M)
b_arr = gen_color(N, M)
%timeit -n 1 -r 1 consumer(as_str_OP(r_arr, g_arr, b_arr), weight=math.log2(2))
# 1 loop, best of 1: 2.37 s per loop
%timeit -n 1 -r 1 consumer(as_str_np(r_arr, g_arr, b_arr), weight=math.log2(2))
# 1 loop, best of 1: 1.48 s per loop
%timeit -n 1 -r 1 consumer(as_str_nb(r_arr, g_arr, b_arr), weight=math.log2(2))
# 1 loop, best of 1: 1.27 s per loop
%timeit -n 1 -r 1 consumer(as_str_nba(r_arr, g_arr, b_arr), weight=math.log2(2))
# 1 loop, best of 1: 1.13 s per loop
k = 100
%timeit -n 1 -r 1 consumer(grouper(as_str_OP(r_arr, g_arr, b_arr), k), weight=math.log2(1 + k))
# 1 loop, best of 1: 1.17 s per loop
%timeit -n 1 -r 1 consumer(grouper(as_str_np(r_arr, g_arr, b_arr), k), weight=math.log2(1 + k))
# 1 loop, best of 1: 368 ms per loop
%timeit -n 1 -r 1 consumer(grouper(as_str_nb(r_arr, g_arr, b_arr), k), weight=math.log2(1 + k))
# 1 loop, best of 1: 173 ms per loop
%timeit -n 1 -r 1 consumer(grouper(as_str_nba(r_arr, g_arr, b_arr), k), weight=math.log2(1 + k))
# 1 loop, best of 1: 87.4 ms per loop
忽略磁盘写入模拟,NumPy矢量化方法在测试输入大小方面快约4倍,而Numba加速方法则快约10倍至numpy.ndarray.tolist()
倍,这取决于是否存在到list()
with numpy.ndarray.tolist()
的潜在无用转换.
在simulated磁盘写入方面,速度更快的版本或多或少都是等效的,并且在没有分组的情况下效率明显降低,从而导致大约2倍的速度提升.
同样,这是在给定输入和测试条件下进行的.