RxJava 合并请求序列

问题

我有两个 Api。Api 1 为我提供了一个项目列表,Api 2 为我提供了从 Api 1 获得的每个项目的更详细信息。到目前为止,我解决它的方式导致性能不佳。

问题

在Retrofit和RxJava的帮助下,有效地快速解决了这个问题。

我的方法

目前,我的解决方案如下所示:

步骤 1:从 Api 1 执行改造。Single<ArrayList<Information>>

步骤 2:我循环访问此项,并向 Api 2 发出每个项的请求。

步骤 3:改造返回 按顺序执行每个项目Single<ExtendedInformation>

步骤 4:在 API 2 的所有调用完全执行后,我为所有项目创建一个新对象,其中包含信息和扩展信息。

我的代码

 public void addExtendedInformations(final Information[] informations) {
        final ArrayList<InformationDetail> informationDetailArrayList = new ArrayList<>();
        final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener = new JSONRequestRatingHelper.RatingRequestListener() {
            @Override
            public void onDownloadFinished(Information baseInformation, ExtendedInformation extendedInformation) {
                informationDetailArrayList.add(new InformationDetail(baseInformation, extendedInformation));
                if (informationDetailArrayList.size() >= informations.length){
                    listener.onAllExtendedInformationLoadedAndCombined(informationDetailArrayList);
                }
            }
        };

        for (Information information : informations) {
            getExtendedInformation(ratingRequestListener, information);
        }
    }

    public void getRatingsByTitle(final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener, final Information information) {
        Single<ExtendedInformation> repos = service.findForTitle(information.title);
        disposable.add(repos.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableSingleObserver<ExtendedInformation>() {
            @Override
            public void onSuccess(ExtendedInformation extendedInformation) {
                    ratingRequestListener.onDownloadFinished(information, extendedInformation);
            }

            @Override
            public void onError(Throwable e) {
                ExtendedInformation extendedInformation = new ExtendedInformation();
                ratingRequestListener.onDownloadFinished(extendedInformation, information);
            }
        }));
    }

    public interface RatingRequestListener {

        void onDownloadFinished(Information information, ExtendedInformation extendedInformation);

    }

答案 1

tl;dr 使用或异步执行子调用或在调度程序上执行子调用。concatMapEagerflatMap


说来话长

我不是Android开发人员,所以我的问题将仅限于纯RxJava(版本1和版本2)。

如果我得到正确的图片,所需的流程是:

some query param 
  \--> Execute query on API_1 -> list of items
          |-> Execute query for item 1 on API_2 -> extended info of item1
          |-> Execute query for item 2 on API_2 -> extended info of item1
          |-> Execute query for item 3 on API_2 -> extended info of item1
          ...
          \-> Execute query for item n on API_2 -> extended info of item1
  \----------------------------------------------------------------------/
      |
      \--> stream (or list) of extended item info for the query param

假设改造生成了

interface Api1 {
    @GET("/api1") Observable<List<Item>> items(@Query("param") String param);
}

interface Api2 {
    @GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);
}

如果项目的顺序不重要,则可以仅使用:flatMap

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .flatMap(item -> api2.extendedInfo(item.id()))
    .subscribe(...)

前提是改造建筑商配置了

  • 使用异步适配器(调用将在 okhttp 内部执行器中排队)。我个人认为这不是一个好主意,因为你无法控制这个执行器。

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync()
    
  • 或者使用基于调度程序的适配器(调用将在 RxJava 调度程序上调度)。这是我的首选选项,因为您明确选择使用哪个调度程序,它很可能是IO调度程序,但您可以自由尝试其他调度程序。

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
    

原因是将订阅由 创建的每个可观察量,并将它们合并到生成的可观察量中。因此,结果将按收到的顺序显示。flatMapapi2.extendedInfo(...)

如果改造客户端设置为异步或设置为在调度程序上运行,则可以设置一个:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
    .subscribe(...)

这个结构几乎与前一个结构完全相同,它指示每个结构应该在哪个调度程序上运行。api2.extendedInfo

可以调整 的参数以控制要同时执行的请求数。虽然我会对此持谨慎态度,但您不希望同时运行所有查询。通常默认值足够好()。maxConcurrencyflatMapmaxConcurrency128

现在,如果原始查询的顺序很重要。 通常是按顺序但按顺序执行相同操作的运算符,如果代码需要等待执行所有子查询,则结果会很慢。不过,解决方案更进一步,这个解决方案将按顺序订阅可观察性,并根据需要缓冲结果。concatMapflatMapconcatMapEager

假设改造客户端是异步的或在特定的调度程序上运行的:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .concatMapEager(item -> api2.extendedInfo(item.id()))
    .subscribe(...)

或者,如果必须在本地设置调度程序:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
    .subscribe(...)

还可以在此运算符中调整并发性。


此外,如果 Api 正在返回,则可以在 RxJava 2.1.7 中使用目前仍处于测试阶段的 Api。但是结果不按顺序排列,我不知道一种方法(还?)在不排序的情况下对它们进行排序。Flowable.parallel

api.items(queryParam) // Flowable<Item>
   .parallel(10)
   .runOn(Schedulers.io())
   .map(item -> api2.extendedInfo(item.id()))
   .sequential();     // Flowable<ItemExtended>

答案 2

该运算符旨在满足这些类型的工作流程。flatMap

我将用一个简单的五步示例概述广泛的笔触。希望你能在代码中轻松重构相同的原则:

@Test fun flatMapExample() {
    // (1) constructing a fake stream that emits a list of values
    Observable.just(listOf(1, 2, 3, 4, 5))
            // (2) convert our List emission into a stream of its constituent values 
            .flatMap { numbers -> Observable.fromIterable(numbers) }
            // (3) subsequently convert each individual value emission into an Observable of some 
            //     newly calculated type
            .flatMap { number ->
                when(number) {
                       1 -> Observable.just("A1")
                       2 -> Observable.just("B2")
                       3 -> Observable.just("C3")
                       4 -> Observable.just("D4")
                       5 -> Observable.just("E5")
                    else -> throw RuntimeException("Unexpected value for number [$number]")
                }
            }
            // (4) collect all the final emissions into a list
            .toList()
            .subscribeBy(
                    onSuccess = {
                        // (5) handle all the combined results (in list form) here
                        println("## onNext($it)")
                    },
                    onError = { error ->
                        println("## onError(${error.message})")
                    }
            )
}

(顺便说一句,如果排放的顺序很重要,请改为使用)。concatMap

我希望这有帮助。


推荐