在java中为平台线程池创建单独的线程库

nafvub8i  于 5个月前  发布在  Java
关注(0)|答案(2)|浏览(52)

我想创建一个虚拟线程池,运行在单独的Java线程池上。
以下是我试图创建的架构:
x1c 0d1x的数据
这是为了使我能够创建单独的池来在一个JVM中运行批处理任务,并利用每个池的n:mMap的虚拟线程。因此,如果我有12个核心,那么我可以创建2个6线程的线程池。每个池将只执行特定的任务。每个池将有N个VirtualThreads。因此,这里的Map将是2个{N VirtualThreads -> 6 Platform Threads}的池。
TLDR,我想限制虚拟线程池可以运行的PlatformThreads的数量。
我能想到的一件事是,创建线程池,当传入一个runnable时,在run方法中,我可以只创建虚拟线程,但不确定它有多实用,我是否会得到我想要的池分区。这种方法的另一个问题是,虚拟线程将在一个java线程中运行,所以没有N:MMap。

kninwzqo

kninwzqo1#

Virtual threads的发明是为了避免你似乎正在承担的麻烦。
而且,虚拟线程被明确地记录为 * 不 * 用于池化。像facial tissues一样,抓取一个新的,使用它,然后释放。
阅读Java JEP,并与罗恩Pressler,Alan Bateman,José Paumard等一起观看视频,以了解虚拟线程技术的目的和性质。
你说:
我的用例使用相互依赖的作业(基本上一个作业的输出为队列中的另一个作业提供输入)。
.和:
当传入一个runnable时,在run方法中,我可以创建虚拟线程
把你的想法从里到外:与其创建虚拟线程来运行一堆相关的级联任务,不如只创建一个线程来以串行方式完成所有工作。
如果你有一系列级联任务,每个任务都在前一个任务的结果之后执行,那么只需将所有工作写入一个Runnable/Callable。在一个新的虚拟线程中执行该单个组合任务。让该虚拟线程运行完成。
让我们设计一个简单的演示应用程序。我们有三个任务,TaskATaskBTaskC。它们被放在一起作为AlphabetTask。结果是“ABC”,每个字母都被每个子任务添加。

class AlphabetTask implements Callable < String >
{
    private final UUID id = UUID.randomUUID ( );

    @Override
    public String call ( ) throws Exception
    {
        System.out.println ( "Starting AlphabetTask " + this.id + " " + Instant.now ( ) );
        String a = new TaskA ( ).call ( );
        String b = new TaskB ( a ).call ( );
        String c = new TaskC ( b ).call ( );
        System.out.println ( "Ending AlphabetTask " + this.id + " Result: " + c + " " + Instant.now ( ) );
        return c;
    }
}

class TaskA implements Callable < String >
{
    @Override
    public String call ( ) throws Exception
    {
        System.out.println ( "Running TaskA. " + Instant.now ( ) );
        Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
        return "A";
    }
}

class TaskB implements Callable < String >
{
    private final String input;

    public TaskB ( final String input )
    {
        this.input = input;
    }

    @Override
    public String call ( ) throws Exception
    {
        System.out.println ( "Running TaskB. " + Instant.now ( ) );
        Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
        return this.input + "B";
    }
}

class TaskC implements Callable < String >
{
    private final String input;

    public TaskC ( final String input )
    {
        this.input = input;
    }

    @Override
    public String call ( ) throws Exception
    {
        System.out.println ( "Running TaskC. " + Instant.now ( ) );
        Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
        return this.input + "C";
    }
}

字符串
我们创建了三个 AlphabetTask 的示例。

Collection < AlphabetTask > alphabetTasks =
        List.of (
                new AlphabetTask ( ) ,
                new AlphabetTask ( ) ,
                new AlphabetTask ( )
        );


我们将所有这些示例提交给executor服务。对于这三个AlphabetTasks,executor都会分配一个新的虚拟线程。在每个虚拟线程中,我们的每个子任务都会按顺序调用。
注意,如果executor服务的任务在一天内完成,我们可以使用try-with-resources语法自动关闭它。

List < Future < String > > futures = List.of ( );
try (
        ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor ( ) ;
)
{
    try
    {
        futures = executorService.invokeAll ( alphabetTasks );
    } catch ( InterruptedException e )
    {
        throw new RuntimeException ( e );
    }
}
// The try-with-resources blocks here if executor service has any uncompleted tasks.
futures.forEach ( stringFuture -> {
    try
    {
        System.out.println ( stringFuture.get ( ) );
    } catch ( InterruptedException | ExecutionException e )
    {
        throw new RuntimeException ( e );
    }
} );


警告:虚拟线程适用于代码涉及阻塞的任务,如文件I/O,网络I/O,数据库调用,等待锁等。不要使用虚拟线程来运行CPU绑定的任务,如视频编码(不涉及阻塞)。
运行时:

Starting AlphabetTask 3a594ed1-a76e-4927-83b1-2d6bc81f566c 2023-12-03T20:30:03.442091Z
Starting AlphabetTask 12743216-8e42-4be1-bfc4-1893e08e58a7 2023-12-03T20:30:03.442091Z
Starting AlphabetTask 94a4d5b9-3ed9-43d4-ba66-509380fa9f8b 2023-12-03T20:30:03.442091Z
Running TaskA. 2023-12-03T20:30:03.452388Z
Running TaskA. 2023-12-03T20:30:03.452392Z
Running TaskA. 2023-12-03T20:30:03.452383Z
Running TaskB. 2023-12-03T20:30:03.556780Z
Running TaskB. 2023-12-03T20:30:03.687342Z
Running TaskC. 2023-12-03T20:30:03.812744Z
Running TaskB. 2023-12-03T20:30:04.108820Z
Running TaskC. 2023-12-03T20:30:04.278596Z
Ending AlphabetTask 94a4d5b9-3ed9-43d4-ba66-509380fa9f8b Result: ABC 2023-12-03T20:30:04.310085Z
Running TaskC. 2023-12-03T20:30:04.360861Z
Ending AlphabetTask 3a594ed1-a76e-4927-83b1-2d6bc81f566c Result: ABC 2023-12-03T20:30:04.624803Z
Ending AlphabetTask 12743216-8e42-4be1-bfc4-1893e08e58a7 Result: ABC 2023-12-03T20:30:04.953132Z
ABC
ABC
ABC


注意:当跨线程调用System.out.println时,控制台上的输出可能 * 不 * 按时间顺序出现。如果你关心顺序,请包括并检查时间戳。
为了方便您的复制粘贴,下面是所有这些代码,它们将被放入一个.java文件中。

package work.basil.example.threading;

import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;

public class Subtasks
{
    public static void main ( String[] args )
    {
        Subtasks app = new Subtasks ( );
        app.demo ( );
    }

    private void demo ( )
    {
        Collection < AlphabetTask > alphabetTasks =
                List.of (
                        new AlphabetTask ( ) ,
                        new AlphabetTask ( ) ,
                        new AlphabetTask ( )
                );
        List < Future < String > > futures = List.of ( );
        try (
                ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor ( ) ;
        )
        {
            try
            {
                futures = executorService.invokeAll ( alphabetTasks );
            } catch ( InterruptedException e )
            {
                throw new RuntimeException ( e );
            }
        }
        // The try-with-resources blocks here if executor service has any uncompleted tasks.
        futures.forEach ( stringFuture -> {
            try
            {
                System.out.println ( stringFuture.get ( ) );
            } catch ( InterruptedException | ExecutionException e )
            {
                throw new RuntimeException ( e );
            }
        } );
    }
}

class AlphabetTask implements Callable < String >
{
    private final UUID id = UUID.randomUUID ( );

    @Override
    public String call ( ) throws Exception
    {
        System.out.println ( "Starting AlphabetTask " + this.id + " " + Instant.now ( ) );
        String a = new TaskA ( ).call ( );
        String b = new TaskB ( a ).call ( );
        String c = new TaskC ( b ).call ( );
        System.out.println ( "Ending AlphabetTask " + this.id + " Result: " + c + " " + Instant.now ( ) );
        return c;
    }
}

class TaskA implements Callable < String >
{
    @Override
    public String call ( ) throws Exception
    {
        System.out.println ( "Running TaskA. " + Instant.now ( ) );
        Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
        return "A";
    }
}

class TaskB implements Callable < String >
{
    private final String input;

    public TaskB ( final String input )
    {
        this.input = input;
    }

    @Override
    public String call ( ) throws Exception
    {
        System.out.println ( "Running TaskB. " + Instant.now ( ) );
        Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
        return this.input + "B";
    }
}

class TaskC implements Callable < String >
{
    private final String input;

    public TaskC ( final String input )
    {
        this.input = input;
    }

    @Override
    public String call ( ) throws Exception
    {
        System.out.println ( "Running TaskC. " + Instant.now ( ) );
        Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
        return this.input + "C";
    }
}


你说:
所以如果我有12个核心,那么我可以创建6个线程的2个线程。
你并不像你所相信的那样拥有那么多的控制权。哪些平台线程在哪个内核上运行,在什么时间运行多长时间,完全取决于主机操作系统。调度行为根据计算机上的当前负载而随时变化。在任何时候,都可能没有、很少或所有线程正在执行。

p8ekf7hl

p8ekf7hl2#

你的方法不对。
您真正想要做的似乎是 * 限制某些类型任务的并发性 *,也就是说,您最多需要M1个第一类任务(称之为A任务)能够并发运行,第二类任务最多M2(称之为B任务)但是试图创建绑定到平台线程池的虚拟线程池是在错误的级别上解决这个问题。
如果你想限制一个给定活动的并发性,合适的工具是一个信号量。为每种任务A和B创建一个信号量。A任务需要A信号量的许可; B任务需要来自B信号量的许可。用信号量的获取-释放来 Package 任务是微不足道的,所以如果你愿意,你可以把它作为提交逻辑的一部分,而不是任务业务逻辑的一部分。每个任务都在一个虚拟线程中运行。
你可能会想:“但是我可能会有很多虚拟线程在等待A信号量。”是的,您可能会这样做。但是您不在乎;它们很便宜。为每个任务创建一个虚拟线程,并且不要尝试将它们池化。(这也意味着您不需要单独的平台线程池。)

相关问题