如何处理观察者onNext在RxJava中引发的异常?
请考虑以下示例:
Observable.range(1, 10).subscribe(i -> {
System.out.println(i);
if (i == 5) {
throw new RuntimeException("oops!");
}
}, Throwable::printStackTrace);
这将输出从 1 到 5 的数字,然后打印异常。
我想实现的是让观察者保持订阅状态,并在抛出异常后继续运行,即打印从1到10的所有数字。
我尝试过使用和其他各种错误处理运算符,但是,正如文档中所说,它们的目的是处理可观察量本身发出的错误。retry()
最直接的解决方案是将整个主体包装成一个尝试捕获块,但这对我来说听起来不是一个好的解决方案。在类似的 Rx.NET 问题中,建议的解决方案是制作一个扩展方法,该方法将通过创建可观察的代理来执行包装。我试图重新制作它:onNext
Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));
proxy.subscribe(i -> {
System.out.println(i);
if (i == 5) {
throw new RuntimeException("oops!");
}
}, Throwable::printStackTrace);
这不会改变任何东西,因为RxJava本身将订阅者包装到SafeSubscriber中
。用来绕过它似乎也不是一个好的解决方案。unsafeSubscribe
我能做些什么来解决这个问题?