多处理-迭代循环遍历文件夹中的文件

35g0bw71  于 2021-08-20  发布在  Java
关注(0)|答案(1)|浏览(303)

我必须循环浏览30个zip文件夹,每个zip文件夹有50000-90000个jpeg文件。理想情况下,我会遍历每个zip文件夹,因为解压缩每个文件夹会花费太长时间。对于每个文件,我需要打开每个文件,从中提取关键信息,并将信息存储到列表中。基于如何在包含多个文件的文件夹上执行多线程处理,我尝试启用多线程处理以使事情更快,但是,我无法理解。在下面的示例中,我正在尝试让它在一个文件夹中工作,然后我需要弄清楚如何使它在所有30个zip文件夹中循环。

import os
from zipfile import ZipFile

data_list = []

def image_processor(file):
    with ZipFile("files101.zip") as zip_file:
        with zip_file.open(file, "r") as img_file:
            img_data = img_file.readlines(1) # data is available in beginning of each file

            # Extract data #1
            pattern_1 = r'IMG:\d{,3}'
            if re.findall(pattern_1, str(img_data)):
                img_extract = re.findall(pattern_1, str(img_data))[0]
            else:
                img_extract = np.nan

            # Extract timestamp
            time_pattern = r'Time:\s\d{2}-\d{2}-\d{4}\s\s\d{2}:\d{2}:\d{2}'
            if re.findall(time_pattern, str(img_data)):
                time_extract = re.findall(time_pattern, str(img_data))[0]
            else:
                time_extract = np.nan

            # Create list   
            return data_list.append([img_extract, time_extract])

os.chdir(r"C:\\Users\\xxxxxx\\Desktop\\zip")
for folder in os.listdir():
    file_list = ZipFile("files101.zip", "r").namelist()

    with ProcessPool(processes=8) as pool:
        pool.map(image_processor, file_list)

所发生的是,我的代码只是永远运行,就像它没有启用多处理一样。如果我需要多线程,我有六个内核。如有任何建议,将不胜感激。

iq0todco

iq0todco1#

您缺少几个导入项。但我马上注意到的主要项目有: for folder in os.listdir(): 您正在当前目录中的每个文件和目录上循环,但循环没有引用任何这些文件/目录;您正在重复处理files101.zip。
根据您的配置,您似乎正在windows下运行 chdir 指挥部。创建新流程的代码必须在 if __name__ == '__main__':
处理池中的每个进程都有自己的地址空间,并将附加到 data_list . 返回 data_list 返回到主进程,让主进程将所有返回值附加到主列表中是可行的,但必须确保 image_processor 函数以一个空的 data_list 每一个电话。
我假设您希望处理当前目录中所有扩展名为.zip的文件(其中大约有30个)。我将修改您的处理过程,使提交到处理池工作单元的每个任务不是zip归档中的单个文件(在这种情况下,您将提交30*50000个任务),而是整个归档。因此,您的主要处理功能不再是 image_processor 而是 zip_processor . 我对该文件做了一些其他更改,这些更改对我来说是有意义的(我希望我没有破坏任何内容):

import os
import glob
from zipfile import ZipFile
import re
import numpy as np
from multiprocessing import Pool

def zip_processor(zipfile):
    with ZipFile(zipfile) as zip_file:
        data_list = []
        for file in zip_file.namelist():
            with zip_file.open(file, "r") as img_file:
                # take element 0 from returned list and convert to a string
                img_data = img_file.readlines(1)[0].decode() # data is available in beginning of each file
                # Extract data #1
                pattern_1 = r'IMG:\d{,3}'
                # why do findall if you are only using the first occurrence?
                m = re.search(pattern1, img_data)
                img_extract = m.group(0) if m else np.nan

                # Extract timestamp
                time_pattern = r'Time:\s\d{2}-\d{2}-\d{4}\s\s\d{2}:\d{2}:\d{2}'
                m = re.search(time_pattern, img_data)
                time_extract = m.group(0) if m else np.nan

                # Create list   
                data_list.append([img_extract, time_extract])
        return data_list

# required for Windows:

if __name__ == '__main__':
    os.chdir(r"C:\\Users\\xxxxxx\\Desktop\\zip")

    # Default pool size:
    with Pool() as pool:
        results = pool.imap(zip_processor, glob.iglob('*.zip'))
        data_list = []
        for result in results:
            data_list.extend(result)

现在,由于涉及到大量i/o,使用多线程可能会运行得很好,在这种情况下,更大的池大小将是有利的。进行以下更改:


# from multiprocessing import Pool

from multiprocessing.pool import ThreadPool
... # etc.

if __name__ == '__main__':
    os.chdir(r"C:\\Users\\xxxxxx\\Desktop\\zip")

    zip_list = glob.glob('*.zip')
    with ThreadPool(len(zip_list)) as pool:
        results = pool.imap(zip_processor, zip_list)
        ... # etc.

相关问题