I am using concurrent.futures.ProcessPoolExecutor to find the occurrences of a number within a number range. The intent is to investigate the amount of speed-up gained from concurrency. To benchmark performance, I have a control: a serial code that performs the same task (shown below). I have written two concurrent codes, one using concurrent.futures.ProcessPoolExecutor.submit() and the other using concurrent.futures.ProcessPoolExecutor.map(), to perform the same task. They are shown below. Advice on drafting the former and the latter can be found here and here, respectively.

The task issued to all three codes was to find the number of occurrences of the digit 5 in the number range 0 to 1E8. Both the .submit() and .map() codes were assigned 6 workers, and the .map() code used a chunksize of 10,000. The manner of discretising the workload is identical in both concurrent codes. However, the function used to find the occurrences differs between them, because the way arguments are passed to the function called by .submit() and by .map() is different.

All 3 codes reported the same number of occurrences, i.e. 56,953,279. However, the time needed to complete the task was very different: .submit() executed 2 times faster than the control, while .map() took twice as long as the control to complete its task.

Questions:

  1. Is the slow performance of .map() an artefact of my code, or is it inherently slow? If the former, how can I improve it? I am just surprised that it performed slower than the control, as there would not be much incentive to use it otherwise.
  2. Is there a way to make the .submit() code perform even faster? One condition I have is that the function _concurrent_submit() must return an iterable with the numbers/occurrences containing the digit 5.

Benchmark Results
[Figure: benchmark timings for the serial, .submit() and .map() codes]

concurrent.futures.ProcessPoolExecutor.submit()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from traceback import print_exc

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_submit(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
       find the occurences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
        # 2.2. Note: cf.as_completed(futures) returns an iterator; calling it
        #      without consuming it does not wait for or process anything.
        cf.as_completed(futures)
        # 2.3. Consolidate results as a list and return this list.
        for future in futures:
            try:
                for f in future.result():
                    found.append(f)
            except:
                print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent_submit():')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_submit(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(a),end))

concurrent.futures.ProcessPoolExecutor.map()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc

def _findmatch(listnumber, number):    
    '''Function to find the occurrence of number in another number and return
       a string value.'''
    #print('def _findmatch(listnumber, number):')
    #print('listnumber = {0} and ref = {1}'.format(listnumber, number))
    if number in str(listnumber):
        x = listnumber
        #print('x = {0}'.format(x))
        return x 

def _concurrent_map(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            numberlist = range(cstart, cstop)
            futures.append(executor.map(_findmatch, numberlist,
                                        itertools.repeat(number),
                                        chunksize=10000))
        # 2.3. Consolidate result as a list and return this list.
        for future in futures:
            try:
                for f in future:
                    if f:
                        found.append(f)
            except:
                print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_map(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(a),end))

Serial Code:

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

from time import time

def _serial(nmax, number):    
    start = time()
    match=[]
    nlist = range(nmax)
    for n in nlist:
        if number in str(n):
            match.append(n)
    end=time()-start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.

    start = time()
    a = _serial(nmax, number)
    end = time() - start
    print('\n main')
    print("found {0} in {1:.4f}sec".format(len(a),end))

Update 13th Feb 2017:

Further to @niemmi's answer, I have also provided the answer below from my personal research to show:

  1. how to further speed up @niemmi's .map() and .submit() solutions, and
  2. when ProcessPoolExecutor.map() can lead to more speed-up than ProcessPoolExecutor.submit().

Recommended Answer

Overview:

My answer has two parts:

  • Part 1 shows how to gain more speed-up from @niemmi's ProcessPoolExecutor.map() solution.
  • Part 2 shows when the ProcessPoolExecutor methods .submit() and .map() yield non-equivalent compute times.

=======================================================================

Part 1: More Speed-up for ProcessPoolExecutor.map()

Background:

Background: I believe @niemmi's definition chunk = nmax // workers is a definition of chunksize, i.e. the smaller size of the actual number range (the given task) to be tackled by each worker in the worker pool. Now, this definition is premised on the assumption that if a computer has x workers, dividing the task equally among them will make optimal use of each worker, and hence the total task will be completed fastest. Therefore, the number of chunks that a given task is broken into should always equal the number of pool workers. However, is this assumption correct?

Proposition: Here, I propose that the above assumption does not always lead to the fastest compute time when used with ProcessPoolExecutor.map(). Rather, discretising a task into an amount greater than the number of pool workers can lead to speed-up, i.e. faster completion of a given task.
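
The difference between the two chunking schemes reduces to which count divides the number range. A minimal sketch (num_of_chunks is the name used in the revised code below):

chunk_by_workers = nmax // workers        # @niemmi: one chunk per pool worker
chunk_by_tasks   = nmax // num_of_chunks  # proposed: num_of_chunks > workers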

Experiment: I have modified @niemmi's code to allow the number of discretised tasks to exceed the number of pool workers. This code is given below and was used to find the number of times the digit 5 appears in the number range 0 to 1E8. I executed this code with 1, 2, 4, and 6 pool workers, and for various ratios of the number of discretised tasks to the number of pool workers. For each scenario, 3 runs were made and the compute times tabulated. "Speed-up" is defined here as the average compute time using an equal number of chunks and pool workers, divided by the average compute time when the number of discretised tasks exceeds the number of pool workers.

Findings:

[Figure: compute time (left) and speed-up (right) as a function of number of chunks / number of workers]

  1. The figure on the left shows the compute time taken by all the scenarios mentioned in the Experiment section. It shows that the compute time taken when number of chunks / number of workers = 1 is always greater than the compute time taken when number of chunks > number of workers. That is, the former case is always less efficient than the latter.

  2. The figure on the right shows that a speed-up of 1.2 times or more was gained when the number of chunks / number of workers reached a threshold value of 14 or more. Interestingly, the speed-up trend also occurred when ProcessPoolExecutor.map() was executed with a single worker.

Conclusion: When customising the number of discretised tasks that ProcessPoolExecutor.map() should use to solve a given task, it is prudent to ensure that this number is greater than the number of pool workers, as this practice shortens the compute time.

concurrent.futures.ProcessPoolExecutor.map() code. (revised parts only)

def _concurrent_map(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    found = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop,
                               itertools.repeat(number))
        # 2.2. Consolidate result as a list and return this list.
        for future in futures:
            #print('type(future)=',type(future))
            try:
                for f in future:
                    if f:
                        found.append(f)
            except:
                print_exc()
        foundsize = len(found)
        end = time() - start
        print('\n within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 4     # Pool of workers
    chunks_vs_workers = 14 # A factor of >= 14 can provide optimum performance
    num_of_chunks = chunks_vs_workers * workers

    start = time()
    a = _concurrent_map(nmax, number, workers, num_of_chunks)
    end = time() - start
    print('\n main')
    print('nmax={}, workers={}, num_of_chunks={}'.format(
          nmax, workers, num_of_chunks))
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(a),end))

=======================================================================

Part 2: Total compute time from using the ProcessPoolExecutor methods .submit() and .map() can be dissimilar when returning a sorted/ordered result list.

Background: I have amended the .submit() and .map() codes to allow an "apples-to-apples" comparison of their compute times, and to be able to visualise the compute time of the main code, the compute time of the _concurrent method (called by main to perform the concurrent operations), and the compute time of each discretised task/worker called by the _concurrent method. Furthermore, the concurrent method in these codes was structured to return an unordered and an ordered list of the results directly from the future objects of .submit() and the iterators of .map(). The source code is provided below (hope it helps you).

Experiments: These two newly improved codes were used to perform the same experiment described in Part 1, except that only 6 pool workers were considered, and the python built-in list and sorted methods were used to return the unordered and the ordered list of results, respectively, to the main section of the code.

Findings:

  1. From the _concurrent method's results, we can see that the compute time the _concurrent method needs to create all the Future objects of ProcessPoolExecutor.submit(), and to create the iterator of ProcessPoolExecutor.map(), as a function of the number of discretised tasks over the number of pool workers, is equivalent in both cases. This result simply means that the ProcessPoolExecutor methods .submit() and .map() are equally efficient/fast.
  2. Comparing the compute times from main and from its _concurrent method, we can see that main worked longer than its _concurrent method. This is to be expected, as their time difference reflects the compute times of the list and sorted methods (and of the other methods encased within them). Clearly, the list method took less compute time to return a result list than the sorted method. The average compute times of the list method for both the .submit() and .map() codes were similar, at about 0.47sec. The average compute times of the sorted method for the .submit() and .map() codes were 1.23sec and 1.01sec, respectively. In other words, the list method performed 2.62 times and 2.15 times faster than the sorted method for the .submit() and .map() codes, respectively.
  3. It is not clear why the sorted method generated an ordered result list faster for the .map() code than for the .submit() code.
  4. The discretisation scheme mentioned in Part 1 of my answer is shown here to speed up the performance of both the .submit() and .map() methods. The speed-up can be as much as 20% over the case where the number of discretised tasks equals the number of pool workers.

Improved .map() code

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from itertools import repeat, chain 


def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
    #      format(nmin, nmax, number, len(match),end))
    return match

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a concurrent
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop, repeat(number))
    end = time() - start
    print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(end))
    return list(chain.from_iterable(futures)) #Return an unordered result list
    #return sorted(chain.from_iterable(futures)) #Return an ordered result list

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers
    chunks_vs_workers = 30 # A factor of >= 14 can provide optimum performance
    num_of_chunks = chunks_vs_workers * workers

    start = time()
    found = _concurrent(nmax, number, workers, num_of_chunks)
    end = time() - start
    print('\n main')
    print('nmax={}, workers={}, num_of_chunks={}'.format(
          nmax, workers, num_of_chunks))
    #print('found = ', found)
    print("found {0} in {1:.4f}sec".format(len(found),end))    

Improved .submit() code.

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from itertools import chain

# _findmatch(nmin, nmax, number) and the __main__ section are the same as in
# the improved .map() code above.

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
       find the occurrences of a given number in a number range in a concurrent
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    futures = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(num_of_chunks):
            cstart = chunksize * i
            cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
    end = time() - start
    print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(end))
    return list(chain.from_iterable(f.result() for f in cf.as_completed(
        futures))) #Return an unordered list
    #return sorted(chain.from_iterable(f.result() for f in cf.as_completed(
    #    futures))) #Return an ordered list
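
Similarly, the .submit() variant can recover an ordered list without sorted() by iterating the futures in submission order rather than completion order; since each chunk covers an ascending sub-range, the concatenation is already sorted. A sketch:

    # Sketch only: f.result() blocks on each future in turn, so the chunks are
    # concatenated in submission (i.e. ascending numeric) order.
    return list(chain.from_iterable(f.result() for f in futures))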

=======================================================================
