java中跨不同任务但不是相同路径的多线程

n7taea2i  于 2021-08-25  发布在  Java
关注(0)|答案(2)|浏览(271)

关闭。这个问题需要更加关注。它目前不接受答案。
**想改进这个问题吗?**编辑这篇文章,更新这个问题,使它只关注一个问题。

三天前关门。
改进这个问题
我有一个队列,生产者可以在其中提交任务。每个任务都有一个有效负载和一个文件路径。在使用者端,我有一个执行器线程池,它应该从队列中提取任务并将这些任务分配给线程。以下是需要遵循的一些约束条件:
不能有多个线程在同一文件路径上工作
我们希望为特定文件路径添加的任务按顺序完成
下面是问题陈述的一个例子。让我们假设以下是队列的状态。这里t1是线程名,括号中包含文件路径。 head -> [T1(A),T2(B),T3(C),T4(A),T5(B),T6(A),T7(A),T8(C)] <- tail 因此,我们可以有三个线程处理文件路径的任务 A , BC 同时。但是,以下情况不应发生。这里是文件路径 A 有两个线程同时处理它。 Pool1-Thread1-T4(A) Pool1-Thread2-T5(B) Pool1-Thread3-T6(A) 文件路径的数量可以介于 10k-50k 为了解决这个问题,我提出的方法是有两个Map,即 Map<String, Queue<Tasks>> 对于任务队列和 Map<String, AtomicBoolean> 对于令牌,它们都具有针对文件路径的密钥。一个正在运行的消费者线程,它将在 Tasks Map 在为该文件路径创建线程之前,需要获取该文件路径的标记(布尔值应为true)。一旦线程完成,它将返回标记(通过将布尔值标记回 true ).
问题
这是这个用例的最佳方法,还是我们可以做得更好和/或更简单?
在这种方法中,是否有任何多线程问题可能会影响我以后的工作?
编辑
阐明如何执行任务。我将有一个运行的消费者线程,该线程将在 Tasks Map . 如果它发现任何可用的任务和令牌,它将从执行器中生成一个新线程。
任务本身很小,但传入任务的吞吐量将很高。这里的文件路径代表克隆的git repo,任务可以是添加文件、修改文件、提交和推送到远程。

x3naxklr

x3naxklr1#

我将避免使用executor线程池,并使用以下方法:
创建一个具有blockingqueue(选择实现)和一个持续使用该队列的线程的类。它显然会消耗任务。当它获取任务时,它将执行它。这可以保证队列中的任务是按顺序执行的,并且最多可以执行该队列中的一个任务。
现在创建一些将任务Map到队列的逻辑。e、 g.如果不希望同时执行相同类型的任务,请将它们全部Map到同一队列。例如,a总是转到队列1,b到2,等等。。。它遵循前面提到的属性,通过这种方式,不会有两个类型为a的任务并行进行。
一种常见的方法是将所有队列放在一个数组中,然后根据队列数量的某个值进行分配,这就是调用分区,例如:

queues = new Queue[N];

typeOfTask = task.getType() (to int somehow, for example you can hash or use the char if it is a single letter)

queueIndex = typeTask % queues.length

queues[queueIndex].push(task)

在上面的类中,在将任务传递给线程之前获取任务时,您可以执行任何逻辑,就像您提到的那样。
最后,在使用文件系统时,我将避免使用许多线程(如果有的话),因为磁盘是。。。一个。因此,并行处理没有什么大的意义,除非您必须编写大量不需要缓冲的数据。相反,线程更多的是能够并行工作,使用更多的处理器来提高速度,或者在执行阻塞操作时。

htzpubme

htzpubme2#

我试着实现消费者用例的一部分,似乎问题多于解决方案。请阅读代码中的内联注解。

public class Consumer implements Runnable {

    private volatile boolean isStopped;
    private final Map<String, Queue<Task>> tasks;
    private final Map<String, AtomicBoolean> tokens;
    //Preferably FixedThreadService
    private final ExecutorService service;

    public Consumer(Map<String, Queue<Task>> tasks, Map<String, AtomicBoolean> tokens, ExecutorService service) {
        this.tasks = tasks;
        this.tokens = tokens;
        this.service = service;
        isStopped = false;
    }

    public void stopConsumer(){
        isStopped = true;
    }

    @Override
    public void run() {
        while(!isStopped){
            // tasks map should have latest values because producer is going to update this map
            // frequently. Should have to use ConcurrentHashMap.
            Set<String> keys = tasks.keySet();
            for ( String key:tasks.keySet() ) {
                if(!tokens.get(key).get()){
                    continue;
                }
              try {
                  //This does not mean the task is in progress, just submitted for execution.
                  service.execute(tasks.get(key).poll());
                  //Now that we submitted the task for exicution, we can mark the token to false.
                  tokens.get(key).compareAndSet(true, false);
                  // But at what point are we going to mark token to True. If this token is updated
                  // From any other thread than consumer, it will create race condition.
              }catch(RejectedExecutionException e){
                  //What will happen to the polled task. Task is removed from the queue.

                  //Token we can mark to true, but task has already polled from queue.
                  // Shall we put it back to the head of the queue and how ?
                  tokens.get(key).compareAndSet(false, true);
                  //What if the task is not completed smoothly. T1 is not completed successfully,
                  // what is backup plan, because next task will start any way and corrupt the file.

              }
            }

        }
    }
}

如果我们能将诸如1.添加文件、2.修改文件、3.提交和4.推送到远程等任务隔离开来,我们可能会想出更好的方法。但在这种情况下也会有限制。
最好使用单线程方法。

相关问题