阻塞队列和多线程使用者,如何知道何时停止

我有一个单线程生产者,它创建一些任务对象,然后将其添加到一个(大小固定)中。ArrayBlockingQueue

我还启动了一个多线程消费者。这是作为固定线程池 () 构建的。然后,我向此线程池提交一些 ConsumerWorker intances,每个 ConsumerWorker 都有一个对上述 ArrayBlockingQueue 实例的引用。Executors.newFixedThreadPool(threadCount);

每个这样的 Worker 都会在队列上执行一个操作并处理该任务。take()

我的问题是,让工人知道什么时候不会再做任何工作要做的最佳方法是什么。换句话说,我如何告诉 Workers 生产者已经完成了添加到队列中,从此时起,每个 Worker 都应该在看到队列为空时停止。

我现在得到的是一个设置,其中我的生产者使用回调进行初始化,该回调在他完成其工作(将内容添加到队列)时触发。我还保留了我创建并提交到ThreadPool的所有ConsumerWorkers的列表。当制片人回调告诉我制片人已经完成时,我可以告诉每个工人。此时,他们应该继续检查队列是否不为空,当它变为空时,他们应该停止,从而允许我优雅地关闭ExecuterService线程池。就像这样

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

这里的问题是,我有一个明显的竞争条件,有时生产者会完成,发出信号,消费者工人会在使用队列中的所有内容之前停止。

我的问题是,同步它的最佳方法是什么,以便一切正常?我是否应该同步整个部分,它检查生产者是否正在运行,以及队列是否为空,以及从一个块中的队列中获取一些东西(在队列对象上)?我是否应该只在 ConsumerWorker 实例上同步布尔值的更新?还有其他建议吗?isRunning

更新,这是我最终使用的工作实现:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

这在很大程度上受到JohnVint在下面的答案的启发,只有一些小的修改。

===由于@vendhan的评论而更新。

感谢您的光临。你是对的,这个问题中的第一个代码片段(除其他问题外)有一个没有真正意义的代码片段。while(isRunning || !inputQueue.isEmpty())

在我实际的最终实现中,我做了一些更接近你建议替换“||”的事情。(或)用“&&”(和),从某种意义上说,每个工人(消费者)现在只检查他从列表中得到的元素是否是毒丸,如果是这样,就会停止(所以从理论上讲,我们可以说工人必须运行并且队列不能为空)。


答案 1

您应该继续从队列中。你可以用毒丸告诉工人停下来。例如:take()

private final Object POISON_PILL = new Object();

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            if(queueElement == POISON_PILL) {
                 inputQueue.add(POISON_PILL);//notify other threads to stop
                 return;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
    //you can also clear here if you wanted
    isRunning = false;
    inputQueue.add(POISON_PILL);
}

答案 2

我会向工人发送一个特殊的工作包,以表明他们应该关闭:

public class ConsumerWorker implements Runnable{

private static final Produced DONE = new Produced();

private BlockingQueue<Produced> inputQueue;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    for (;;) {
        try {
            Produced item = inputQueue.take();
            if (item == DONE) {
                inputQueue.add(item); // keep in the queue so all workers stop
                break;
            }
            // process `item`
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

}

要停止工作线程,只需添加到队列中即可。ConsumerWorker.DONE