如何使这个rxjava zip并行运行?

我有一个睡眠方法来模拟一个长时间运行的过程。

private void sleep() {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

然后我有一个方法返回一个可观察量,其中包含参数中给定的2个字符串的列表。它在返回字符串之前调用睡眠。

private Observable<List<String>> getStrings(final String str1, final String str2) {
    return Observable.fromCallable(new Callable<List<String>>() {
        @Override
        public List<String> call() {
            sleep();
            List<String> strings = new ArrayList<>();
            strings.add(str1);
            strings.add(str2);
            return strings;
        }
    });
}

然后我在 Observalb 中调用 getStrings 三次.zip,我希望这三个调用并行运行,所以执行的总时间应该在 2 秒以内,或者最多 3 秒,因为睡眠时间只有 2 秒。但是,它总共需要秒钟。如何使它并行运行,以便在2秒内完成?

Observable
.zip(getStrings("One", "Two"), getStrings("Three", "Four"), getStrings("Five", "Six"), mergeStringLists())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<String>>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(List<String> strings) {
        //Display the strings
    }
});

mergeStringLists 方法

private Func3<List<String>, List<String>, List<String>, List<String>> mergeStringLists() {
    return new Func3<List<String>, List<String>, List<String>, List<String>>() {
        @Override
        public List<String> call(List<String> strings, List<String> strings2, List<String> strings3) {
            Log.d(TAG, "...");

            for (String s : strings2) {
                strings.add(s);
            }

            for (String s : strings3) {
                strings.add(s);
            }

            return strings;
        }
    };
}

答案 1

发生这种情况是因为订阅可观察对象发生在同一线程中。zippedio

你为什么不试试这个:

Observable
    .zip(
        getStrings("One", "Two")
            .subscribeOn(Schedulers.newThread()),
        getStrings("Three", "Four")
            .subscribeOn(Schedulers.newThread()),
        getStrings("Five", "Six")
            .subscribeOn(Schedulers.newThread()),
        mergeStringLists())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<List<String>>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(List<String> strings) {
            //Display the strings
        }
    });

让我知道这是否有帮助


答案 2

在这里,我有一个以异步方式使用Zip的示例,以防万一您感到好奇

/**
 * Since every observable into the zip is created to                 subscribeOn a diferent thread, it´s means all of them will run in parallel.
 * By default Rx is not async, only if you explicitly use subscribeOn.
 */
@Test
public void testAsyncZip() {
    scheduler = Schedulers.newThread();
    scheduler1 = Schedulers.newThread();
    scheduler2 = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2).concat(s3))
            .subscribe(result -> showResult("Async in:", start, result));
}

public Observable<String> obAsyncString() {
    return Observable.just("")
            .observeOn(scheduler)
            .doOnNext(val -> System.out.println("Thread " + Thread.currentThread().getName()))
            .map(val -> "Hello");
}

public Observable<String> obAsyncString1() {
    return Observable.just("")
            .observeOn(scheduler1)
            .doOnNext(val -> System.out.println("Thread " + Thread.currentThread().getName()))
            .map(val -> " World");
}

public Observable<String> obAsyncString2() {
    return Observable.just("")
            .observeOn(scheduler2)
            .doOnNext(val -> System.out.println("Thread " + Thread.currentThread().getName()))
            .map(val -> "!");
}

您可以在此处查看更多示例 https://github.com/politrons/reactive


推荐