Observable 的 doOnror 正确位置

2022-09-03 05:00:22

我有点新手,我仍然在努力弄清楚它们。我有以下代码:Observers

observableKafka.getRealTimeEvents()
        .filter(this::isTrackedAccount)
        .filter(e -> LedgerMapper.isDepositOrClosedTrade((Transaction) e.getPayload()))
        .map(ledgerMapper::mapLedgerTransaction)
        .map(offerCache::addTransaction)
        .filter(offer -> offer != null)  // Offer may have been removed from cache since last check
        .filter(Offer::isReady)
        .doOnError(throwable -> { 
              LOG.info("Exception thrown on realtime events");
          })
        .forEach(awardChecker::awardFailOrIgnore);

getRealTimeEvents()返回 .Observable<Event>

位置重要吗?另外,在这段代码中添加多个调用有什么影响?我已经意识到我可以做到这一点,所有这些都被调用,但我不确定它的目的是什么。.doOnError


答案 1

是的,确实如此。 当错误在该特定点通过流时执行操作,因此,如果抛出之前的运算符,则将调用您的操作。但是,如果您将进一步向上放置,则可能会或可能不会调用它,具体取决于链中的下游运营商。doOnErrordoOnErrordoOnError

鉴于

Observer<Object> ignore = new Observer<Object>() {
    @Override public void onCompleted() {
    }
    @Override public void onError(Throwable e) {
    }
    @Override public void onNext(Object t) {
    }
};

例如,下面的代码将始终调用 doOnError:

Observable.<Object>error(new Exception()).doOnError(e -> log(e)).subscribe(ignore);

但是,此代码不会:

Observable.just(1).doOnError(e -> log(e))
.flatMap(v -> Observable.<Integer>error(new Exception())).subscribe(ignore);

大多数运营商会退回源自下游的异常。

如果通过 或 转换异常,则添加多重是可行的:doOnErroronErrorResumeNextonExceptionResumeNext

Observable.<Object>error(new RuntimeException())
.doOnError(e -> log(e))
.onErrorResumeNext(Observable.<Object>error(new IllegalStateException()))
.doOnError(e -> log(e)).subscribe(ignore);

否则,您将在链的多个位置记录相同的异常。


答案 2

这些方法是为了副作用而存在的,比如说,处理并不是你的核心业务价值。日志记录是一个非常好的用途。也就是说,有时您希望对错误执行更有意义的操作,例如重试或向用户显示消息等。对于这些情况,“rx”方法是在调用中处理错误。doOn???subscribe

doOnError(和其他方法)将原始方法包装成一个新方法,并向其添加行为(显然是围绕其方法)。这就是为什么你可以多次调用它。能够在链中的任何位置调用它的另一个好处是,您可以访问否则会对流的使用者隐藏的错误(例如),因为链中有重试...doOnObservableonErrorSubscriber


推荐