所以我的场景是我收到一个执行作业的消息。作业有一个sourceId。现在,在同一时间,一个具有一种类型的sourceId的作业应该正在运行,其他作业应该排队。当作业启动时,它应该再次将自己的工作分解为多个小执行。对于每个小执行,我需要更新数据库,了解作业的多少部分已经完成。一旦整个作业结束,我需要更新作业完成的数据库。如果有错误,我需要在数据库中标记作业为失败。这是我想出的一个大致草图。你们能帮助我吗?如果我错过了一些东西,我应该如何做数据库更新。还有,有没有任何Java语言功能,可以让我的生活轻松。还有一件事,我需要在quarkus应用程序中做到这一点。
class PrimaryWorker{
ConcurrentHashMap<String,ArrayBlockingQueue> staging;
public void submit(){
staging.checkIfEntryExistForSource()
if(yes){
getTheEntry()
createSecondaryWorkerWithQueue();
pushTheJobToQueue();
}else{
createEntryForDataSource()
createSecondaryWorkerWithQueue();
pushTheJobToQueue();
}
}
}
class BusinessJobWorker{
ArrayBlockingQueue input;
ArrayBlockingQueue commonOutput;
public SecondaryWorker(input,commonOutput){
}
public void run(){
BusinessJob br = input.poll();
ArrayBlockingQueue queue = new ArrayBlockingQueue(10);
if(canBeParallel()){
queue = new ArrayBlockingQueue(1);
}
do{
Batch batch = getEntities(br,batchSize,pageNumber);
workerQueue.push(batch);
}while(batch.hasNext);
updateDatabase();
}
public void updateDatabase(){
}
public boolean canBeParallel(){
}
}
class BatchWorker{
ArrayBlockingQueue commonInput;
ArrayBlockingQueue output;
public BatchWorker(input){
}
}
字符串
我想介绍的一种情况是,如果服务器崩溃或由于某种原因被终止,则应在重新启动时恢复作业执行,并应保持sourceId级别的并发控制。
我仍然没有找出正确的方法来做我想Maven的意见,就如何多线程任务应该做。
1条答案
按热度按时间xvw2m8pv1#
你看过Mutiny吗?这是Quarkus中用于响应式编程的库,请参阅此链接:
https://smallrye.io/smallrye-mutiny/latest/tutorials/getting-mutiny/
我还没有测试过下面的代码,但它应该是这样的:
假设你有一个带有MyJobEntity的db(在这个例子中我使用Mongo):
字符串
然后,您可以按以下方式处理作业:
型
此代码将并行处理每个作业,并按顺序处理执行部分。如果执行完成,作业的状态将更改为FINISHED。否则,它将抛出一个错误,将状态设置为FANUC。