串联并行流

假设我有两个数组和.我只想从第一个数组中获取正数,从第二个数字中获取不同的数字,将它们合并在一起,排序并存储到结果数组中。这可以使用流执行:int[]input1input2

int[] result = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0), 
                   Arrays.stream(input2).distinct()).sorted().toArray();

我想加快任务速度,所以我考虑使流并行。通常,这只是意味着我可以在流构造和终端操作之间的任何位置插入,结果将是相同的。IntStream.concat 的 JavaDoc 表示,如果任何输入流是并行的,则生成的流将是并行的。因此,我认为制作流或流或串联流将产生相同的结果。.parallel()parallel()input1input2

实际上我错了:如果我添加到生成的流中,输入流似乎仍然是连续的。此外,我可以将输入流(其中一个或两个)标记为 ,然后将生成的流转换为 ,但输入保持并行。因此,实际上有8种可能性:input1,input2和串联流中的任何一种都可以并行,也可以不并行:.parallel().parallel().sequential()

int[] sss = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
                Arrays.stream(input2).distinct()).sorted().toArray();
int[] ssp = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
                Arrays.stream(input2).distinct()).parallel().sorted().toArray();
int[] sps = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0), 
                Arrays.stream(input2).parallel().distinct()).sequential().sorted().toArray();
int[] spp = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0), 
                Arrays.stream(input2).parallel().distinct()).sorted().toArray();
int[] pss = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).distinct()).sequential().sorted().toArray();
int[] psp = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).distinct()).sorted().toArray();
int[] pps = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).parallel().distinct()).sequential().sorted().toArray();
int[] ppp = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).parallel().distinct()).sorted().toArray();

我针对不同的输入大小对所有版本进行了基准测试(在Core i5 4xCPU,Win7上使用JDK 8u45 64bit),并在每种情况下都获得了不同的结果:

Benchmark           (n)  Mode  Cnt       Score       Error  Units
ConcatTest.SSS      100  avgt   20       7.094 ±     0.069  us/op
ConcatTest.SSS    10000  avgt   20    1542.820 ±    22.194  us/op
ConcatTest.SSS  1000000  avgt   20  350173.723 ±  7140.406  us/op
ConcatTest.SSP      100  avgt   20       6.176 ±     0.043  us/op
ConcatTest.SSP    10000  avgt   20     907.855 ±     8.448  us/op
ConcatTest.SSP  1000000  avgt   20  264193.679 ±  6744.169  us/op
ConcatTest.SPS      100  avgt   20      16.548 ±     0.175  us/op
ConcatTest.SPS    10000  avgt   20    1831.569 ±    13.582  us/op
ConcatTest.SPS  1000000  avgt   20  500736.204 ± 37932.197  us/op
ConcatTest.SPP      100  avgt   20      23.871 ±     0.285  us/op
ConcatTest.SPP    10000  avgt   20    1141.273 ±     9.310  us/op
ConcatTest.SPP  1000000  avgt   20  400582.847 ± 27330.492  us/op
ConcatTest.PSS      100  avgt   20       7.162 ±     0.241  us/op
ConcatTest.PSS    10000  avgt   20    1593.332 ±     7.961  us/op
ConcatTest.PSS  1000000  avgt   20  383920.286 ±  6650.890  us/op
ConcatTest.PSP      100  avgt   20       9.877 ±     0.382  us/op
ConcatTest.PSP    10000  avgt   20     883.639 ±    13.596  us/op
ConcatTest.PSP  1000000  avgt   20  257921.422 ±  7649.434  us/op
ConcatTest.PPS      100  avgt   20      16.412 ±     0.129  us/op
ConcatTest.PPS    10000  avgt   20    1816.782 ±    10.875  us/op
ConcatTest.PPS  1000000  avgt   20  476311.713 ± 19154.558  us/op
ConcatTest.PPP      100  avgt   20      23.078 ±     0.622  us/op
ConcatTest.PPP    10000  avgt   20    1128.889 ±     7.964  us/op
ConcatTest.PPP  1000000  avgt   20  393699.222 ± 56397.445  us/op

从这些结果中,我只能得出结论,步骤的并行化会降低整体性能(至少在我的测试中)。distinct()

所以我有以下问题:

  1. 是否有任何关于如何更好地将并行化与串联流结合使用的官方指南?测试所有可能的组合并不总是可行的(特别是在连接两个以上的流时),所以有一些“经验法则”会很好。
  2. 似乎如果我连接直接从集合/数组创建的流(在连接之前不执行中间操作),那么结果并不依赖于 .这是真的吗?parallel()
  3. 除了串联之外,是否还有其他情况,其中的结果取决于流管道在哪个点上并行化?

答案 1

该规范精确地描述了您获得的内容 - 当您考虑到与其他操作不同时,我们不是在谈论单个管道,而是三个不同的管道,它们保留了独立于其他管道的属性。Stream

规范说:“生成的流是[...]如果任何一个输入流是并行的,则并行“,这就是你得到的;如果任一输入流是并行的,则生成的流是并行的(但之后可以将其转换为顺序)。但是,将生成的流更改为并行或顺序不会更改输入流的性质,也不会将并行流和顺序流馈送到 中。concat

关于性能后果,请参阅文档“流操作和管道”段落

中间操作进一步分为无状态操作和有状态操作。无状态操作(如 and )在处理新元素时不会保留以前看到的元素的状态 - 每个元素都可以独立于其他元素上的操作进行处理。有状态操作(如 and )在处理新元素时可能会合并以前见过的元素中的状态。filtermapdistinctsorted

有状态操作可能需要在生成结果之前处理整个输入。例如,在看到流的所有元素之前,无法通过对流进行排序产生任何结果。因此,在并行计算下,某些包含有状态中间操作的管道可能需要对数据进行多次传递,或者可能需要缓冲重要数据。包含独占无状态中间操作的管道可以在单次传递中处理,无论是顺序的还是并行的,并且具有最少的数据缓冲。

您已经选择了两个命名的有状态操作并将它们组合在一起。因此,生成的流的操作需要缓冲整个内容,然后才能开始排序,这意味着操作的完成。不同的操作显然很难并行化,因为线程必须同步已经看到的值。.sorted()distinct

因此,要回答您的第一个问题,这不是关于并行执行,而只是没有从并行执行中受益。concatdistinct

这也会使第二个问题过时,因为您在两个串联的流中执行完全不同的操作,因此您无法对预串联的集合/数组执行相同的操作。连接数组并在生成的数组上运行不太可能产生更好的结果。distinct

关于您的第三个问题,有关流的行为可能是惊喜的来源......flatMapparallel


答案 2

推荐