RxJava:如何用依赖关系编写多个可观察量,并在最后收集所有结果?

我正在学习RxJava,并且作为我的第一个实验,尝试重写此代码中的第一种方法中的代码(在Netflix的博客上引用为RxJava可以帮助解决的问题),以使用RxJava提高其异步性,即这样它就不会等待第一个Future()的结果,然后再继续阅读其余的代码。run()f1.get()

f3取决于。我看到如何处理这个,似乎做了诀窍:f1flatMap

Observable<String> f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA()))
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            return Observable.from(executor.submit(new CallToRemoteServiceC(s)));
        }
    });

接下来,并依赖于 .我有这个:f4f5f2

final Observable<Integer> f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB()))
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer i) {
            Observable<Integer> f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i)));
            Observable<Integer> f5Observable = Observable.from(executor.submit(new CallToRemoteServiceE(i)));
            return Observable.merge(f4Observable, f5Observable);
        }
    });

这开始变得奇怪(他们可能不是我想要的......),但允许我在最后这样做,而不是我想要的:merge

f3Observable.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println("Observed from f3: " + s);
        f4And5Observable.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                System.out.println("Observed from f4 and f5: " + i);
            }
        });
    }
});

这给了我:

Observed from f3: responseB_responseA
Observed from f4 and f5: 140
Observed from f4 and f5: 5100

这是所有数字,但不幸的是,我在单独的调用中获得结果,所以我不能完全替换原始代码中的最终println:

System.out.println(f3.get() + " => " + (f4.get() * f5.get()));

我不明白如何在同一行上访问这两个返回值。我认为可能我在这里缺少一些函数式编程fu。我该怎么做?谢谢。


答案 1

看起来你真正需要的只是更多的鼓励和对RX如何使用的看法。我建议您阅读更多文档以及大理石图表(我知道它们并不总是有用的)。我还建议研究函数和运算符。lift()

  • 可观察量的全部意义在于将数据流和数据操作连接到单个对象中。
  • 调用 的点,并且要操作数据流中的数据mapflatMapfilter
  • 合并的目的是合并数据流
  • 运算符的要点是允许您中断稳定的可观察量流,并在数据流上定义自己的操作。例如,我编写了一个移动平均线运算符。这将 s 求和为双精度的可观察量,以返回移动平均线流。代码从字面上看是这样的ndouble

    Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize))

您会感到宽慰的是,您认为理所当然的许多过滤方法都在引擎盖下。lift()

话虽如此;合并多个依赖项所需的全部操作是:

  • 使用 或 将所有传入数据更改为标准数据类型mapflatMap
  • 将标准数据类型合并到流中
  • 如果一个对象需要等待另一个对象,或者需要对流中的数据进行排序,请使用自定义运算符。警告:此方法将减慢流速度
  • 用于列出或订阅以收集所有这些数据

答案 2

编辑:有人将以下文本(我已将其添加为对问题的编辑)转换为答案,我对此表示赞赏,并理解这可能是正确的SO操作,但是我不认为这是一个答案,因为它显然不是正确的方法。我永远不会使用这个代码,也不会建议任何人复制它。欢迎其他/更好的解决方案和评论!


我能够通过以下方式解决这个问题。我没有意识到你可以多次观察,我以为结果只能消耗一次。所以我只是f2Observable两次(对不起,自从我的原始帖子以来,我重命名了代码中的一些东西),然后在所有可观察量上,然后订阅它。由于类型杂耍,在 聚合值是不可取的。欢迎其他/更好的解决方案和评论!完整的代码可在要点中查看。谢谢。flatMapflatMapzipMapzip

Future<Integer> f2 = executor.submit(new CallToRemoteServiceB());
Observable<Integer> f2Observable = Observable.from(f2);
Observable<Integer> f4Observable = f2Observable
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            System.out.println("Observed from f2: " + integer);
            Future<Integer> f4 = executor.submit(new CallToRemoteServiceD(integer));
            return Observable.from(f4);
        }       
    });     

Observable<Integer> f5Observable = f2Observable
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            System.out.println("Observed from f2: " + integer);
            Future<Integer> f5 = executor.submit(new CallToRemoteServiceE(integer));
            return Observable.from(f5);
        }       
    });     

Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() {
    @Override
    public Map<String, String> call(String s, Integer integer, Integer integer2) {
        Map<String, String> map = new HashMap<String, String>();
        map.put("f3", s);
        map.put("f4", String.valueOf(integer));
        map.put("f5", String.valueOf(integer2));
        return map;
    }       
}).subscribe(new Action1<Map<String, String>>() {
    @Override
    public void call(Map<String, String> map) {
        System.out.println(map.get("f3") + " => " + (Integer.valueOf(map.get("f4")) * Integer.valueOf(map.get("f5"))));
    }       
});     

这为我提供了所需的输出:

responseB_responseA => 714000