为什么 Files.list() 并行流的性能比使用 Collection.parallelStream() 慢得多?

以下代码片段是获取目录列表的方法的一部分,在每个文件上调用一个提取方法,并将生成的药物对象序列化为 xml。

try(Stream<Path> paths = Files.list(infoDir)) {
    paths
        .parallel()
        .map(this::extract)
        .forEachOrdered(drug -> {
            try {
                marshaller.write(drug);
            } catch (JAXBException ex) {
                ex.printStackTrace();
            }
        });
}

下面是完全相同的代码,执行完全相同的操作,但使用普通调用来获取目录列表并调用结果列表。.list().parallelStream()

Arrays.asList(infoDir.toFile().list())
    .parallelStream()
    .map(f -> infoDir.resolve(f))
    .map(this::extract)
    .forEachOrdered(drug -> {
        try {
            marshaller.write(drug);
        } catch (JAXBException ex) {
            ex.printStackTrace();
    }
});

我的机器是四核MacBook Pro,Java v 1.8.0_60(内部版本1.8.0_60-b27)。

我正在处理~7000个文件。3 次运行的平均次数:

第一个版本:20秒。不含 : 41 秒.parallel().parallel()

第二版:12秒。与 : 41 秒..parallelStream().stream()

并行模式下的这8秒似乎是一个巨大的差异,因为从流中读取并完成所有繁重工作的方法以及执行最终写入的调用保持不变。extractwrite


答案 1

问题在于,Stream API 的当前实现以及当前未知大小源的实现严重地将这些源拆分为并行任务。您很幸运拥有超过1024个文件,否则您将根本没有并行化的好处。当前流 API 实现考虑了从 返回的值。未知大小的在拆分之前返回,其后缀也始终返回。其拆分策略如下:IteratorSpliteratorestimateSize()SpliteratorIteratorSpliteratorLong.MAX_VALUELong.MAX_VALUE

  1. 定义当前批大小。当前公式是从 1024 个元素开始,并按算术方式增加(2048、3072、4096、5120 等),直到达到大小(即33554432元素)。MAX_BATCH
  2. 将输入元素(在本例中为 Path)使用到数组中,直到达到批大小或输入用尽。
  3. 返回对所创建数组的迭代作为前缀,将自身保留为后缀。ArraySpliterator

假设您有 7000 个文件。流 API 要求估计大小,返回 。好的,Stream API要求拆分,它从底层收集1024个元素到数组,并拆分为(估计大小为1024)和自身(估计大小仍然有效)。与 1024 一样,Stream API 决定继续拆分较大的部分,甚至不尝试拆分较小的部分。所以整个拆分树是这样的:IteratorSpliteratorLong.MAX_VALUEIteratorSpliteratorDirectoryStreamArraySpliteratorLong.MAX_VALUELong.MAX_VALUE

                     IteratorSpliterator (est. MAX_VALUE elements)
                           |                    |
ArraySpliterator (est. 1024 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 2048 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 3072 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 856 elements)    IteratorSpliterator (est. MAX_VALUE elements)
                                                    |
                                        (split returns null: refuses to split anymore)

因此,在那之后,您有五个并行任务要执行:实际上包含1024,2048,3072,856和0元素。请注意,即使最后一个块有 0 个元素,它仍然报告它估计有元素,因此 Stream API 也会将其发送到 。坏处是,Stream API认为进一步拆分前四个任务是无用的,因为它们的估计大小要小得多。因此,您得到的是输入的非常不均匀的分割,它最大限度地利用了四个CPU内核(即使您有更多的CPU内核)。如果对于任何元素,每个元素的处理时间大致相同,则整个过程将等待最大的部分(3072个元素)完成。因此,您可能拥有的最大加速是7000/ 3072 = 2.28x。因此,如果顺序处理需要 41 秒,则并行流大约需要 41/2.28 = 18 秒(这接近您的实际数字)。Long.MAX_VALUEForkJoinPool

您的解决方法解决方案完全没问题。请注意,使用您还将所有输入元素存储在内存中(在对象中)。因此,如果您手动将它们转储到 .数组支持的列表实现(当前由 )可以均匀拆分而不会出现任何问题,从而导致额外的加速。Files.list().parallel()PathArraySpliteratorListArrayListCollectors.toList()

为什么这种情况没有得到优化?当然,这不是不可能的问题(尽管实现可能非常棘手)。对于JDK开发人员来说,这似乎不是高优先级的问题。在邮件列表中就此主题进行了多次讨论。您可以在这里阅读Paul Sandoz的消息,他评论了我的优化工作。


答案 2

作为替代方案,您可以使用此专为以下对象量身定制的自定义拆分器:DirectoryStream

public class DirectorySpliterator implements Spliterator<Path> {
    Iterator<Path> iterator;
    long est;

    private DirectorySpliterator(Iterator<Path> iterator, long est) {
        this.iterator = iterator;
        this.est = est;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Path> action) {
        if (iterator == null) {
            return false;
        }
        Path path;
        try {
            synchronized (iterator) {
                if (!iterator.hasNext()) {
                    iterator = null;
                    return false;
                }
                path = iterator.next();
            }
        } catch (DirectoryIteratorException e) {
            throw new UncheckedIOException(e.getCause());
        }
        action.accept(path);
        return true;
    }

    @Override
    public Spliterator<Path> trySplit() {
        if (iterator == null || est == 1)
            return null;
        long e = this.est >>> 1;
        this.est -= e;
        return new DirectorySpliterator(iterator, e);
    }

    @Override
    public long estimateSize() {
        return est;
    }

    @Override
    public int characteristics() {
        return DISTINCT | NONNULL;
    }

    public static Stream<Path> list(Path parent) throws IOException {
        DirectoryStream<Path> ds = Files.newDirectoryStream(parent);
        int splitSize = Runtime.getRuntime().availableProcessors() * 8;
        DirectorySpliterator spltr = new DirectorySpliterator(ds.iterator(), splitSize);
        return StreamSupport.stream(spltr, false).onClose(() -> {
            try {
                ds.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }
}

只需替换为,它将均匀并行化,无需任何中间缓冲。在这里,我们使用一个事实,即生成一个没有任何特定顺序的目录列表,因此每个并行线程将只从中获取一个后续条目(以同步方式,因为我们已经有同步IO操作,因此额外的同步几乎没有开销)。并行顺序每次都会不同(即使使用),但不能保证顺序。Files.listDirectorySpliterator.listDirectoryStreamforEachOrderedFiles.list()

这里唯一重要的部分是要创建多少并行任务。由于在遍历文件夹之前,我们不知道文件夹中有多少文件,因此最好将其用作基础。我创建了关于单个任务的内容,这似乎是一个很好的细粒度/粗粒度折衷方案:如果每个元素的处理是不均匀的,那么拥有比处理器更多的任务将有助于平衡负载。availableProcessors()8 x availableProcessors()


推荐