如果为每个组维护一个队列,则可以从每个队列中提取项目并将其馈送到线程池中。下面的代码不会优先考虑任何一个组,它只是以循环方式提取它们。如果您需要添加优先级,您应该可以轻松做到。以下代码将使用两个线程(加上管理队列的线程)对 4 个组进行轮循机制。您可以使用其他队列机制。我通常使用LinkedBlockingQueue来表示我想等待另一个线程将项目放在队列中的情况,这可能不是你想要的 - 所以我正在轮询而不是调用take()。“接听”是等待的呼叫。
private Future group1Future = null;
private Future group2Future = null;
private Future group3Future = null;
private Future group4Future = null;
private LinkedBlockingQueue<Callable> group1Queue
= new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group2Queue
= new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group3Queue
= new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group4Queue
= new LinkedBlockingQueue<>();
private ExecutorService executor = Executors.newFixedThreadPool(2);
public void startProcessing() {
while (true) {
if (group1Future != null && group1Future.isDone()) {
if (group1Queue.peek() != null) {
group1Future = executor.submit(group1Queue.poll());
}
}
if (group2Future != null && group1Future.isDone()) {
if (group2Queue.peek() != null) {
group2Future = executor.submit(group2Queue.poll());
}
}
if (group3Future != null && group3Future.isDone()) {
if (group3Queue.peek() != null) {
group3Future = executor.submit(group3Queue.poll());
}
}
if (group4Future != null && group4Future.isDone()) {
if (group4Queue.peek() != null) {
group4Future = executor.submit(group4Queue.poll());
}
}
}
}
如果该组的任务未完成,它将跳到下一个组。一次处理的组不会超过两个,并且没有一个组将运行多个任务。队列将强制执行有序。