Concat VS Merge 运算符

2022-08-31 17:06:33

我正在检查RXJava的文档,我注意到concat和merge运算符似乎也是如此。我写了几个测试来确保。

@Test
public void testContact() {

    Observable.concat(Observable.just("Hello"),
                      Observable.just("reactive"),
                      Observable.just("world"))
              .subscribe(System.out::println);
}

@Test
public void testMerge() {

    Observable.merge(Observable.just("Hello"),
                      Observable.just("reactive"),
                      Observable.just("world"))
            .subscribe(System.out::println);
}

文档说

合并运算符也类似。它结合了两个或多个可观察量的发射,但可以交错它们,而Concat从不交错来自多个可观察量的发射。

但是我仍然不完全理解,运行此测试数千次合并结果始终是相同的。由于订单未被授予,因此我有时期望“反应性”“世界”“你好”。

代码在这里 https://github.com/politrons/reactive


答案 1

正如您引用的文档中所述 - 合并可以交错输出,而concat将首先等待早期的流完成,然后再处理以后的流。在你的例子中,对于单元素的静态流,它没有任何真正的区别(但从理论上讲,合并可以随机顺序输出单词,并且根据规范仍然有效)。如果你想看到区别,请尝试遵循(之后你需要增加一些睡眠,以避免提前退出)

    Observable.merge(
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
    .subscribe(System.out::println);

A0 B0 A1 B1 B2 A2 B3 A3 B4 A4

    Observable.concat(
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
    .subscribe(System.out::println);

A0 A1 A2 A3 A4 A5 A6 A7 A8

Concat 永远不会开始打印 B,因为流 A 永远不会完成。

s/stream/observable/g ;)

文档提供了很好的图表来显示差异。您需要记住,合并并不能保证逐个交错项,它只是一个可能的示例之一。

康卡特

Concat operator 合并

Merge operator


答案 2

康卡特

Concat 发出来自两个或多个可观察量的发射,而不交错它们。它将在发出项时保持可观察量的顺序。这意味着它将发出第一个可观察量的所有项目,然后它将发出第二个可观察对象的所有项目,依此类推。

Concat Operator

让我们通过一个例子清楚地理解它。

final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);

Observable.concat(observableFirst, observableSecond)
        .subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

由于我们使用 Concat 运算符,它将保持顺序并将值发出为 A1、A2、A3、A4、B1、B2、B3。

合并

合并 通过将多个可观察量的发射合并为一个。它不会在发出物品时保持订单。

Merge Operator

让我们通过一个例子清楚地理解它。

final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);

Observable.merge(observableFirst, observableSecond)
        .subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

由于我们使用Merge Operator,因此它不会保持顺序,并且可以按任何顺序发出值,例如A1,B1,A2,A3,B2,B3,A4A1,A2,B1,B2,A3,A4,B3或任何东西。

这就是我们应该在RxJava中使用Concat和Merge运算符的方式,具体取决于我们的用例。


推荐