在并行流上按顺序调用使所有先前的操作按顺序

我有一组重要的数据,并且想要调用慢速但干净的方法,而不是调用快速方法,并对第一个方法的结果产生副作用。我对中间结果不感兴趣,所以我不想收集它们。

显而易见的解决方案是创建并行流,进行慢速调用,使流再次连续,并进行快速调用。问题是,所有代码都在单线程中执行,没有实际的并行性。

示例代码:

@Test
public void testParallelStream() throws ExecutionException, InterruptedException
{
    ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
    Set<String> threads = forkJoinPool.submit(()-> new Random().ints(100).boxed()
            .parallel()
            .map(this::slowOperation)
            .sequential()
            .map(Function.identity())//some fast operation, but must be in single thread
            .collect(Collectors.toSet())
    ).get();
    System.out.println(threads);
    Assert.assertEquals(Runtime.getRuntime().availableProcessors() * 2, threads.size());
}

private String slowOperation(int value)
{
    try
    {
        Thread.sleep(100);
    }
    catch (InterruptedException e)
    {
        e.printStackTrace();
    }
    return Thread.currentThread().getName();
}

如果我删除 ,代码按预期执行,但显然,非并行操作将在多个线程中调用。sequential

您能否推荐一些关于此类行为的参考资料,或者某种避免临时收集的方法?


答案 1

在最初的 Stream API 设计中,将流从 切换到 是有效的,但导致了许多问题,最终实现发生了变化,因此它只是为整个管道打开和关闭并行标志。当前的文档确实很模糊,但在Java-9中得到了改进:parallel()sequential()

流管道按顺序或并行执行,具体取决于调用终端操作的流的模式。流的顺序模式或并行模式可以用该方法确定,流模式可以用 和 操作修改。最新的顺序或并行模式设置适用于整个流管道的执行。BaseStream.isParallel()BaseStream.sequential()BaseStream.parallel()

至于你的问题,你可以把所有东西都收集到中间,然后开始新的顺序管道:List

new Random().ints(100).boxed()
        .parallel()
        .map(this::slowOperation)
        .collect(Collectors.toList())
        // Start new stream here
        .stream()
        .map(Function.identity())//some fast operation, but must be in single thread
        .collect(Collectors.toSet());

答案 2

在当前实现中,流要么是全并行的,要么是全顺序的。虽然Javadoc对此没有明确规定,并且将来可能会发生变化,但它确实说这是可能的。

S 平行()

返回并行的等效流。可能会返回自身,因为流已经并行,或者因为基础流状态被修改为并行。

如果你需要单线程的函数,我建议你使用Lock或同步块/方法。