流 API 和队列:订阅阻塞队列流样式

2022-09-01 11:28:55

假设我们有一个队列

BlockingQueue<String> queue= new LinkedBlockingQueue<>();

和其他一些线程在其中放置值,然后我们像这样阅读它

while (true) {
    String next = queue.take();
    System.out.println("next message:" + next);
}

我如何以流样式迭代此队列,同时保持与上述代码相似的语义。

此代码仅遍历当前队列状态:

queue.stream().forEach(e -> System.out.println(e));

答案 1

我有点猜测你所期待的,但我认为我有一个很好的预感。

队列的流(如循环访问队列)表示队列的当前内容。当迭代器或流到达队列的尾部时,它不会阻止等待要添加的更多元素。迭代器或流在该点耗尽,计算终止。

如果需要一个由队列的所有当前和未来元素组成的流,可以执行如下操作:

Stream.generate(() -> {
        try {
            return queue.take();
        } catch (InterruptedException ie) {
            return "Interrupted!";
        }
    })
    .filter(s -> s.endsWith("x"))
    .forEach(System.out::println);   

(不幸的是,处理的需求使这变得非常混乱。InterruptedException

请注意,没有办法关闭队列,也没有办法停止生成元素,所以这实际上是一个无限流。终止它的唯一方法是使用短路流操作,例如 。Stream.generatefindFirst


答案 2

另一种方法是构建自定义拆分器。在我的情况下,我有一个阻塞队列,我想构建一个继续提取元素的流,直到块超时。分路器类似于:

public class QueueSpliterator<T> implements Spliterator<T> {
    private final BlockingQueue<T> queue;
    private final long timeoutMs;

    public QueueSpliterator(final BlockingQueue<T> queue, final long timeoutMs) {
        this.queue = queue;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public int characteristics() {
        return Spliterator.CONCURRENT | Spliterator.NONNULL | Spliterator.ORDERED;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> action) {
        try {
            final T next = this.queue.poll(this.timeoutMs, TimeUnit.MILLISECONDS);
            if (next == null) {
                return false;
            }
            action.accept(next);
            return true;
        } catch (final InterruptedException e) {
            throw new SupplierErrorException("interrupted", e);
        }
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

}

为处理中断异常而引发的异常是 RuntimeException 的扩展。使用这个类,可以通过以下方式构建流:StreamSupport.stream(new QueueSpliterator(...))并添加通常的流操作。


推荐