合并可观察量列表并等待所有可观察量完成

TL;DR如何转换为?Task.whenAll(List<Task>)RxJava

我的现有代码使用 Bolts 来构建异步任务列表,并等到所有这些任务完成后再执行其他步骤。从本质上讲,它构建了一个并返回一个在列表中的所有任务完成时标记为已完成的单个任务,如 Bolts 站点上的示例所示。List<Task>Task

我希望替换为,并且我假设这种方法可以构建异步任务列表(大小事先不知道)并将它们全部包装成单个任务,但我不知道如何。BoltsRxJavaObservable

我试过看,等...但是无法完成我正在构建的工作,因为如果我正确理解文档,它们似乎都适合一次只处理两个文档。mergezipconcatList<Observable>Observables

我正在尝试学习,并且仍然很陌生,所以如果这是一个明显的问题或在某个地方的文档中解释,请原谅我;我尝试过搜索。任何帮助将不胜感激。RxJava


答案 1

如果您有动态任务组合,则可以使用。像这样:flatMap

public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
    return Observable.from(tasks)
            //execute in parallel
            .flatMap(task -> task.observeOn(Schedulers.computation()))
            //wait, until all task are executed
            //be aware, all your observable should emit onComplete event
            //otherwise you will wait forever
            .toList()
            //could implement more intelligent logic. eg. check that everything is successful
            .map(results -> true);
}

并行执行的另一个很好的例子

注意:我真的不知道您对错误处理的要求。例如,如果只有一个任务失败,该怎么办。我认为您应该验证此方案。


答案 2

听起来您正在寻找Zip运算符

有几种不同的使用方法,所以让我们看一个示例。假设我们有一些不同类型的简单可观察量:

Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);

等待它们的最简单方法是这样的:

Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));

请注意,在 zip 函数中,参数具有与要压缩的可观察量类型相对应的具体类型。

也可以直接压缩可观察量列表:

List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);

Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

...或者通过将列表包装成 :Observable<Observable<?>>

Observable<Observable<?>> obsObs = Observable.from(obsList);

Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

但是,在这两种情况下,zip 函数只能接受单个参数,因为列表中可观察量的类型及其数量都不知道。这意味着 zip 函数必须检查参数的数量并相应地强制转换它们。Object[]

无论如何,上述所有示例最终都会打印出来。1 Blah true

编辑:使用 Zip 时,请确保所有被压缩的都发出相同数量的项目。在上面的示例中,所有三个可观察量都发出了一个项目。如果我们要将它们更改为如下所示:Observables

Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items

然后,这将是传递到 zip 函数中的唯一项目。由于其他可观察量已完成,因此永远不会压缩该项。1, Blah, True2, Hello, True3


推荐