是否可以将流拆分为两个流?

2022-08-31 06:35:13

我有一个由Java 8流表示的数据集:

Stream<T> stream = ...;

我可以看到如何过滤它以获得随机子集 - 例如

Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();   
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));

我还可以看到如何减少此流,例如,获取表示数据集的两个随机一半的列表,然后将其转换回流。但是,有没有一种直接的方法从最初的流生成两个流?类似的东西

(heads, tails) = stream.[some kind of split based on filter]

感谢您的任何见解。


答案 1

收集器可用于此目的。

  • 对于两个类别,请使用工厂。Collectors.partitioningBy()

这将创建一个 ,并根据 .Map<Boolean, List>Predicate

注意:由于流需要全部使用,因此这不适用于无限流。而且,由于流无论如何都会被消耗,因此此方法只是将它们放在列表中,而不是创建新的带内存的流。如果需要流作为输出,则始终可以流式传输这些列表。

此外,不需要迭代器,甚至在您提供的仅头部示例中也不需要迭代器。

  • 二进制拆分如下所示:
Random r = new Random();

Map<Boolean, List<String>> groups = stream
    .collect(Collectors.partitioningBy(x -> r.nextBoolean()));

System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
  • 有关更多类别,请使用工厂。Collectors.groupingBy()
Map<Object, List<String>> groups = stream
    .collect(Collectors.groupingBy(x -> r.nextInt(3)));
System.out.println(groups.get(0).size());
System.out.println(groups.get(1).size());
System.out.println(groups.get(2).size());

如果流不是 ,而是像 之一这样的原始流,那么此方法不可用。您必须在没有收集器工厂的情况下以手动方式执行此操作。它的实现如下所示:StreamIntStream.collect(Collectors)

[例 2.0 自 2020-04-16 起]

    IntStream    intStream = IntStream.iterate(0, i -> i + 1).limit(100000).parallel();
    IntPredicate predicate = ignored -> r.nextBoolean();

    Map<Boolean, List<Integer>> groups = intStream.collect(
            () -> Map.of(false, new ArrayList<>(100000),
                         true , new ArrayList<>(100000)),
            (map, value) -> map.get(predicate.test(value)).add(value),
            (map1, map2) -> {
                map1.get(false).addAll(map2.get(false));
                map1.get(true ).addAll(map2.get(true ));
            });

在此示例中,我使用初始集合的完整大小初始化 ArrayLists(如果这是已知的)。即使在最坏的情况下,这也可以防止调整事件的大小,但可能会吞噬2NT空间(N = 初始元素数,T = 线程数)。为了在空间上换取速度,你可以把它排除在外,或者使用你最好的猜测,比如一个分区中预期的最多元素数(对于平衡的拆分,通常刚好超过N/2)。

我希望我不会因为使用Java 9方法而冒犯任何人。对于 Java 8 版本,请查看编辑历史记录。


答案 2

我偶然发现了这个问题,我觉得分叉流有一些用例可以证明是有效的。我作为消费者编写了下面的代码,这样它就不会做任何事情,但你可以把它应用到函数和你可能遇到的任何其他事情上。

class PredicateSplitterConsumer<T> implements Consumer<T>
{
  private Predicate<T> predicate;
  private Consumer<T>  positiveConsumer;
  private Consumer<T>  negativeConsumer;

  public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative)
  {
    this.predicate = predicate;
    this.positiveConsumer = positive;
    this.negativeConsumer = negative;
  }

  @Override
  public void accept(T t)
  {
    if (predicate.test(t))
    {
      positiveConsumer.accept(t);
    }
    else
    {
      negativeConsumer.accept(t);
    }
  }
}

现在,您的代码实现可能如下所示:

personsArray.forEach(
        new PredicateSplitterConsumer<>(
            person -> person.getDateOfBirth().isPresent(),
            person -> System.out.println(person.getName()),
            person -> System.out.println(person.getName() + " does not have Date of birth")));

推荐