Java BlockingQueue with batching?

我对与Java BlockingQueue相同的数据结构感兴趣,除了它必须能够对队列中的对象进行批处理。换句话说,我希望生产者能够将对象放入队列中,但要打开使用者块,直到队列达到一定大小(批大小)。take()

然后,一旦队列达到批大小,生产者必须阻塞,直到消费者消耗了队列中的所有元素(在这种情况下,生产者将再次开始生产,消费者块直到再次到达批)。put()

是否存在类似的数据结构?或者我应该写它(我不介意),我只是不想浪费我的时间,如果那里有东西。


更新

也许可以澄清一些事情:

情况将始终如下所示。可以有多个创建者将项目添加到队列中,但永远不会有多个使用者从队列中获取项目。

现在,问题是这些设置中有多个并行和串行的设置。换句话说,生产者为多个队列生产项目,而消费者本身也可以是生产者。这可以更容易地被认为是生产者,消费者 - 生产者,最后是消费者的有向图。

生产者应该阻塞直到队列为空(@Peter Lawrey)的原因是,这些队列中的每一个都将在线程中运行。如果你让它们在空间变得可用时简单地生产,你最终会遇到这样一种情况,即你有太多的线程试图一次处理太多的事情。

也许将其与执行服务相结合可以解决问题?


答案 1

我建议你使用BlockingQueue.drainTo(Collection,int)。你可以将它与 take() 一起使用,以确保获得最少数量的元素。

使用此方法的优点是,批大小随工作负荷动态增长,并且创建者不必在使用者忙碌时阻止。即,它可以针对延迟和吞吐量进行自我优化。


要完全按照要求实现(我认为这是一个坏主意),您可以使用具有繁忙消耗线程的SyncQueue。

即,消耗线程执行

 list.clear();
 while(list.size() < required) list.add(queue.take());
 // process list.

生产者会在消费者忙碌时阻止。


答案 2

这是一个快速(=简单但未完全测试)的实现,我认为可能适合您的请求 - 如果需要,您应该能够扩展它以支持完整的队列接口。

要提高性能,您可以切换到 ReentrantLock,而不是使用“同步”关键字。

public class BatchBlockingQueue<T> {

    private ArrayList<T> queue;
    private Semaphore readerLock;
    private Semaphore writerLock;
    private int batchSize;

    public BatchBlockingQueue(int batchSize) {
        this.queue = new ArrayList<>(batchSize);
        this.readerLock = new Semaphore(0);
        this.writerLock = new Semaphore(batchSize);
        this.batchSize = batchSize;
    }

    public synchronized void put(T e) throws InterruptedException {
        writerLock.acquire();
        queue.add(e);
        if (queue.size() == batchSize) {
            readerLock.release(batchSize);
        }
    }

    public synchronized T poll() throws InterruptedException {
        readerLock.acquire();
        T ret = queue.remove(0);
        if (queue.isEmpty()) {
            writerLock.release(batchSize);
        }
        return ret;
    }

}

希望你觉得它有用。


推荐