是否存在具有多个队列的开箱即用线程池(可确保每个队列的串行处理)?

在我的所有任务中,我有一些必须按顺序处理的任务(它们永远不能并发运行,必须按顺序处理)。

我实现了为必须按顺序执行的每组任务创建一个单独的线程池,其中包含一个线程。它有效,但我没有这方面的资源。我不控制组的数量,所以我最终可能会同时运行大量线程。

有没有办法用单线程池来实现这一点?是否有具有多个阻塞队列的线程池,我可以确保每个队列的串行执行?

编辑:

只是强调我在第二段中所说的:我已经通过一个单线程线程池解决了这个问题,用于必须串行执行的每组任务。但是,我不能继续使用这个解决方案。有太多的组,我不能拥有所有这些线程。

我发现了这个相关的问题,但由于它不是最近的,我仍然创建了我的。我所做的只是试图避免重新发明轮子,但似乎我别无选择。

Java 是否有可索引的多队列线程池?


答案 1

正如@SotiriosDelimanolis和@AlexeiKaigorodov所建议的那样,Akka似乎很有希望,以及@Dodd10x第二个答案,这当然可以解决问题。唯一的缺点是,我必须编写自己的轮询策略,以确保我的任务最终被添加到执行器中(就像他的例子中的无限循环一样)。

另一方面,@OldCurmudgeon建议的条纹执行器服务与我的问题完全匹配,并且开箱即用,只是作为自定义 。ExecutorService

这个神奇的线程池将确保所有具有相同 stripeClass 的 Runnables 将按照提交的顺序执行,但具有不同 stripedClasses 的 StripedRunners 仍然可以独立执行。他希望使用一个相对较小的线程池来为大量的Java NIO客户端提供服务,但这样一种方式,运行对象仍然可以按顺序执行。

甚至还有关于对每个组(stripe)使用单线程线程池的评论,正如这里所建议的那样:

提出了一些建议,例如为每个 stripeClass 设置一个 SingleThreadExecutor。但是,这不能满足我们可以在连接之间共享线程的要求。

我认为这是其简单性和易用性的最佳解决方案。


答案 2

如果为每个组维护一个队列,则可以从每个队列中提取项目并将其馈送到线程池中。下面的代码不会优先考虑任何一个组,它只是以循环方式提取它们。如果您需要添加优先级,您应该可以轻松做到。以下代码将使用两个线程(加上管理队列的线程)对 4 个组进行轮循机制。您可以使用其他队列机制。我通常使用LinkedBlockingQueue来表示我想等待另一个线程将项目放在队列中的情况,这可能不是你想要的 - 所以我正在轮询而不是调用take()。“接听”是等待的呼叫。

private Future group1Future = null;
private Future group2Future = null;
private Future group3Future = null;
private Future group4Future = null;
private LinkedBlockingQueue<Callable> group1Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group2Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group3Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group4Queue
        = new LinkedBlockingQueue<>();

private ExecutorService executor = Executors.newFixedThreadPool(2);


public void startProcessing() {
    while (true) {
        if (group1Future != null && group1Future.isDone()) {
            if (group1Queue.peek() != null) {
                group1Future = executor.submit(group1Queue.poll());
            }
        }
        if (group2Future != null && group1Future.isDone()) {
            if (group2Queue.peek() != null) {
                group2Future = executor.submit(group2Queue.poll());
            }
        }
        if (group3Future != null && group3Future.isDone()) {
            if (group3Queue.peek() != null) {
                group3Future = executor.submit(group3Queue.poll());
            }
        }

        if (group4Future != null && group4Future.isDone()) {
            if (group4Queue.peek() != null) {
                group4Future = executor.submit(group4Queue.poll());
            }
        }
    }
}

如果该组的任务未完成,它将跳到下一个组。一次处理的组不会超过两个,并且没有一个组将运行多个任务。队列将强制执行有序。


推荐