如何处理观察者onNext在RxJava中引发的异常?

请考虑以下示例:

Observable.range(1, 10).subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);

这将输出从 1 到 5 的数字,然后打印异常。

我想实现的是让观察者保持订阅状态,并在抛出异常后继续运行,即打印从1到10的所有数字。

我尝试过使用和其他各种错误处理运算符,但是,正如文档中所说,它们的目的是处理可观察量本身发出的错误。retry()

最直接的解决方案是将整个主体包装成一个尝试捕获块,但这对我来说听起来不是一个好的解决方案。在类似的 Rx.NET 问题中,建议的解决方案是制作一个扩展方法,该方法将通过创建可观察的代理来执行包装。我试图重新制作它:onNext

Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
        origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));

proxy.subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);

这不会改变任何东西,因为RxJava本身将订阅者包装到SafeSubscriber中。用来绕过它似乎也不是一个好的解决方案。unsafeSubscribe

我能做些什么来解决这个问题?


答案 1

这是学习Rx时出现的常见问题。

TL;DR

将异常处理逻辑放在订阅服务器中的建议比创建通用可观察包装器更可取。

解释

请记住,Rx 是关于将事件推送给订阅者。

从可观察的界面来看,很明显,除了他们处理事件所花费的时间或任何抛出的异常中包含的信息之外,可观察对象实际上无法知道有关其订阅者的任何信息。

使用通用包装器来处理订阅者异常并继续向该订阅者发送事件是一个坏主意。

为什么?好吧,可观察对象应该只真正知道订阅者现在处于未知的故障状态。在这种情况下继续发送事件是不明智的 - 例如,订阅者处于这样一种情况,即从此时开始的每个事件都会引发异常并需要一段时间才能完成。

一旦订阅者引发异常,对于可观察对象,只有两种可行的操作过程:

  • 重新引发异常
  • 实现通用处理以记录故障并停止向其发送事件(任何类型的),并清理由于该订阅服务器而产生的任何资源,并继续使用任何剩余的订阅。

对订户异常的具体处理将是一个糟糕的设计选择;它会在订阅者和可观察者之间产生不适当的行为耦合。因此,如果你想对不良订阅者有弹性,上面的两个选择实际上是可观察本身责任的合理限制。

如果您希望订阅者具有弹性并继续进行,那么您绝对应该将其包装在异常处理逻辑中,该逻辑旨在处理您知道如何从中恢复的特定异常(也许还可以处理暂时性异常,日志记录,重试逻辑,断路等)。

只有订阅者本身才有上下文来了解它是否适合在发生故障时接收进一步的事件。

如果您的情况需要开发可重用的错误处理逻辑,请将自己置于包装观察者的事件处理程序而不是可观察的事件处理程序的思维方式中 - 并且请注意不要在面对故障时盲目地传输事件。释放它!虽然没有写过关于Rx的文章,但这是一个有趣的软件工程经典,在最后一点上有很多话要说。如果你还没有读过它,我强烈建议它。


答案 2

推荐