RxJava:链接可观察量

2022-09-01 01:03:27

是否有可能使用 RxJava 实现类似下一个链接的内容:

loginObservable()
   .then( (someData) -> {
      // returns another Observable<T> with some long operation
      return fetchUserDataObservable(someData);

   }).then( (userData) -> {
      // it should be called when fetching user data completed (with userData of type T)
      cacheUserData(userData);

   }).then( (userData) -> {
      // it should be called after all previous operations completed
      displayUserData()

   }).doOnError( (error) -> {
      //do something
   })

我发现这个库非常有趣,但无法弄清楚我们如何将彼此依赖的请求链接在以前的位置。


答案 1

当然,RxJava支持这样做。来自 RxJava Wiki:.map

map

基本上,它将是:

loginObservable()
   .switchMap( someData -> fetchUserDataObservable(someData) )
   .map( userData -> cacheUserData(userData) )
   .subscribe(new Subscriber<YourResult>() {
        @Override
        public void onCompleted() {
           // observable stream has ended - no more logins possible
        }
        @Override
        public void onError(Throwable e) {
            // do something
        }
        @Override
        public void onNext(YourType yourType) {
            displayUserData();
        }
    });

答案 2

这是谷歌搜索RxJava链可观察量时的顶部帖子,所以我将添加另一个常见情况,您不想转换收到的数据,而是将其与另一个操作(例如,将数据设置为数据库)链接在一起。用。下面是一个示例:.flatmap()

mDataManager
    .fetchQuotesFromApi(limit)
    .subscribeOn(mSchedulerProvider.io())
    .observeOn(mSchedulerProvider.ui())
    // OnErrorResumeNext and Observable.error() would propagate the error to
    // the next level. So, whatever error occurs here, would get passed to
    // onError() on the UI side.
    .onErrorResumeNext(Function { Observable.error<List<Quote>>(it) })
    .flatMap { t: List<Quote> ->
        // Chain observable as such
        mDataManager.setQuotesToDb(t).subscribe(
            {},
            { e { "setQuotesToDb() error occurred: ${it.localizedMessage}" } },
            { d { "Done server set" } }
        )
        Observable.just(t)
    }
    .subscribeBy(
        onNext = {},
        onError = { mvpView?.showError("No internet connection") },
        onComplete = { d { "onComplete(): done with fetching quotes from api" } }
    )

这是RxKotlin2,但这个想法与RxJava和RxJava2相同:

快速说明:

  • 我们尝试从 API 中获取一些数据(在本例中为引号),mDataManager.fetchQuotesFromApi()
  • 我们订阅可观察量以在线程上执行操作并在线程上显示结果。.io().ui()
  • onErrorResumeNext()确保在此方法中捕获我们在获取数据时遇到的任何错误。我想在出现错误时终止整个链,所以我返回一个Observable.error()
  • .flatmap()是链接部分。我希望能够将从API获得的任何数据设置为数据库。我不是在转换我使用接收的数据,我只是在用这些数据做其他事情而不转换它。.map()
  • 我订阅了最后一个可观察量链。如果在获取数据时发生错误(第一个可观察),则将使用onError()onErrorResumeNext()
  • 我非常清楚我正在订阅数据库可观察性(内部)。通过此可观察量发生的任何错误都不会传播到最后一个方法,因为它是在链内的方法内部处理的。flatmap()subscribeBy()subscribe().flatmap()

代码来自这个项目,它位于这里:https://github.com/Obaied/Sohan/blob/master/app/src/main/java/com/obaied/dingerquotes/ui/start/StartPresenter.kt