“关闭”阻塞队列

2022-09-01 09:36:01

我在一个非常简单的生产者-消费者场景中使用java.util.concurrent.BlockingQueue。例如,这个伪代码描述了消费者部分:

class QueueConsumer implements Runnable {

    @Override
    public void run() {
        while(true)
        {
            try {
                ComplexObject complexObject = myBlockingQueue.take();
                //do something with the complex object
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

目前为止,一切都好。在阻塞队列的javadoc中,我读到:

BlockingQueue 本质上不支持任何类型的“关闭”或“关闭”操作来指示不会再添加任何项目。这些特性的需求和用法往往依赖于实现。例如,生产者的一种常见策略是插入特殊的流端或毒物,当消费者拿走这些物体时,这些物体会相应地解释。

不幸的是,由于正在使用的泛型和ComplexObject的性质,将“毒物”推入队列并非易事。因此,这种“常见策略”在我的场景中并不方便。

我的问题是:我可以使用哪些其他好的策略/模式来“关闭”队列?

谢谢!


答案 1

如果您有使用者线程的句柄,则可以中断它。使用您提供的代码,这将杀死消费者。我不指望制片人有这个;它可能必须以某种方式回调到程序控制器,以让它知道它已经完成了。然后控制器将中断使用者线程。

您始终可以在服从中断之前完成工作。例如:

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().isInterrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                this.process(complexObject);

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }

        // Thread is getting ready to die, but first,
        // drain remaining elements on the queue and process them.
        final LinkedList<ComplexObject> remainingObjects;
        myBlockingQueue.drainTo(remainingObjects);
        for(ComplexObject complexObject : remainingObjects) {
            this.process(complexObject);
        }
    }

    private void process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

无论如何,我实际上宁愿以某种方式毒害队列。如果要杀死线程,请让线程自行终止。

(很高兴看到有人处理得当。InterruptedException


关于中断的处理,似乎存在一些争议。首先,我希望每个人都能阅读这篇文章

现在,有了没有人真正读过的理解,这就是交易。仅当线程在中断时当前处于阻塞状态时,它才会收到。在这种情况下,将返回 。如果它没有被阻止,它将不会收到此异常,而是返回 。因此,无论如何,您的环路防护都应该绝对检查 ,否则可能会错过线程中断的风险。InterruptedExceptionThread.interrupted()falseThread.interrupted()trueThread.interrupted()

因此,由于您正在检查无论如何都要进行检查,并且您被迫捕获(即使您没有被迫也应该处理它),因此您现在有两个代码区域处理相同的事件,线程中断。处理此问题的一种方法是将它们规范化为一个条件,这意味着布尔状态检查可以引发异常,或者异常可以设置布尔状态。我选择后者。Thread.interrupted()InterruptedException


编辑:请注意,静态 Thread#interrupted 方法清除当前线程的中断状态。


答案 2

使这变得简单的另一个想法:

class ComplexObject implements QueueableComplexObject
{
    /* the meat of your complex object is here as before, just need to
     * add the following line and the "implements" clause above
     */
    @Override public ComplexObject asComplexObject() { return this; }
}

enum NullComplexObject implements QueueableComplexObject
{
    INSTANCE;

    @Override public ComplexObject asComplexObject() { return null; }
}

interface QueueableComplexObject
{
    public ComplexObject asComplexObject();
}

然后用作队列。当您希望结束队列的处理时,请执行 。在消费者方面,做BlockingQueue<QueueableComplexObject>queue.offer(NullComplexObject.INSTANCE)

boolean ok = true;
while (ok)
{
    ComplexObject obj = queue.take().asComplexObject();
    if (obj == null)
        ok = false;
    else
        process(obj);
}

/* interrupt handling elided: implement this as you see fit,
 * depending on whether you watch to swallow interrupts or propagate them
 * as in your original post
 */

不需要,而且您不必构建一个假货,根据其实现,这可能是昂贵/困难的。instanceofComplexObject