编辑:如果您要使用它,请参阅the_joric的评论。有一个边缘情况没有得到处理,我没有看到快速修复它的方法,所以我现在没有时间修复它。
下面是 C# 中的解决方案,因为你有标记。system.reactive
static IObservable<int> MergeSorted(IObservable<int> a, IObservable<int> b)
{
var source = Observable.Merge(
a.Select(x => Tuple.Create('a', x)),
b.Select(y => Tuple.Create('b', y)));
return source.Publish(o =>
{
var published_a = o.Where(t => t.Item1 == 'a').Select(t => t.Item2);
var published_b = o.Where(t => t.Item1 == 'b').Select(t => t.Item2);
return Observable.Merge(
published_a.Delay(x => published_b.FirstOrDefaultAsync(y => x <= y)),
published_b.Delay(y => published_a.FirstOrDefaultAsync(x => y <= x)));
});
}
这个想法总结如下。
如果您只有热可观察量,则可以执行以下操作。但是,如果混合物中有任何冷可观测物,则以下方法将不起作用。我建议始终使用同时适用于热可观察量和冷可观察量的版本。
static IObservable<int> MergeSortedHot(IObservable<int> a, IObservable<int> b)
{
return Observable.Merge(
a.Delay(x => b.FirstOrDefaultAsync(y => x <= y)),
b.Delay(y => a.FirstOrDefaultAsync(x => y <= x)));
}