如何将两个排序的可观察量合并为一个排序的可观察量?

2022-09-03 02:35:30

鉴于:

Integer[] arr1 = {1, 5, 9, 17};
Integer[] arr2 = {1, 2, 3, 6, 7, 12, 15};
Observable<Integer> o1 = Observable.from(arr1);
Observable<Integer> o2 = Observable.from(arr2);

如何获取包含 的可观察量?1, 1, 2, 3, 5, 6, 7, 9, 12, 15, 17


答案 1

您可以合并、排序和拼合序列,但这会产生很大的开销:

o1.mergeWith(o2).toSortedList().flatMapIterable(v -> v).subscribe(...)

o1.concatWith(o2).toSortedList().flatMapIterable(v -> v).subscribe(...)

否则,您需要编写一个相当复杂的运算符。

编辑 04/06/2015:

下面是一个运算符,可以更有效地执行此排序合并。


答案 2

编辑:如果您要使用它,请参阅the_joric的评论。有一个边缘情况没有得到处理,我没有看到快速修复它的方法,所以我现在没有时间修复它。

下面是 C# 中的解决方案,因为你有标记。system.reactive

static IObservable<int> MergeSorted(IObservable<int> a, IObservable<int> b)
{
    var source = Observable.Merge(
        a.Select(x => Tuple.Create('a', x)),
        b.Select(y => Tuple.Create('b', y)));
    return source.Publish(o =>
    {
        var published_a = o.Where(t => t.Item1 == 'a').Select(t => t.Item2);
        var published_b = o.Where(t => t.Item1 == 'b').Select(t => t.Item2);
        return Observable.Merge(
            published_a.Delay(x => published_b.FirstOrDefaultAsync(y => x <= y)),
            published_b.Delay(y => published_a.FirstOrDefaultAsync(x => y <= x)));
    });
}

这个想法总结如下。

  • 当发出该值时,我们将其延迟,直到发出这样的值。axbyx <= y

  • 当发出该值时,我们将其延迟,直到发出这样的值。byaxy <= x

如果您只有热可观察量,则可以执行以下操作。但是,如果混合物中有任何冷可观测物,则以下方法将不起作用。我建议始终使用同时适用于热可观察量和冷可观察量的版本。

static IObservable<int> MergeSortedHot(IObservable<int> a, IObservable<int> b)
{
    return Observable.Merge(
        a.Delay(x => b.FirstOrDefaultAsync(y => x <= y)),
        b.Delay(y => a.FirstOrDefaultAsync(x => y <= x)));
}

推荐