我是新的concurrent.futures.下面的脚本有什么问题吗?

def read_excel(self):
    '''
    read excel -> parse df -> store into sql
    '''

    file_list = ['分诊咨询记录','客户消费明细表','电话网络情况明细表']
    tpool = ThreadPoolExecutor(max_workers=3)
    ppool = ProcessPoolExecutor(max_workers=3)
    tfutures = list()
    pfutures = list()
    write_futures = list()
    for file in file_list:
        path = os.path.join(self.config['his']['root'],file)
        if os.path.exists(path):
            tfutures.append(tpool.submit(self._read_excel,path,file)) 
    for tfuture in as_completed(tfutures):
        pfutures.append(ppool.submit(self._parse_df,tfuture.result()))
    for pfuture in as_completed(pfutures):
        result = pfuture.result()
        write_futures.append(tpool.submit(self.update_data,result[1],result[0]))
    for _ in as_completed(write_futures):pass
    tpool.shutdown()
    ppool.shutdown()

for循环会阻止代码吗?我的意思是,当第一个read_excel完成时,第一个parse_df会开始工作吗?如果read_excel pool没有完成所有任务,而第一个parse_df已填充,该怎么办?第一次更新_数据会开始工作吗?

希望 playbook 不会被屏蔽.当任务完成后,下一步应该立即开始.

  1. playbook 会造成问题吗?
  2. playbook 符合我的目标吗?如何实现我的目标?

推荐答案

正如László Hunyadi所回应的,在提交给self._read_excel的所有任务完成之前,您当前的代码不会向多处理池提交任何内容.但我会以不同的方式解决这个问题.

大概parse_df方法涉及CPU密集型处理,否则您不会在多处理池中执行该方法.当然,如果处理不够密集,并行运行这些任务所节省的时间将无法弥补使用多处理所带来的额外开销.在这种情况下,您最好使用多线程处理所有事情.但是假设需要在一个单独的进程中执行parse_df个,那么我将基于可以将多处理池传递给多线程池worker函数的 idea ,更简单地组织代码如下.这导致代码更简单,更容易遵循.在下面的代码中,已经创建了一个新的worker方法worker,并且主线程只需要向这个worker提交任务,这个worker完成所有所需的处理:

class SomeClass:

    def worker(self, path, file, ppool):
        """This executes in a multithreading pool."""

        result = self._read_excel(path, file) # Simple function call
        # Run the CPU-intensive processing in the passed multiprocessing pool:
        future = ppool.submit(self._parse_df, result)
        result = future.result()
        # Finally, we continue executing
        self.update_data(result[1], result[0])

    def read_excel(self):
        '''
        read excel -> parse df -> store into sql
        '''

        with ThreadPoolExecutor(max_workers=3) as tpool, \
        ProcessPoolExecutor(max_workers=3) as ppool:
            for path, file in path_file_list:
                tpool.submit(self.worker, path, file, ppool)
            file_list = ['分诊咨询记录','客户消费明细表','电话网络情况明细表']
            for file in file_list:
                path = os.path.join(self.config['his']['root'], file)
                if os.path.exists(path):
                    tpool.submit(self.worker, path, file))

        # The implicit calls to shutdown will wait for all
        # submitted tasks to the multithreading pool to complete:
        ...

由于您的代码不是一个完整的、可重复性最小的示例,我提供了一些缺失的方法用于演示目的,以便您了解在实践中如何工作.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

class ExcelProcessor:
    def worker(self, path, file, ppool):
        """This executes in a multithreading pool."""

        result = self._read_excel(path, file) # Simple function call
        # Run the CPU-intensive processing in the passed multiprocessing pool:
        future = ppool.submit(self._parse_df, result)
        result = future.result()
        # Finally, we continue executing
        self.update_data(result[1], result[0])

    def read_excel(self):
        '''
        read excel -> parse df -> store into sql
        '''

        with ThreadPoolExecutor(max_workers=3) as tpool, \
        ProcessPoolExecutor(max_workers=3) as ppool:
            path_file_list = [
                ('path1', 'file1'),
                ('path2', 'file2'),
                ('path3', 'file3'),
                ]
            for path, file in path_file_list:
                tpool.submit(self.worker, path, file, ppool)

        # The implicit calls to shutdown will wait for all
        # submitted tasks to the multithreading pool to complete:
        ...

    def _read_excel(self, path, file):
        return path + file   # Just concatenate the arguments

    def _parse_df(self, path_file):
        """ Return argument and the argument reversed"""
        return path_file, path_file[::-1]

    def update_data(self, s1, s2):
        """Just output the results."""
        print(s1, s2, flush=True)


# Required for Windows:
if __name__ == '__main__':
    ExcelProcessor().read_excel()

打印:

1elif1htap path1file1
2elif2htap path2file2
3elif3htap path3file3

Python相关问答推荐

如何处理嵌套的SON?

返回nxon矩阵的diag元素,而不使用for循环

通过优化空间在Python中的饼图中添加标签

使用SciPy进行曲线匹配未能给出正确的匹配

使用FASTCGI在IIS上运行Django频道

Python daskValue错误:无法识别的区块管理器dask -必须是以下之一:[]

在Python Attrs包中,如何在field_Transformer函数中添加字段?

如何将Docker内部运行的mariadb与主机上Docker外部运行的Python脚本连接起来

Pandas:将多级列名改为一级

从一个系列创建一个Dataframe,特别是如何重命名其中的列(例如:使用NAs/NaN)

删除marplotlib条形图上的底边

判断Python操作:如何从字面上得到所有decorator ?

语法错误:文档. evaluate:表达式不是合法表达式

如何使用Azure Function将xlsb转换为xlsx?

如何使用matplotlib查看并列直方图

在第一次调用时使用不同行为的re. sub的最佳方式

我怎样才能让深度测试在OpenGL中使用Python和PyGame呢?

为什么在更新Pandas 2.x中的列时,数据类型不会更改,而在Pandas 1.x中会更改?

对包含JSON列的DataFrame进行分组

在使用ROLING()获得最大值时,是否可以排除每个窗口中的前n个值?