多线程—如何有效地管理大量java可运行任务

rsl1atfo  于 2021-07-03  发布在  Java
关注(0)|答案(2)|浏览(294)

关闭。这个问题需要更加突出重点。它目前不接受答案。
**想改进这个问题吗?**通过编辑这篇文章更新这个问题,使它只关注一个问题。

24天前关门。
改进这个问题
我有一个数据库表,其中有100k+标识符。任务是读取这些id中的每一个,并开始耗时的充实过程,一旦完成,其结果就需要持久化到数据库中。
当前基于java的解决方案基于以下流程:
将数据库中的所有标识符加载到内存中;
创建10个线程的线程池;
启动并行处理/任务,执行扩展并将数据持久化到数据库中(每个任务50个ID,并行)。正在将所有任务添加到队列中。
javathreadpool和executor框架用于适应此功能。
未来的预期是,标识符的数量将急剧增长,以避免潜在的内存压力-我正在考虑开始批量读取ID,并在现有任务之一完成时(即按需)创建关联的任务。
例如,在开始时,在主线程中,从数据库中获取500个ID,创建10个任务(每个任务50个ID),将它们传递给工作线程来处理它们,并且在第一个任务完成后,从数据库中提取其他ID并创建一个额外的任务。然后重复此过程,直到处理完所有ID。
我的问题是如何通知主线程特定的任务已经完成,以允许主线程从数据库中提取额外的id并创建额外的任务?这是解决这个问题的最佳方法还是另一种架构在这种情况下会更好?

k10s72fa

k10s72fa1#

来自@basil bourque的方法很好,我还有另外一种方法希望能对你有所帮助。
任务进程需要更多的时间,所以不需要将所有ID从数据库加载到内存。
你必须有一个批处理查询方法,每次可以从数据库中获得500个ID。
使用方法调用实现通知主线程。
我写了一个非常简单的代码:

public class BatchTaskExecutor {

    // contains method about query from database
    private final TaskRepository taskRepository;

    private final List<List<Task>> needProcessedTasks = new LinkedList<>();

    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    public BatchTaskExecutor(TaskRepository taskRepository) {
        this.taskRepository = taskRepository;
    }

    public void process() {
        executorService.submit(() -> {
            loadTasks();
            Mono<Boolean> processResult = taskProcess();
            processResult.block();
        });
    }

    // load tasks from database and partition to ten list for parallel process 
    private void loadTasks() {
        List<Task> tasks = taskRepository.batchQuery();

        List<List<Task>> partitionTasks = Lists.partition(tasks, 50);
        needProcessedTasks.clear();
        needProcessedTasks.addAll(partitionTasks);
    }

    // use reactor to parallel process
    private Mono<Boolean> taskProcess() {
        return Flux.range(0, 10)
            .parallel(10)
            .runOn(Schedulers.parallel())
            .flatMap((Function<Integer, Publisher<List<Task>>>) integer -> Mono.just(needProcessedTasks.get(integer)))
            .flatMap((Function<List<Task>, Publisher<Boolean>>) tasks -> Mono.just(batchProcessTask(tasks))).reduce(new BiFunction<Boolean, Boolean, Boolean>() {
            @Override
            public Boolean apply(Boolean aBoolean, Boolean aBoolean2) {
                return aBoolean && aBoolean2;
            }
        }).doOnSuccess(aBoolean -> {
            if (aBoolean) {
                batchSave();
                process();
            } else {
                // todo realize retry method
            }
        });
    }

    // simulate long time need task
    private boolean batchProcessTask(List<Task> tasks) {
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException ignore) {
        }
        for (Task task : tasks) {
            task.setStatus(true);
        }
        return true;
    }

    private void batchSave() {
        System.out.println("begin to save task");
        taskRepository.batchSave(needProcessedTasks.stream()
            .flatMap(new Function<List<Task>, Stream<Task>>() {
                @Override
                public Stream<Task> apply(List<Task> tasks) {
                    return tasks.stream();
                }
            }).collect(Collectors.toList()));

        needProcessedTasks.clear();
    }
}
z18hc3ub

z18hc3ub2#

当您向executor服务提交一个可运行/可调用的对象时,您会得到一个 Future 对象。你可以追踪那些 Future 对象,并询问其状态。每个人都将报告是否取消或完成。
全部完成后,向executor服务提交另一批可运行/可调用的任务。
你会负责的 Future -checker使用 ScheduledExecutorService 重复执行重复执行。主线程不直接参与。
除此之外,我建议你检查一下你的假设。显然,您担心示例化数以百万计的可运行/可调用对象,以免耗尽内存。但我怀疑每个可运行/可调用对象 Future 对象,占用大量内存。我建议您运行一个模拟来查看,并使用监视器或分析工具检查内存使用情况。
下面是一些示例代码。我的第一个 Callable .

package work.basil.example;

import java.util.concurrent.Callable;

public record Enrichment(Integer id) implements Callable
{
    @Override
    public Boolean call ( ) throws Exception
    {
        System.out.println( this.toString() );
        return Boolean.TRUE; // Report success.
    }
}

以及一些代码来预订一千万个可运行的示例,并累积每个 Future 提交到执行器服务时生成的对象。

Instant start = Instant.now();
System.out.println( "INFO - Start running demo at " + start );

int limit = 10_000_000;
List < Future > futures = new ArrayList <>( limit );
ExecutorService executorService = null;
try
{
    executorService = Executors.newFixedThreadPool( 3 );
    for ( int i = 1 ; i <= limit ; i++ )
    {
        Callable < Boolean > callable = new Enrichment( i );
        Future < Boolean > future = executorService.submit( callable );
        futures.add( future );
    }
    System.out.println( "INFO - Submitted %d tasks.".formatted( limit ) );
}
finally
{
    if ( Objects.nonNull( executorService ) ) { executorService.shutdown(); }
}

// Sleep our main thread long enough for background work to finish.
try
{
    System.out.println( "INFO - Sleeping main thread." );
    Thread.sleep( TimeUnit.MINUTES.toMillis( 1 ) );
}
catch ( InterruptedException e )
{
    e.printStackTrace();
}

Instant done = Instant.now();
System.out.println( "INFO - Done running demo at " + done );

注意,在这个特殊的 Enrichment 班级。我们可以简单地在所有1000万次执行中重用一个示例 run . 但我想要一个更糟糕的例子——如果你的场景需要新的对象,我想看看对内存的大致影响。
在我使用64位intel mac mini上macos mojave 10.14.6上的adoptopenjdk的java 15进行的试验中,这项工作只花了不到一分钟的时间,并且使用了4.5 Gig来完成1000万个任务。
顺便说一下,将来projectloom可能会简化您的工作。你将能够简单地安排数百万« 虚拟线程 » (在有限数量的平台/内核线程上运行。project loom早期访问版本现在可用。请看youtube上ron pressler在2020年底的演讲。

相关问题