如何阻止直到阻塞队列为空?

2022-09-01 17:27:37

我正在寻找一种方法来阻止,直到 a 为空。BlockingQueue

我知道,在多线程环境中,只要有生产者将项目放入,就会出现队列变为空的情况,并且在几纳秒后它就充满了项目。BlockingQueue

但是,如果只有一个创建者,则它可能希望等待(并阻止)直到队列停止将项目放入队列后队列为空。

Java/Pseudocode:

// Producer code
BlockingQueue queue = new BlockingQueue();

while (having some tasks to do) {
    queue.put(task);
}

queue.waitUntilEmpty(); // <-- how to do this?

print("Done");

你有什么想法吗?

编辑:我知道包装和使用额外的条件可以解决问题,我只是在问是否有一些预制的解决方案和/或更好的替代方案。BlockingQueue


答案 1

一个简单的解决方案,使用 和 :wait()notify()

// Producer:
synchronized(queue) {
    while (!queue.isEmpty())
        queue.wait(); //wait for the queue to become empty
    queue.put();
}

//Consumer:
synchronized(queue) {
    queue.get();
    if (queue.isEmpty())
        queue.notify(); // notify the producer
}

答案 2

我知道你已经有很多线程主动轮询或接受队列,但我仍然对你的流程/设计感到不太对。

队列变为空并不意味着以前添加的任务已完成,某些项目可能需要很长时间才能处理,因此检查空任务并不太有用。

因此,您应该做的是忘记 ,您可以将其用作任何其他集合。将项目转换为 a 并利用 .BlockingQueueCollectionsCallableExecutorService.invokeAll()

    Collection<Item> queue = ...
    Collection<Callable<Result>> tasks = new ArrayList<Callable<Result>>();

    for (Item item : queue) {
        tasks.add(new Callable<Result>() {

            @Override
            public Result call() throws Exception {
                // process the item ...

                return result;
            }
        });
    }

    // look at the results, add timeout for invokeAll if necessary
    List<Future<Result>> results = executorService.invokeAll(tasks);

    // done

此方法将使您可以完全控制生产者可以等待多长时间以及适当的异常处理。


推荐