RX:并行运行压缩可观察量?

2022-09-03 12:33:46

所以我正在玩RX(真的很酷),我一直在转换我的api,它访问Android中的sqlite数据库以返回可观察量。

因此,自然而然地,我开始尝试解决的问题之一是,“如果我想进行3次API调用,获得结果,然后在它们全部完成后进行一些处理,该怎么办?

我花了一两个小时,但我最终找到了Zip功能,它能帮我轻松解决:

    Observable<Integer> one = getNumberedObservable(1);
    Observable<Integer> two = getNumberedObservable(2);
    Observable<Integer> three = getNumberedObservable(3);

    Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer arg0, Integer arg1, Integer arg2) {
            System.out.println("Zip0: " + arg0);
            System.out.println("Zip1: " + arg1);
            System.out.println("Zip2: " + arg2);
            return arg0 + arg1 + arg2;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer arg0) {
            System.out.println("Zipped Result: " + arg0);
        }
    });

public static Observable<Integer> getNumberedObservable(final int value) {
    return Observable.create(new OnSubscribeFunc<Integer>() {
        @Override
        public Subscription onSubscribe(Observer<? super Integer> observer) {
            observer.onNext(value);
            observer.onCompleted();
            return Subscriptions.empty();
        }
    });
}

伟大!所以这很酷。

因此,当我压缩3个可观察量时,它们会连续运行。如果我希望它们同时并行运行,以便最终更快地获得结果,该怎么办?我尝试了一些东西,甚至尝试阅读人们用C#编写的一些原始RX内容。我相信有一个简单的答案。任何人都可以为我指出正确的方向吗?正确的方法是什么?


答案 1

zip 并行运行可观察量 - 但它也按顺序订阅它们。由于您正在以订阅方法完成,因此给人的印象是连续运行,但实际上没有这样的限制。getNumberedObservable

您可以尝试使用一些长时间运行的可观察量,这些可观察量的寿命超过其订阅逻辑,例如 ,或者使用该方法异步订阅传递给 的每个流。timersubscribeOnzip


答案 2

在 RxJava 中,使用 toAsync 将常规函数转换为将在线程上运行并以可观察结果返回其结果的内容。

我不太了解Java语法,但它看起来像这样:

public static Integer getNumber(final int value) { return value; }
public static Observable<Integer> getNumberedObservable(final int value) {
    return rx.util.functions.toAsync(new Func<Integer,Integer>() {
        @Override
        public Integer call(Integer value) { return getNumber(value); }
    });
};

如果真的访问数据库,这将起作用。调用它时,将返回一个可观察量,当您订阅它时,该可观察量将在单独的线程上运行。getNumbergetNumberedObservablegetNumber


推荐