RxJava2 可观察采取抛出无法交付异常

2022-09-01 02:24:19

据我所知,RxJava2创建了另一个可观察对象,它只包含原始可观察量中的一个元素。这不能引发异常,因为它被过滤掉,因为它发生了第二次。values.take(1)take(1)

以下代码片段所示

    Observable<Integer> values = Observable.create(o -> {
        o.onNext(1);
        o.onError(new Exception("Oops"));
    });

    values.take(1)
            .subscribe(
                    System.out::println,
                    e -> System.out.println("Error: " + e.getMessage()),
                    () -> System.out.println("Completed")
            );

输出

1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more

我的问题 :

  1. 我的理解是否正确?
  2. 导致异常的真实情况。
  3. 如何从消费者那里解决这个问题?

答案 1
  1. 是的,但是因为可观察的“end”并不意味着在其中运行的代码停止。在这种情况下,为了完全安全,您需要使用以查看可观察量是否已在下游结束。create(...)o.isDisposed()
  2. 之所以有例外,是因为 RxJava 2 具有永不允许调用丢失的策略。它要么在下游传递,要么在可观察量已终止时作为全局抛出。由可观察量的创建者“正确”处理可观察量已经结束并且发生异常的情况。onErrorUndeliverableException
  3. 问题在于生产者()和消费者()在流何时结束上存在分歧。由于在这种情况下生产者比消费者长寿,因此问题只能在生产者中解决。ObservableSubscriber

答案 2

@Kiskae在前面的评论中正确回答了发生此类异常的原因。

这里有关于这个主题的官方文档的链接:RxJava2-wiki

有时你无法改变这种行为,所以有一种方法可以处理这个问题。以下是有关如何避免崩溃和不当行为的代码片段:UndeliverableException

RxJavaPlugins.setErrorHandler(e -> {
    if (e instanceof UndeliverableException) {
        e = e.getCause();
    }
    if ((e instanceof IOException) || (e instanceof SocketException)) {
        // fine, irrelevant network problem or API that throws on cancellation
        return;
    }
    if (e instanceof InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return;
    }
    if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
        // that's likely a bug in the application
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    if (e instanceof IllegalStateException) {
        // that's a bug in RxJava or in a custom operator
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    Log.warning("Undeliverable exception received, not sure what to do", e);
});

此代码取自上面的链接。

重要提示。此方法将全局错误处理程序设置为 RxJava,因此,如果可以摆脱这些异常,这将是更好的选择。


推荐