当我处理低延迟需求时,我try 使用多处理池,每个进程并行运行,在我的示例中需要1秒.但整个过程是额外的300ms,即1秒300ms.

为什么要增加额外的时间?有没有更好的方法来降低吞吐量?

我提到了这个solution,它具有最大2ms(平均值为0.1ms)的吞吐量,但不适用于我的代码(即,使用PANDA READ_SQL从数据库获取数据)

我假设会有更好的方法来减少额外的管理时间.

Code:

import multiprocessing as mp
from multiprocessing import Pool

def test_func(x):
    ts = time.time()
    time.sleep(1) # In my case `pd.read_sql(query, conn_str)` which takes around 300ms
    print(time.time() - ts)
    return x

ts = time.time()

num_process = mp.cpu_count()*2

with Pool(num_process) as pool:
    results = pool.map(test_func, [1, 2, 3])
    print("*********")
    print(time.time() - ts)


print("*********")
print(time.time() - ts)

Output:

1.0009253025054932 # each process taken time
1.0011048316955566
1.0010230541229248


*********
1.2178916931152344 # time taken inside pool context manager
*********
1.2484805583953857 # total time 

可用进程数:

mp.cpu_count()*2 --> 32

有没有可能使总时间更接近1秒?若有,如何处理?

推荐答案

一般来说,多处理更适合于CPU密集型工作,而多线程更适合于I/O受限的工作.

这是一个巨大的概括.您实际需要的内容将取决于代码的各种复杂性.

future 模块有两个非常有用的类,即:ThreadPoolExecutor和ProcessPoolExecutor.它们的属性是相同的,这意味着您可以在开发过程中轻松地从一个切换到另一个,只需更改类名,而不需要更改任何其他代码.

下面是一些代码,它们为您提供了一个框架,用于确定哪些代码最适合您的需求.只需适当地实现do_work()函数.

代码将执行process()函数NVALS次,并为您提供多线程和多处理的平均持续时间.

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import perf_counter
from collections.abc import Callable, Iterator
import os

NVALS = 10

if (CPU_COUNT := os.cpu_count()) is None or CPU_COUNT < 2:
    CPU_COUNT = 2

POOLSIZE = min((CPU_COUNT//2, NVALS))
DP = 8

def do_work(i: int, s:str) -> tuple[int, str]:
    print(i, s)
    return i, s

def process(t: tuple[int, str]) -> float:
    _start = perf_counter()
    do_work(*t)
    return perf_counter() - _start

def values(nvals: int) -> Iterator[tuple[int, str]]:
    for i in range(nvals):
        yield i, 'a'
    
def main(executor: Callable):
    with executor(POOLSIZE) as _executor:
        print(executor.__name__)
        # create a list of tuples
        _sum = sum(_executor.map(process, values(NVALS)))
        print(f'Average={_sum/NVALS:.{DP}f}\n')



if __name__ == '__main__':
    for executor in ProcessPoolExecutor, ThreadPoolExecutor:
        main(executor)

Python相关问答推荐

在matplotlib动画gif中更改配色方案

如何在Python中使用时区夏令时获取任何给定本地时间的纪元值?

jit JAX函数中的迭代器

更改matplotlib彩色条的字体并勾选标签?

重新匹配{ }中包含的文本,其中文本可能包含{{var}

Gekko:Spring-Mass系统的参数识别

如何制作10,000年及以后的日期时间对象?

OR—Tools中CP—SAT求解器的IntVar设置值

如何将多进程池声明为变量并将其导入到另一个Python文件

Pre—Commit MyPy无法禁用非错误消息

Stacked bar chart from billrame

将pandas导出到CSV数据,但在此之前,将日期按最小到最大排序

如何更改groupby作用域以找到满足掩码条件的第一个值?

关于两个表达式的区别

如何在海上配对图中使某些标记周围的黑色边框

如何在Python Pandas中填充外部连接后的列中填充DDL值

Tensorflow tokenizer问题.num_words到底做了什么?

有没有办法让Re.Sub报告它所做的每一次替换?

PySpark:如何最有效地读取不同列位置的多个CSV文件

如何在Django模板中显示串行化器错误