对并行流进行排序时遇到顺序错误
我有一个类:Record
public class Record implements Comparable<Record>
{
private String myCategory1;
private int myCategory2;
private String myCategory3;
private String myCategory4;
private int myValue1;
private double myValue2;
public Record(String category1, int category2, String category3, String category4,
int value1, double value2)
{
myCategory1 = category1;
myCategory2 = category2;
myCategory3 = category3;
myCategory4 = category4;
myValue1 = value1;
myValue2 = value2;
}
// Getters here
}
我创建了一个包含大量记录的大列表。只有第二个和第五个值和 稍后分别由 getter 和 使用。i / 10000
i
getCategory2()
getValue1()
List<Record> list = new ArrayList<>();
for (int i = 0; i < 115000; i++)
{
list.add(new Record("A", i / 10000, "B", "C", i, (double) i / 100 + 1));
}
请注意,前 10,000 条记录具有 a 的 ,然后接下来的 10,000 条具有 等,而值按顺序为 0-114999。category2
0
1
value1
我创建了一个既是和.Stream
parallel
sorted
Stream<Record> stream = list.stream()
.parallel()
.sorted(
//(r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
)
//.parallel()
;
我有一个维护线程的,这是我在PC上拥有的内核数。ForkJoinPool
8
ForkJoinPool pool = new ForkJoinPool(8);
我使用这里描述的技巧将流处理任务提交到我自己的ForkJoinPool
,而不是普通的ForkJoinPool
。
List<Record> output = pool.submit(() ->
stream.collect(Collectors.toList()
)).get();
我期望并行操作将遵循流的遭遇顺序,并且它将是一个稳定的排序,因为返回的是 。sorted
Spliterator
ArrayList
ORDERED
但是,按顺序打印出结果元素的简单代码表明情况并非如此。List
output
for (Record record : output)
{
System.out.println(record.getValue1());
}
输出,压缩:
0
1
2
3
...
69996
69997
69998
69999
71875 // discontinuity!
71876
71877
71878
...
79058
79059
79060
79061
70000 // discontinuity!
70001
70002
70003
...
71871
71872
71873
71874
79062 // discontinuity!
79063
79064
79065
79066
...
114996
114997
114998
114999
的 is 和所有元素似乎都在那里,只是顺序略有不同。size()
output
115000
所以我写了一些检查代码,看看它是否稳定。如果它是稳定的,则所有值都应保持原样。此代码验证订单,打印任何差异。sort
value1
int prev = -1;
boolean verified = true;
for (Record record : output)
{
int curr = record.getValue1();
if (prev != -1)
{
if (prev + 1 != curr)
{
System.out.println("Warning: " + prev + " followed by " + curr + "!");
verified = false;
}
}
prev = curr;
}
System.out.println("Verified: " + verified);
输出:
Warning: 69999 followed by 71875!
Warning: 79061 followed by 70000!
Warning: 71874 followed by 79062!
Warning: 99999 followed by 100625!
Warning: 107811 followed by 100000!
Warning: 100624 followed by 107812!
Verified: false
如果我执行以下任一操作,则此情况仍然存在:
-
将 替换为 .
ForkJoinPool
ThreadPoolExecutor
ThreadPoolExecutor pool = new ThreadPoolExecutor(8, 8, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
-
使用通用的通过直接处理。
ForkJoinPool
Stream
List<Record> output = stream.collect(Collectors.toList());
-
在我打电话后打电话。
parallel()
sorted
Stream<Record> stream = list.stream().sorted().parallel();
-
调用而不是 。
parallelStream()
stream().parallel()
Stream<Record> stream = list.parallelStream().sorted();
-
使用 .请注意,此排序标准与我为接口定义的“自然”顺序不同,尽管从一开始就已经按顺序开始结果,但结果应该仍然相同。
Comparator
Comparable
Stream<Record> stream = list.stream().parallel().sorted( (r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2()) );
我只能得到这个来保持遇到顺序,如果我不做以下之一:Stream
- 不要打电话给 .
parallel()
- 不要调用 的任何重载。
sorted
有趣的是,没有排序的保留了顺序。parallel()
在上述两种情况下,输出为:
Verified: true
我的 Java 版本是 1.8.0_05。这种异常也发生在Ideone上,它似乎正在运行Java 8u25。
更新
在撰写本文时,我已将JDK升级到最新版本1.8.0_45,并且问题没有改变。
问题
结果 () 中的记录顺序是否因为排序某种程度不稳定、因为遇到顺序未保留而无序,还是其他原因?List
output
如何确保在创建并行流并对其进行排序时保留遭遇顺序?