我正在使用Java 21和Sping Boot 3.2.0(SNAPSHOT)。我有更多的类,但我只显示这种情况下的基本类。我试图使用ExecutorService在多线程环境中检查一些记录,并根据检查结果继续或停止ExecutorService。
SecurityCheckc类:
private ExecutorService pool;
private final SendToDatabse sendToDatabse;
private final UserCheck userCheck;
private final SynchronisedCheckedMap synchronisedCheckedMap;
public SecurityCheck(SendToDatabse sendToDatabse,
UserCheck userCheck,
SynchronisedCheckedMap synchronisedCheckedMap) {
this.sendToDatabse = sendToDatabse;
this.userCheck = userCheck;
this.synchronisedCheckedMap = synchronisedCheckedMap;
}
public void securityCheck(List<String> list) throws ExecutionException, InterruptedException, TimeoutException {
List<Future<?>> futures = new ArrayList<>();
this.pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
try {
list.forEach(line -> futures.add(pool.submit(
new Security(line,
this.userCheck,
this.synchronisedCheckedMap
))));
for (Future<?> future : futures) {
future.get(Long.MAX_VALUE, TimeUnit.SECONDS);
}
}catch (Exception e){
throw new RuntimeException(e);
}
finally {
print();
pool.shutdown();
while (!pool.awaitTermination(24L,TimeUnit.SECONDS)){
System.out.println("Not yet. Still in waiting for termination");
}
// this.sendToDatabse.sendToDatabse(synchronisedCheckedMap.synchronisedCheckedMap);
}
}
public void stopPool(Short code, Customer customer){
System.out.println("oprire");
Map<Short, List<Customer>> map = new HashMap<>();
map.put(code, (List<Customer>) customer);
map.forEach((k,v)->{
System.out.println(k + "\t" + v);
});
this.pool.shutdownNow();
// this.sendToDatabse.sendToDatabse(map);
Thread.currentThread().interrupt();
}
字符串
安全类:
public class Security implements SQLInjections, Runnable{
private final String line;
private final UserCheck userCheck;
private final SynchronisedCheckedMap synchronisedCheckedMap;
public Security(String line,
UserCheck userCheck,
SynchronisedCheckedMap synchronisedCheckedMap){
this.line = line;
this.userCheck = userCheck;
this.synchronisedCheckedMap = synchronisedCheckedMap;
}
@Override
public void run() {
try {
if (Thread.interrupted()) {
return;
}
JsonToLine jsonToLine = new JsonToLine();
String newLine = jsonToLine.jsonToLine(line);
StringToValues stringToValues = new StringToValues();
List<String> values = stringToValues.stringToValues(newLine);
// values.forEach(string -> {System.out.println("line: " + string);});
Short validID = validID(values.get(0));
Short validUser = null;
Short validPassword = null;
Short validMessage = null;
List<String> message = new ArrayList<>();
Customer customer;
switch (validID) {
case 200:
// values.forEach(string -> {System.out.println("code" + "\t" + "line: " + string);});
validUser = validUser(values.get(1));
validPassword = validPassword(values.get(2));
message = Arrays.asList(values.get(3).split(";"));
validMessage = validMesage(message);
customer =
new Customer(Long.parseLong(values.get(0)),
values.get(1),
values.get(2),
message);
if(validUser==200 &&
validMessage==200 &&
validPassword==200){
this.synchronisedCheckedMap.synchronisedCheckedMap((short) 200, customer);
}else{
this.synchronisedCheckedMap.synchronisedCheckedMap(
getHighestCode(List.of(validUser,validID,validMessage,validPassword)), customer);
}
break;
case 600:
validUser = validUser(values.get(0));
validPassword = validPassword(values.get(1));
message = Arrays.asList(values.get(3).split(";"));
validMessage = validMesage(message);
customer =
new Customer(null,
values.get(1),
values.get(2),
message);
this.synchronisedCheckedMap.synchronisedCheckedMap((short) 600, customer);
break;
}
}catch (Exception e){
Thread.currentThread().interrupt(); // Preserve interruption status
return;
}
}
}
型
我还有SyncroniseMap
@Component
public class SynchronisedCheckedMap {
public Map<Short, List<Customer>> synchronisedCheckedMap = new HashMap<>();
private SecurityCheck securityCheck;
public SynchronisedCheckedMap(SecurityCheck securityCheck) {
this.securityCheck = securityCheck;
}
public SynchronisedCheckedMap() {}
protected synchronized void synchronisedCheckedMap(Short code,
Customer customer){
if(code>=800){
this.securityCheck.stopPool(code,customer);
}
if(this.synchronisedCheckedMap.containsKey(code)){
this.synchronisedCheckedMap.get(code).add(customer);
}else {
List<Customer> list = new ArrayList<>();
list.add(customer);
synchronisedCheckedMap.put(code,list);
}
}
public void print(){
this.synchronisedCheckedMap.forEach((k,v)->{
System.out.println(k + "\t" + v);
});
}
}
型
我把这个类中的依赖注入搞砸了,因为它根本不工作。线程仍然在运行。但是我试图在“stopPool”方法上放置调试器,并且没有停止。
我知道我将不得不在这些方面做更多的工作,但我想从SecurityCheck类中停止ExecutorService。当它是一个发送到“this. synchronisedBikedMap. synchronisedBikedMap”的特定代码时,我试图调用“stopPool”方法。
我已经尝试过这样做,但我需要帮助才能使它工作,因为线程继续下去。
在这种情况下,我应该如何正确地停止ExecutorService?
1条答案
按热度按时间cmssoen21#
私有ExecutorService池;
关于这个名字,不要把你的executor服务看作是一个线程池。
一个executor service * 可以 * 由一个线程池支持。或者它可以由单个线程支持。或者它可以没有线程支持,可以在当前线程上执行它继承的
execute
方法。在Java 21+中,执行器服务可以为每个提交的任务创建一个新的virtual thread,没有池。因此,可以将执行器服务看作是代表您以某种方式执行代码的服务。
我有更多的类,但我只显示了这种情况下的基本类。
你展示了太多的代码,没有缩小到你的特定问题的焦点。所以我可以给予一些提示和建议来指导你。但是如果你想要一个直接的答案,你需要提供一个直接的问题。
我正试图在多线程环境中检查一些记录
当你有一堆任务要执行时,你可以通过
invokeAll
将它们一次性提交给你的执行器服务。这只是一个方便,与一次提交一个任务的效果相同。看起来你只是有一个单独处理的文本行列表。你似乎试图通过停止executor服务来停止任务,这可能很笨拙。线程不能直接停止。在Java中,线程是通过设计你的代码来测试中断状态或其他条件来合作停止的。Search了解更多信息。
让我们定义一个简单的示例任务类。
如果你想从每个任务返回一个结果,使用
Callable
而不是Runnable
。在这种情况下,我们返回一个Boolean
对象来指示我们的行是否成功处理。字符串
ExecutorService
现在是AutoCloseable
。因此,您可以使用try-with-resources语法在执行程序服务完成一批任务时自动关闭它。在这里,我们模拟获取一个行列表。我们将每个行发送到
LineProcessor
任务Callable
类的构造函数。我们将这些构造的任务对象传递给executor服务。我们得到一个Future
对象的列表。通过使用try-with-resources语法,我们的代码阻塞,直到executor服务执行完所有任务或在一天后过期。型
如果我们想检查结果,我们可以添加更多的代码来处理
Future
对象。型
运行时:
型
回到你的问题的要点,它似乎是问如果其中一个任务遇到特定的情况,如何中断兄弟任务。
在Java中,线程是协同中断的。你可以要求executor服务提前关闭,它会尝试中断它的每个任务的线程。但是中断处理有点棘手。我建议你设置一个特定的标志,让所有的任务共享。这个标志必须是线程-安全。所以我将使用
AtomicBoolean
作为任务之间的标志,表明一切正常,它们应该继续工作。让我们初始化该标志,并在构造任务对象时将其作为参数传递。型
我们必须修改我们的任务类
LineProcessor
,以接受AtomicBoolean
参数并在内部存储引用。型
请注意我们是如何在
if ( ! this.flagToContinueExecution.get ( ) ) return Boolean.FALSE;
的多次出现中进行丢弃的。如果您想让该任务在被另一个任务标记时自动停止其工作,则必须在整个任务代码中散布这样的调用。运行时:
型
虽然与您的问题无关,但我建议在Java 21+中使用虚拟线程以提高任务的吞吐量。只有当您的任务涉及阻塞并且不完全受CPU限制时才这样做。阻塞伴随着数据库调用,文件存储调用,网络调用等。
为了与当前的虚拟线程实现更兼容,对于在任务中执行的任何长时间运行的代码,请将
synchronized
的使用替换为Reentrant
锁对象。顺便说一句,当做类似这样的开关:
型
.始终包含
default
大小写以捕获意外值。