对并行流进行排序时遇到顺序错误

我有一个类:Record

public class Record implements Comparable<Record>
{
   private String myCategory1;
   private int    myCategory2;
   private String myCategory3;
   private String myCategory4;
   private int    myValue1;
   private double myValue2;

   public Record(String category1, int category2, String category3, String category4,
      int value1, double value2)
   {
      myCategory1 = category1;
      myCategory2 = category2;
      myCategory3 = category3;
      myCategory4 = category4;
      myValue1 = value1;
      myValue2 = value2;
   }

   // Getters here
}

我创建了一个包含大量记录的大列表。只有第二个和第五个值和 稍后分别由 getter 和 使用。i / 10000igetCategory2()getValue1()

List<Record> list = new ArrayList<>();
for (int i = 0; i < 115000; i++)
{
    list.add(new Record("A", i / 10000, "B", "C", i, (double) i / 100 + 1));
}

请注意,前 10,000 条记录具有 a 的 ,然后接下来的 10,000 条具有 等,而值按顺序为 0-114999。category201value1

我创建了一个既是和.Streamparallelsorted

Stream<Record> stream = list.stream()
   .parallel()
   .sorted(
       //(r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
   )
   //.parallel()
;

我有一个维护线程的,这是我在PC上拥有的内核数。ForkJoinPool8

ForkJoinPool pool = new ForkJoinPool(8);

我使用这里描述的技巧将流处理任务提交到我自己的ForkJoinPool,而不是普通的ForkJoinPool

List<Record> output = pool.submit(() ->
    stream.collect(Collectors.toList()
)).get();

我期望并行操作将遵循流的遭遇顺序,并且它将是一个稳定的排序,因为返回的是 。sortedSpliteratorArrayListORDERED

但是,按顺序打印出结果元素的简单代码表明情况并非如此。Listoutput

for (Record record : output)
{
     System.out.println(record.getValue1());
}

输出,压缩:

0
1
2
3
...
69996
69997
69998
69999
71875  // discontinuity!
71876
71877
71878
...
79058
79059
79060
79061
70000  // discontinuity!
70001
70002
70003
...
71871
71872
71873
71874
79062  // discontinuity!
79063
79064
79065
79066
...
114996
114997
114998
114999

的 is 和所有元素似乎都在那里,只是顺序略有不同。size()output115000

所以我写了一些检查代码,看看它是否稳定。如果它是稳定的,则所有值都应保持原样。此代码验证订单,打印任何差异。sortvalue1

int prev = -1;
boolean verified = true;
for (Record record : output)
{
    int curr = record.getValue1();
    if (prev != -1)
    {
        if (prev + 1 != curr)
        {
            System.out.println("Warning: " + prev + " followed by " + curr + "!");
            verified = false;
        }
    }
    prev = curr;
}
System.out.println("Verified: " + verified);

输出:

Warning: 69999 followed by 71875!
Warning: 79061 followed by 70000!
Warning: 71874 followed by 79062!
Warning: 99999 followed by 100625!
Warning: 107811 followed by 100000!
Warning: 100624 followed by 107812!
Verified: false

如果我执行以下任一操作,则此情况仍然存在:

  • 将 替换为 .ForkJoinPoolThreadPoolExecutor

    ThreadPoolExecutor pool = new ThreadPoolExecutor(8, 8, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
  • 使用通用的通过直接处理。ForkJoinPoolStream

    List<Record> output = stream.collect(Collectors.toList());
    
  • 在我打电话打电话。parallel()sorted

    Stream<Record> stream = list.stream().sorted().parallel();
    
  • 调用而不是 。parallelStream()stream().parallel()

    Stream<Record> stream = list.parallelStream().sorted();
    
  • 使用 .请注意,此排序标准与我为接口定义的“自然”顺序不同,尽管从一开始就已经按顺序开始结果,但结果应该仍然相同。ComparatorComparable

    Stream<Record> stream = list.stream().parallel().sorted(
        (r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
    );
    

我只能得到这个来保持遇到顺序,如果我不做以下之一:Stream

  • 不要打电话给 .parallel()
  • 不要调用 的任何重载。sorted

有趣的是,没有排序的保留了顺序。parallel()

在上述两种情况下,输出为:

Verified: true

我的 Java 版本是 1.8.0_05。这种异常也发生在Ideone上,它似乎正在运行Java 8u25。

更新

在撰写本文时,我已将JDK升级到最新版本1.8.0_45,并且问题没有改变。

问题

结果 () 中的记录顺序是否因为排序某种程度不稳定、因为遇到顺序未保留而无序,还是其他原因?Listoutput

如何确保在创建并行流并对其进行排序时保留遭遇顺序?


答案 1

在某些情况下,它看起来不稳定。很好找。流并行排序是按 实现的,因此它也会影响流。下面是一个简化的示例:Arrays.parallelSortArrays.parallelSort

public class StableSortBug {
    static final int SIZE = 50_000;

    static class Record implements Comparable<Record> {
        final int sortVal;
        final int seqNum;

        Record(int i1, int i2) { sortVal = i1; seqNum = i2; }

        @Override
        public int compareTo(Record other) {
            return Integer.compare(this.sortVal, other.sortVal);
        }
    }

    static Record[] genArray() {
        Record[] array = new Record[SIZE];
        Arrays.setAll(array, i -> new Record(i / 10_000, i));
        return array;
    }

    static boolean verify(Record[] array) {
        return IntStream.range(1, array.length)
                        .allMatch(i -> array[i-1].seqNum + 1 == array[i].seqNum);
    }

    public static void main(String[] args) {
        Record[] array = genArray();
        System.out.println(verify(array));
        Arrays.sort(array);
        System.out.println(verify(array));
        Arrays.parallelSort(array);
        System.out.println(verify(array));
    }
}

在我的机器(2 核 x 2 线程)上,这将打印以下内容:

true
true
false

当然,它应该打印三次。这是在当前的 JDK 9 开发版本上。如果到目前为止,它出现在所有JDK 8版本中,我不会感到惊讶,因为你已经尝试过了什么。奇怪的是,减小大小或除数会改变行为。大小为 20,000,除数为 10,000 是稳定的,大小为 50,000 且除数为 1,000 也是稳定的。似乎问题与足够大的值运行有关,这些值与并行拆分大小进行比较。true

OpenJDK 问题 JDK-8076446 涵盖了此错误。


答案 2

推荐