Java线程池,每个客户始终运行一个线程

evrscar2  于 5个月前  发布在  Java
关注(0)|答案(2)|浏览(61)

用途:我有50多个客户,每个客户的活动都记录在一个单独的Excel文件中。每个客户的一个新文件每5分钟添加到一个共享文件系统位置。
在应用程序启动时,我启动一个线程来查询客户记录,并为每个客户启动一个线程。客户的详细信息存储在数据库表中。使用执行器服务线程池并为每个客户提交一个任务。
每个客户线程读取excel文件,并使用多线程并发处理excel中的每一行。每个客户线程为excel文件中的每一行生成一个线程。客户线程在退出之前等待所有子线程完成。
我想在客户执行器服务线程池中实现下面的事情,
1.为每个客户调度一个线程,这将为excel文件中的每一行生成线程,并等待所有子线程完成。
1.在任何时间点,每个客户只能运行一个线程
1.根据Excel中的条目数量,每次运行可能需要不同的时间。
我面临的主要挑战是,如何确保每个客户只有一个客户线程正在运行。
谢谢

luaexgnf

luaexgnf1#

每5分钟向共享文件系统位置添加一个新文件(每个客户)。
每五分钟运行一次调度执行器服务来处理输入。

ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor() ;
Runnable checkForMoreCustomerfilesTask = … ;
ses.scheduleAtFixedRate(
    checkForMoreCustomerfilesTask, 
    0, 
    5, 
    TimeUnit.MINUTES 
);

字符串
请确保在应用结束前“始终”优雅地关闭执行程序服务。否则,线程的后台池可能会无限期地运行,甚至超过应用的结尾-就像僵尸网络一样。请参阅Javadoc上为ExecutorService提供的shutdownAndAwaitTermination样板代码。
确保每个客户只有一个客户线程正在运行
你只有50个客户,所以你可以同时运行50个executor服务对象。根据工作负载,这可能是一个合理的负担,特别是如果不是所有客户都在处理和/或如果你有一个多核机器。
使每个执行器服务成为单线程的。使用单线程,您可以为每个客户排队多个任务,但确保每个客户只有一个任务在执行。

Map< UUID , ExecutorService > executorPerCustomerMap = new HashMap<>() ;
for ( UUID customerId : customerIds ) {
    executorPerCustomerMap.put( customerId , Executors.newSingleThreadExecutor() ) ;
}


再次强调..
当您实现上面所示的Runnable checkForMoreCustomerfilesTask时,从该Map中检索适当的以客户为中心的执行器服务,并提交一个Runnable来处理该文件。

// Found file for a particular customer.
ExecutorService customerExecutorSerivce = executorPerCustomerMap.get( customerId ) ;
customerExecutorSerivce.submit( new CustomerFileProcessorTask( file ) ) ;


你说每个客户文件都有一堆可以并发处理的行。我建议你学习Java 21+中的虚拟线程。你可以合理地同时运行数千个,甚至数百万个虚拟线程,因为它们是如此轻量级。阅读Java JEP 444。观看YouTube.com上的精彩演讲等。由Alan BatemanRon PresslerJosé Paumard
如果您的电子表格行处理工作涉及阻塞(基本上是任何I/O,如网络调用,文件访问,日志记录,数据库调用等),那么您应该使用虚拟线程。使用虚拟线程的一个问题是,如果您的任务代码中有任何标记为synchronized的长时间运行代码,请考虑将synchronized替换为RentrantLock以获得最佳性能。
在现代Java中,executor服务是AutoCloseable。因此,您可以方便安全地使用try-with-resources语法在所有提交的任务完成后自动关闭。

try (
    ExecutorService rowsProcessingExecutorService = Executors.newVirtualThreadPerTaskExecutor() ;
) {
    for( Row row : rows ) {
        rowsProcessingExecutorService.submit( new RowProcessingTask( row ) ) ;
    }
}
// Code flow blocks here until all submitted tasks have finished or failed.


最后,考虑删除中间部分,即每个客户ID分配的执行器服务Map。考虑到您可以使用虚拟线程执行突发任务,您可能实际上不需要并发运行客户文件。您可能希望连续处理每个找到的客户文件,一次一个文件,但该文件中的所有行都通过虚拟线程并发处理。在这种方法中,您可以将每个文件作为一个任务提供给单线程执行器服务,以便每次只运行一个文件,从而不会同时处理两个相同的客户。

ztyzrc3y

ztyzrc3y2#

您必须使用ForkJoinPool API,它针对递归操作进行了优化。这将允许您在处理客户的主线程下递归调用多个线程。
通过使用前面的方法,您可以通过为每个客户同步创建线程来完成第2项要求。

相关问题