As László Hunyadi responded, your current code will not submit anything to the multiprocessing pool until all of the tasks submitted to self._read_excel have completed. But I would approach this differently. Presumably the parse_df method involves CPU-intensive processing, or you would not be executing it in a multiprocessing pool. Of course, if the processing is not intensive enough, the time saved by running these tasks in parallel will not make up for the added overhead of multiprocessing, and in that case you would be better off doing everything with multithreading. But assuming parse_df does need to run in a separate process, I would organize the code more simply, based on the idea that a multiprocessing pool can be passed to a multithreading pool's worker function. This results in simpler code that is easier to follow. In the code below, a new worker method, worker, has been created, and the main thread only needs to submit tasks to this worker, which does all the required processing:
class SomeClass:
    def worker(self, path, file, ppool):
        """This executes in a multithreading pool."""
        result = self._read_excel(path, file)  # Simple function call
        # Run the CPU-intensive processing in the passed multiprocessing pool:
        future = ppool.submit(self._parse_df, result)
        result = future.result()
        # Finally, we continue executing
        self.update_data(result[1], result[0])

    def read_excel(self):
        '''
        read excel -> parse df -> store into sql
        '''
        with ThreadPoolExecutor(max_workers=3) as tpool, \
                ProcessPoolExecutor(max_workers=3) as ppool:
            file_list = ['分诊咨询记录', '客户消费明细表', '电话网络情况明细表']
            for file in file_list:
                path = os.path.join(self.config['his']['root'], file)
                if os.path.exists(path):
                    tpool.submit(self.worker, path, file, ppool)
        # The implicit calls to shutdown will wait for all
        # submitted tasks to the multithreading pool to complete:
        ...
Since your code is not a complete, minimal reproducible example, I have supplied a few of the missing methods for demo purposes so that you can see how this works in practice.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

class ExcelProcessor:
    def worker(self, path, file, ppool):
        """This executes in a multithreading pool."""
        result = self._read_excel(path, file)  # Simple function call
        # Run the CPU-intensive processing in the passed multiprocessing pool:
        future = ppool.submit(self._parse_df, result)
        result = future.result()
        # Finally, we continue executing
        self.update_data(result[1], result[0])

    def read_excel(self):
        '''
        read excel -> parse df -> store into sql
        '''
        with ThreadPoolExecutor(max_workers=3) as tpool, \
                ProcessPoolExecutor(max_workers=3) as ppool:
            path_file_list = [
                ('path1', 'file1'),
                ('path2', 'file2'),
                ('path3', 'file3'),
            ]
            for path, file in path_file_list:
                tpool.submit(self.worker, path, file, ppool)
        # The implicit calls to shutdown will wait for all
        # submitted tasks to the multithreading pool to complete:
        ...

    def _read_excel(self, path, file):
        return path + file  # Just concatenate the arguments

    def _parse_df(self, path_file):
        """Return the argument and the argument reversed."""
        return path_file, path_file[::-1]

    def update_data(self, s1, s2):
        """Just output the results."""
        print(s1, s2, flush=True)

# Required for Windows:
if __name__ == '__main__':
    ExcelProcessor().read_excel()
Prints:
1elif1htap path1file1
2elif2htap path2file2
3elif3htap path3file3
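As noted above, if _parse_df turns out not to be CPU-intensive enough to justify a process pool, you can do everything with multithreading alone. A minimal sketch of that variant, reusing the same stand-in demo methods as the example above (the class name ExcelProcessorThreadsOnly is just illustrative):

```python
from concurrent.futures import ThreadPoolExecutor

class ExcelProcessorThreadsOnly:
    """Threads-only variant: _parse_df runs inline in each worker thread."""

    def worker(self, path, file):
        result = self._read_excel(path, file)
        # No process pool needed; just call the parser directly:
        result = self._parse_df(result)
        self.update_data(result[1], result[0])

    def read_excel(self):
        with ThreadPoolExecutor(max_workers=3) as tpool:
            path_file_list = [
                ('path1', 'file1'),
                ('path2', 'file2'),
            ]
            for path, file in path_file_list:
                tpool.submit(self.worker, path, file)
        # The implicit call to shutdown waits for all submitted tasks.

    # Demo stand-ins, as in the example above:
    def _read_excel(self, path, file):
        return path + file  # Just concatenate the arguments

    def _parse_df(self, path_file):
        """Return the argument and the argument reversed."""
        return path_file, path_file[::-1]

    def update_data(self, s1, s2):
        print(s1, s2, flush=True)

if __name__ == '__main__':
    ExcelProcessorThreadsOnly().read_excel()
```

Note that the only structural change is dropping the ProcessPoolExecutor and the submit/result round-trip; the worker signature loses its ppool parameter.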