在这种情况下,我应该如何使用线程在java中实现并行性?

eivnm1vs  于 5个月前  发布在  Java
关注(0)|答案(1)|浏览(40)

所以我的场景是我收到一个执行作业的消息。作业有一个sourceId。现在,在同一时间,一个具有一种类型的sourceId的作业应该正在运行,其他作业应该排队。当作业启动时,它应该再次将自己的工作分解为多个小执行。对于每个小执行,我需要更新数据库,了解作业的多少部分已经完成。一旦整个作业结束,我需要更新作业完成的数据库。如果有错误,我需要在数据库中标记作业为失败。这是我想出的一个大致草图。你们能帮助我吗?如果我错过了一些东西,我应该如何做数据库更新。还有,有没有任何Java语言功能,可以让我的生活轻松。还有一件事,我需要在quarkus应用程序中做到这一点。

class PrimaryWorker{
    ConcurrentHashMap<String,ArrayBlockingQueue> staging;

    public void submit(){
        staging.checkIfEntryExistForSource()
        if(yes){
            getTheEntry()
            createSecondaryWorkerWithQueue();
            pushTheJobToQueue();
        }else{
            createEntryForDataSource()
            createSecondaryWorkerWithQueue();
            pushTheJobToQueue();
        }
    }
}
class BusinessJobWorker{
    ArrayBlockingQueue input;
    ArrayBlockingQueue commonOutput;

    public SecondaryWorker(input,commonOutput){

    }

    public void run(){
        BusinessJob br = input.poll();
        ArrayBlockingQueue queue = new ArrayBlockingQueue(10);
        if(canBeParallel()){
            queue = new ArrayBlockingQueue(1);
        }
        do{
            Batch batch = getEntities(br,batchSize,pageNumber);
            workerQueue.push(batch);
        }while(batch.hasNext);
        updateDatabase();
    }
    public void updateDatabase(){

    }
    public boolean canBeParallel(){

    }
}
class BatchWorker{
    ArrayBlockingQueue commonInput;
    ArrayBlockingQueue output;

    public BatchWorker(input){

    }
    
}

字符串
我想介绍的一种情况是,如果服务器崩溃或由于某种原因被终止,则应在重新启动时恢复作业执行,并应保持sourceId级别的并发控制。
我仍然没有找出正确的方法来做我想Maven的意见,就如何多线程任务应该做。

xvw2m8pv

xvw2m8pv1#

你看过Mutiny吗?这是Quarkus中用于响应式编程的库,请参阅此链接:
https://smallrye.io/smallrye-mutiny/latest/tutorials/getting-mutiny/
我还没有测试过下面的代码,但它应该是这样的:
假设你有一个带有MyJobEntity的db(在这个例子中我使用Mongo):

public class MyJobEntity extends ReactivePanacheMongoEntity {
     public State state;
     public List<Execution> executions;     
}

字符串
然后,您可以按以下方式处理作业:

public Uni<Void> processAllJobs() {
        return MyJobEntity.<MyJobEntity>streamAll()
                .call(job -> processJob(job))
                .call(job -> setJobState(job, "FINISHED"))
                .invoke(job -> Log.infof("Job(id=%s, state=FINISHED) finished successfully", job.id))
                .onItem().ignoreAsUni();
    }

    public Uni<Void> processJob(MyJobEntity job) {
        return Multi.createFrom().iterable(job.executions)
                .onItem().transformToUniAndConcatenate(execution -> executePartOfJob(execution)
                        .onFailure()
                        .invoke(() -> new JobFailedException("Job(id=%s, executionId=%s) could not be completed".formatted(jobId, execution.id)))
                )
                .onItem().ignoreAsUni()
                .onFailure(JobFailedException.class).call(() -> setJobState(job, "FAILED"));
    }


此代码将并行处理每个作业,并按顺序处理执行部分。如果执行完成,作业的状态将更改为FINISHED。否则,它将抛出一个错误,将状态设置为FANUC。

相关问题