Rx Java mergeDelayError 未按预期工作

2022-09-02 13:14:43

我正在使用RxJava和Android应用程序与RxAndroid。我正在使用mergeDelayError将两个改造网络调用组合成一个可观察的,如果任何一个发出一个,它将处理发出的项目,如果任何一个有一个,则处理错误。这不起作用,并且仅在遇到错误时才触发 onError 操作。现在为了测试这一点,我转移到一个非常简单的例子,当我有一个onError调用时,仍然永远不会调用成功操作。请参阅下面的示例。

Observable.mergeDelayError(
                Observable.error(new RuntimeException()),
                Observable.just("Hello")
            )
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .finallyDo(completeAction)
            .subscribe(successAction, errorAction);

仅当我使用两个成功可观察量时,才会调用成功操作。我是否遗漏了合并延迟错误应该如何工作的东西?

编辑:

我发现,如果我删除,一切都按预期工作。我需要指定线程,并认为这是使用Rx的全部意义。任何想法,为什么指定这些会破坏行为?observeOnsubscribeOnSchedulers


答案 1

使用 .observeOn(AndroidSchedulers.mainThread(), true) 而不是 .observeOn(AndroidSchedulers.mainThread()

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
        return observeOn(scheduler, delayError, RxRingBuffer.SIZE);
    }

以上是 observeOn 函数的签名。以下代码有效。

  Observable.mergeDelayError(
                Observable.error(new RuntimeException()),
                Observable.just("Hello")
        )
                .observeOn(AndroidSchedulers.mainThread(), true)
                .subscribeOn(Schedulers.io())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {

                    }
                });

从ConcatDelayError线程中得到这个技巧:https://github.com/ReactiveX/RxJava/issues/3908#issuecomment-217999009


答案 2

这似乎仍然是mergeDelayError运算符中的一个错误,但我能够通过为每个可观察量复制observerOn和Subease来使其工作。

Observable.mergeDelayError(
            Observable.error(new RuntimeException())
                 .observeOn(AndroidSchedulers.mainThread())
                 .subscribeOn(Schedulers.io()),
            Observable.just("Hello")
                 .observeOn(AndroidSchedulers.mainThread())
                 .subscribeOn(Schedulers.io())
        )
        .finallyDo(completeAction)
        .subscribe(successAction, errorAction);

推荐