Spring Boot 当我发现某些内容时如何停止ExecutorService

kulphzqa  于 7个月前  发布在  Spring
关注(0)|答案(1)|浏览(81)

我正在使用Java 21和Sping Boot 3.2.0(SNAPSHOT)。我有更多的类,但我只显示这种情况下的基本类。我试图使用ExecutorService在多线程环境中检查一些记录,并根据检查结果继续或停止ExecutorService。
SecurityCheckc类:

private ExecutorService pool;
    private final SendToDatabse sendToDatabse;
    private final UserCheck userCheck;
    private final SynchronisedCheckedMap synchronisedCheckedMap;

    public SecurityCheck(SendToDatabse sendToDatabse,
                         UserCheck userCheck,
                         SynchronisedCheckedMap synchronisedCheckedMap) {
        this.sendToDatabse = sendToDatabse;
        this.userCheck = userCheck;
        this.synchronisedCheckedMap = synchronisedCheckedMap;

    }

    public void securityCheck(List<String> list) throws ExecutionException, InterruptedException, TimeoutException {
        List<Future<?>> futures = new ArrayList<>();
        this.pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            list.forEach(line -> futures.add(pool.submit(
                    new Security(line,
                                this.userCheck,
                                this.synchronisedCheckedMap
                            ))));

            for (Future<?> future : futures) {
                future.get(Long.MAX_VALUE, TimeUnit.SECONDS);
            }
        }catch (Exception e){
            throw new RuntimeException(e);
        }
        finally {
            print();
            pool.shutdown();
            while (!pool.awaitTermination(24L,TimeUnit.SECONDS)){
                System.out.println("Not yet. Still in waiting for termination");
            }
//            this.sendToDatabse.sendToDatabse(synchronisedCheckedMap.synchronisedCheckedMap);
        }
    }

    public void stopPool(Short code, Customer customer){

        System.out.println("oprire");
        Map<Short, List<Customer>> map = new HashMap<>();
        map.put(code, (List<Customer>) customer);
        map.forEach((k,v)->{
            System.out.println(k + "\t" + v);
        });
        this.pool.shutdownNow();
//        this.sendToDatabse.sendToDatabse(map);

        Thread.currentThread().interrupt();
    }

字符串
安全类:

public class Security implements SQLInjections, Runnable{
    private final String line;
    private final UserCheck userCheck;
    private final SynchronisedCheckedMap synchronisedCheckedMap;
    public Security(String line,
                    UserCheck userCheck,
                    SynchronisedCheckedMap synchronisedCheckedMap){
        this.line = line;
        this.userCheck = userCheck;
        this.synchronisedCheckedMap = synchronisedCheckedMap;
    }

    @Override
    public void run() {
        try {
            if (Thread.interrupted()) {
                return;
            }

            JsonToLine jsonToLine = new JsonToLine();
            String newLine = jsonToLine.jsonToLine(line);

            StringToValues stringToValues = new StringToValues();
            List<String> values = stringToValues.stringToValues(newLine);
//        values.forEach(string -> {System.out.println("line: " + string);});
            Short validID = validID(values.get(0));
            Short validUser = null;
            Short validPassword = null;
            Short validMessage = null;
            List<String> message = new ArrayList<>();
            Customer customer;

            switch (validID) {
                case 200:
//                    values.forEach(string -> {System.out.println("code" + "\t" + "line: " + string);});
                    validUser = validUser(values.get(1));
                    validPassword = validPassword(values.get(2));
                    message = Arrays.asList(values.get(3).split(";"));
                    validMessage = validMesage(message);
                    customer =
                            new Customer(Long.parseLong(values.get(0)),
                                    values.get(1),
                                    values.get(2),
                                    message);

                    if(validUser==200 &&
                        validMessage==200 &&
                        validPassword==200){

                        this.synchronisedCheckedMap.synchronisedCheckedMap((short) 200, customer);

                    }else{

                        this.synchronisedCheckedMap.synchronisedCheckedMap(
                                getHighestCode(List.of(validUser,validID,validMessage,validPassword)), customer);
                    }

                    break;

                case 600:

                    validUser = validUser(values.get(0));
                    validPassword = validPassword(values.get(1));
                    message = Arrays.asList(values.get(3).split(";"));
                    validMessage = validMesage(message);
                    customer =
                            new Customer(null,
                                    values.get(1),
                                    values.get(2),
                                    message);

                    this.synchronisedCheckedMap.synchronisedCheckedMap((short) 600, customer);
                    break;
            }

        }catch (Exception e){
            Thread.currentThread().interrupt(); // Preserve interruption status
            return;
        }

    }
}


我还有SyncroniseMap

@Component
public class SynchronisedCheckedMap {

    public Map<Short, List<Customer>> synchronisedCheckedMap = new HashMap<>();
    private  SecurityCheck securityCheck;

    public SynchronisedCheckedMap(SecurityCheck securityCheck) {
        this.securityCheck = securityCheck;
    }

    public SynchronisedCheckedMap() {}

    protected synchronized void synchronisedCheckedMap(Short code,
                                                       Customer customer){

        if(code>=800){
            this.securityCheck.stopPool(code,customer);
        }

        if(this.synchronisedCheckedMap.containsKey(code)){
            this.synchronisedCheckedMap.get(code).add(customer);
        }else {
            List<Customer> list = new ArrayList<>();
            list.add(customer);
            synchronisedCheckedMap.put(code,list);
        }
    }

    public void print(){
        this.synchronisedCheckedMap.forEach((k,v)->{
            System.out.println(k + "\t" + v);
        });
    }

}


我把这个类中的依赖注入搞砸了,因为它根本不工作。线程仍然在运行。但是我试图在“stopPool”方法上放置调试器,并且没有停止。
我知道我将不得不在这些方面做更多的工作,但我想从SecurityCheck类中停止ExecutorService。当它是一个发送到“this. synchronisedBikedMap. synchronisedBikedMap”的特定代码时,我试图调用“stopPool”方法。
我已经尝试过这样做,但我需要帮助才能使它工作,因为线程继续下去。
在这种情况下,我应该如何正确地停止ExecutorService?

cmssoen2

cmssoen21#

私有ExecutorService池;
关于这个名字,不要把你的executor服务看作是一个线程池。
一个executor service * 可以 * 由一个线程池支持。或者它可以由单个线程支持。或者它可以没有线程支持,可以在当前线程上执行它继承的execute方法。在Java 21+中,执行器服务可以为每个提交的任务创建一个新的virtual thread,没有池。
因此,可以将执行器服务看作是代表您以某种方式执行代码的服务。
我有更多的类,但我只显示了这种情况下的基本类。
你展示了太多的代码,没有缩小到你的特定问题的焦点。所以我可以给予一些提示和建议来指导你。但是如果你想要一个直接的答案,你需要提供一个直接的问题。
我正试图在多线程环境中检查一些记录
当你有一堆任务要执行时,你可以通过invokeAll将它们一次性提交给你的执行器服务。这只是一个方便,与一次提交一个任务的效果相同。
看起来你只是有一个单独处理的文本行列表。你似乎试图通过停止executor服务来停止任务,这可能很笨拙。线程不能直接停止。在Java中,线程是通过设计你的代码来测试中断状态或其他条件来合作停止的。Search了解更多信息。
让我们定义一个简单的示例任务类。
如果你想从每个任务返回一个结果,使用Callable而不是Runnable。在这种情况下,我们返回一个Boolean对象来指示我们的行是否成功处理。

package work.basil.example;

import java.time.Instant;
import java.util.concurrent.Callable;

public class LineProcessor implements Callable < Boolean > {
    private final String line;

    public LineProcessor ( final String line ) {
        this.line = line;
    }

    @Override
    public Boolean call ( ) {
        System.out.println ( "In thread ID: " + Thread.currentThread ( ).threadId ( ) + " at " + Instant.now ( ) + " processing line: " + this.line );
        Boolean success = Boolean.TRUE;  // If we successfully processed this line.
        return success;
    }
}

字符串
ExecutorService现在是AutoCloseable。因此,您可以使用try-with-resources语法在执行程序服务完成一批任务时自动关闭它。
在这里,我们模拟获取一个行列表。我们将每个行发送到LineProcessor任务Callable类的构造函数。我们将这些构造的任务对象传递给executor服务。我们得到一个Future对象的列表。通过使用try-with-resources语法,我们的代码阻塞,直到executor服务执行完所有任务或在一天后过期。

List < String > lines = List.of ( "x" , "y" , "z" );
List < LineProcessor > lineProcessors = lines.stream ( ).map ( LineProcessor :: new ).toList ( );
List < Future < Boolean > > futures = null;
try (
        ExecutorService executorService = Executors.newFixedThreadPool ( Runtime.getRuntime ( ).availableProcessors ( ) ) ;
) {
    futures = executorService.invokeAll ( lineProcessors );
} catch ( InterruptedException e ) {
    throw new RuntimeException ( e );
}
// At this point, the executor service has closed after all its tasks are done or canceled.


如果我们想检查结果,我们可以添加更多的代码来处理Future对象。

for ( Future < Boolean > future : futures ) {
    if ( future.isDone ( ) ) {
        System.out.println ( "done" );
    } else if ( future.isCancelled ( ) ) {
        System.out.println ( "cancelled" );
    }
    System.out.println ( "Call future.get here to get result" );
}
System.out.println ( "INFO - Demo done. " + Instant.now ( ) );


运行时:

In thread ID: 22 at 2023-11-01T05:56:56.499449Z processing line: y
In thread ID: 23 at 2023-11-01T05:56:56.499442Z processing line: z
In thread ID: 21 at 2023-11-01T05:56:56.499460Z processing line: x
done
Call future.get here to get result
done
Call future.get here to get result
done
Call future.get here to get result
INFO - Demo done. 2023-11-01T05:56:56.512539Z


回到你的问题的要点,它似乎是问如果其中一个任务遇到特定的情况,如何中断兄弟任务。
在Java中,线程是协同中断的。你可以要求executor服务提前关闭,它会尝试中断它的每个任务的线程。但是中断处理有点棘手。我建议你设置一个特定的标志,让所有的任务共享。这个标志必须是线程-安全。所以我将使用AtomicBoolean作为任务之间的标志,表明一切正常,它们应该继续工作。让我们初始化该标志,并在构造任务对象时将其作为参数传递。

List < String > lines = List.of ( "x" , "y" , "z" );
AtomicBoolean flagToContinueProcessingLines = new AtomicBoolean ( true );
List < LineProcessor > lineProcessors =
        lines
                .stream ( )
                .map ( ( String line ) -> new LineProcessor ( line , flagToContinueProcessingLines ) )
                .toList ( );
List < Future < Boolean > > futures = null;

try (
        ExecutorService executorService = Executors.newFixedThreadPool ( Runtime.getRuntime ( ).availableProcessors ( ) ) ;
) {
    futures = executorService.invokeAll ( lineProcessors );
} catch ( InterruptedException e ) {
    throw new RuntimeException ( e );
}


我们必须修改我们的任务类LineProcessor,以接受AtomicBoolean参数并在内部存储引用。

public class LineProcessor implements Callable < Boolean > {
    private final String line;
    private final AtomicBoolean flagToContinueExecution;

    public LineProcessor ( final String line , final AtomicBoolean flagToContinueExecution ) {
        this.line = line;
        this.flagToContinueExecution = flagToContinueExecution;
    }

    @Override
    public Boolean call ( ) {
        if ( ! this.flagToContinueExecution.get ( ) ) return Boolean.FALSE;
        System.out.println ( "In thread ID: " + Thread.currentThread ( ).threadId ( ) + " at " + Instant.now ( ) + " processing line: " + this.line );
        if ( ! this.flagToContinueExecution.get ( ) ) return Boolean.FALSE;
        boolean badSituationArose = ThreadLocalRandom.current ( ).nextBoolean ( );
        if ( badSituationArose ) {
            System.out.println ( "OOPS! In thread ID: " + Thread.currentThread ( ).threadId ( ) + " at " + Instant.now ( ) + " funky situation arose while processing line: " + this.line );
            this.flagToContinueExecution.set ( false );  // Flip the flag to signal the other sibling tasks to halt.
            return Boolean.FALSE;
        }
        Boolean success = true;  // If we successfully processed this line.
        return success;
    }
}


请注意我们是如何在if ( ! this.flagToContinueExecution.get ( ) ) return Boolean.FALSE;的多次出现中进行丢弃的。如果您想让该任务在被另一个任务标记时自动停止其工作,则必须在整个任务代码中散布这样的调用。
运行时:

In thread ID: 21 at 2023-11-01T06:28:12.682969Z processing line: x
In thread ID: 23 at 2023-11-01T06:28:12.682936Z processing line: z
In thread ID: 22 at 2023-11-01T06:28:12.682935Z processing line: y
OOPS! In thread ID: 22 at 2023-11-01T06:28:12.695232Z funky situation arose while processing line: y
done
Call future.get here to get result
done
Call future.get here to get result
done
Call future.get here to get result


虽然与您的问题无关,但我建议在Java 21+中使用虚拟线程以提高任务的吞吐量。只有当您的任务涉及阻塞并且不完全受CPU限制时才这样做。阻塞伴随着数据库调用,文件存储调用,网络调用等。
为了与当前的虚拟线程实现更兼容,对于在任务中执行的任何长时间运行的代码,请将synchronized的使用替换为Reentrant锁对象。
顺便说一句,当做类似这样的开关:

switch (validID) {
    case 200: … 
    case 600: … 
}


.始终包含default大小写以捕获意外值。

相关问题