partitioner多线程处理

sqougxex  于 2021-07-09  发布在  Java
关注(0)|答案(2)|浏览(363)

我有一个包含超过1m个xml文件的文件夹和一个单线程步骤,该步骤以相同的方式处理这些xml文件中的每一个(没有连接到数据库或文件之间的任何共同点)。
有没有办法让这一步更加并行,比如使用一系列文件名进行分区,或者将文件拆分到不同的文件夹并使用文件夹名?
据我所知,multiresourcepartitioner无法处理这种情况,因为它
为每个资源创建executioncontext,并将它们标记为{partition0,partition1,…,partitionn}。网格大小将被忽略。

9udxz4iz

9udxz4iz1#

既然已经有了单独的文件,为什么需要分组来提高并发性。如果需要增加并发性,请增加线程数。在线程执行器中。假设您有1000个文件,并且有内存和cpu,您可以将max thread设置为50。因此,一次将处理50个文件。一旦文件被处理,它将采取下一组50个文件。因此执行是并行的。下面是一个例子。

<bean id="kpThreadPool"
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"
    destroy-method="destroy">
    <property name="maxPoolSize" value="${app.max_thread_num}" />
</bean>

<batch:step id="kp.step1" next="kp.step2">
        <batch:partition step="kp.slave"
            partitioner="multiResourcePartitioner">
            <batch:handler task-executor="kpThreadPool" />
        </batch:partition>
</batch:step>

其中app.max\u thread\u num=50

nkcskrwz

nkcskrwz2#

经过一些修补,最好的结果来自自定义分区器,它基于文件夹创建分区。为了实现这一点,上一步编写了每100kXML文件的文件夹数。
分区器(multiresource partitioner)的代码对如何管理stepexecutions有很大帮助:

public class FolderPartitioner implements Partitioner {

    private static final Logger logger = LoggerFactory.getLogger(FolderPartitioner.class);

    private static final String DEFAULT_KEY_NAME = "fileName";

    private static final String PARTITION_KEY = "partition";

    private String folder;

    private String keyName = DEFAULT_KEY_NAME;

    /**
     * Map each partition to a subfolder of the folder property
     * {@link ExecutionContext}.
     * 
     */
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>(
                gridSize);
        int i = 0;
        File dir = new File(folder);
        File[] chunkList = dir.listFiles();
        for (File chunkStep : chunkList) {
            if (chunkStep.isDirectory()) {

                ExecutionContext context = new ExecutionContext();
                context.putString(keyName, chunkStep.getName());
                logger.info("Creating partition for folder:" + context.getString(keyName));
                map.put(PARTITION_KEY + i, context);
                i++;
            }
        }
        return map;
    }

    /**
     * The name of the key for the file name in each {@link ExecutionContext}.
     * Defaults to "fileName".
     * 
     * @param keyName
     *            the value of the key
     */
    public void setKeyName(String keyName) {
        this.keyName = keyName;
    }

    public String getFolder() {
        return folder;
    }

    /**
     * The name of the folder which contains the subfolders for spliting them to steps
     * 
     * @param folder
     */
    public void setFolder(String folder) {
        this.folder = folder;
    }

}

执行时间从2小时变为40分钟(!!)使用这个分区器。

相关问题