按鉴别器函数对流进行分区

2022-09-01 19:18:20

Streams API中缺少的功能之一是“分区依据”转换,例如Clojure中定义的转换。假设我想重现Hibernate的抓取联接:我想发出一个SQL SELECT语句来从结果中接收这种对象:

class Family {
   String surname;
   List<String> members;
}

我问题:

SELECT f.name, m.name 
FROM Family f JOIN Member m on m.family_id = f.id
ORDER BY f.name

我检索了一个平坦的记录流。现在,我需要将其转换为对象流,其中包含其成员列表。假设我已经有一个 ;现在我需要将其转换为 a,然后通过映射转换将其转换为 .(f.name, m.name)FamilyStream<ResultRow>Stream<List<ResultRow>>Stream<Family>

转换的语义如下:只要提供的鉴别器函数继续返回相同的值,就继续将流收集到 a 中;一旦值更改,就将 作为输出流的元素发出,并开始收集新的 .ListListList

我希望能够编写这种代码(我已经有了方法):resultStream

Stream<ResultRow> dbStream = resultStream(queryBuilder.createQuery(
        "SELECT f.name, m.name"
      + " FROM Family f JOIN Member m on m.family_id = f.id"
      + " ORDER BY f.name"));
Stream<List<ResultRow> partitioned = partitionBy(r -> r.string(0), dbStream);
Stream<Family> = partitioned.map(rs -> {
                    Family f = new Family(rs.get(0).string(0));
                    f.members = rs.stream().map(r -> r.string(1)).collect(toList());
                    return f;
                 });

毋庸置疑,我希望生成的流保持懒惰(非具体化),因为我希望能够在不达到任何O(n)内存限制的情况下处理任何大小的结果集。如果没有这个关键要求,我会对提供的收藏家感到满意。groupingBy


答案 1

该解决方案要求我们定义一个可用于构造分区流的自定义。我们需要通过输入流自己的拆分器访问输入流,并将其包装到我们的拆分器中。然后从我们的自定义拆分器构造输出流。Spliterator

下面的拆分器会将 any 变成 a 作为鉴别器函数。请注意,必须对输入流进行排序,此操作才有意义。Stream<E>Stream<List<E>>Function<E, ?>

import java.util.*;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static java.util.Comparator.naturalOrder;

public class PartitionBySpliterator<E> extends AbstractSpliterator<List<E>> {
    private final Spliterator<E> spliterator;
    private final Function<? super E, ?> partitionBy;
    private HoldingConsumer<E> holder;
    private Comparator<List<E>> comparator;

    public PartitionBySpliterator(
            Spliterator<E> toWrap,
            Function<? super E, ?> partitionBy
    ) {
        super(Long.MAX_VALUE, toWrap.characteristics() & ~SIZED | NONNULL);
        this.spliterator = toWrap;
        this.partitionBy = partitionBy;
    }

    public static <E> Stream<List<E>> partitionBy(
            Function<E, ?> partitionBy, Stream<E> in
    ) {
        return StreamSupport.stream(
                new PartitionBySpliterator<>(in.spliterator(), partitionBy), false);
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final HoldingConsumer<E> h;
        if (holder == null) {
            h = new HoldingConsumer<>();
            if (!spliterator.tryAdvance(h)) {
              return false;
            }
            holder = h;
        } else {
          h = holder;
        }
        final ArrayList<E> partition = new ArrayList<>();
        final Object partitionKey = partitionBy.apply(h.value);
        boolean didAdvance;
        do {
          partition.add(h.value);
        }
        while ((didAdvance = spliterator.tryAdvance(h))
                && Objects.equals(partitionBy.apply(h.value), partitionKey));
        if (!didAdvance) {
          holder = null;
        }
        action.accept(partition);
        return true;
    }

    static final class HoldingConsumer<T> implements Consumer<T> {
        T value;

        @Override
        public void accept(T value) {
            this.value = value;
        }
    }

    @Override
    public Comparator<? super List<E>> getComparator() {
        final Comparator<List<E>> c = this.comparator;
        return c != null ? c : (this.comparator = comparator());
    }

    private Comparator<List<E>> comparator() {
        @SuppressWarnings({"unchecked", "rawtypes"})
        final Comparator<? super E> innerComparator =
                Optional.ofNullable(spliterator.getComparator())
                        .orElse((Comparator) naturalOrder());
        return (left, right) -> {
            final int c = innerComparator.compare(left.get(0), right.get(0));
            return c != 0 ? c : innerComparator.compare(
                    left.get(left.size() - 1), right.get(right.size() - 1));
        };
    }
}

答案 2

对于那些只想对流进行分区的人来说,有映射器和收集器。

class Person {

    String surname;
    String forename;

    public Person(String surname, String forename) {
        this.surname = surname;
        this.forename = forename;
    }

    @Override
    public String toString() {
        return forename;
    }

}

class Family {

    String surname;
    List<Person> members;

    public Family(String surname, List<Person> members) {
        this.surname = surname;
        this.members = members;
    }

    @Override
    public String toString() {
        return "Family{" + "surname=" + surname + ", members=" + members + '}';
    }

}

private void test() {
    String[][] data = {
        {"Kray", "Ronald"},
        {"Kray", "Reginald"},
        {"Dors", "Diana"},};
    // Their families.
    Stream<Family> families = Arrays.stream(data)
            // Build people
            .map(a -> new Person(a[0], a[1]))
            // Collect into a Map<String,List<Person>> as families
            .collect(Collectors.groupingBy(p -> p.surname))
            // Convert them to families.
            .entrySet().stream()
            .map(p -> new Family(p.getKey(), p.getValue()));
    families.forEach(f -> System.out.println(f));
}

推荐