RxJava 和并行执行观察器代码

2022-09-01 03:30:37

我有以下代码使用RxJava Observable api:

Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());
    observable
      .buffer(10000)
      .observeOn(Schedulers.computation())
      .subscribe(recordInfo -> {
        _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());
          for(Info info : recordInfo) {
            // some I/O operation logic
         }
      }, 
      exception -> {
      }, 
      () -> {
      });

我的期望是,在我指定计算调度程序后,观察代码即subse()方法中的代码将并行执行。相反,代码仍在单线程上按顺序执行。如何使用 RxJava api 使代码并行运行。


答案 1

RxJava在涉及它的异步/多线程方面时经常被误解。多线程操作的编码很简单,但理解抽象是另一回事。

关于 RxJava 的一个常见问题是如何实现并行化,或者如何从可观察量同时发出多个项目。当然,这个定义打破了可观察契约,该契约规定onNext()必须按顺序调用,并且决不能一次由多个线程并发调用。

要实现并行性,您需要多个可观察量。

这在单个线程中运行:

Observable<Integer> vals = Observable.range(1,10);

vals.subscribeOn(Schedulers.computation())
          .map(i -> intenseCalculation(i))
          .subscribe(val -> System.out.println("Subscriber received "
                  + val + " on "
                  + Thread.currentThread().getName()));

这在多个线程中运行:

Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
            .subscribeOn(Schedulers.computation())
            .map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));

代码和文本来自这篇博客文章。


答案 2

RxJava 2.0.5 引入了并行流ParallelFlowable,这使得并行执行更简单,更具声明性。

您不再需要创建/在内,您可以简单地调用它,它就会返回。ObservableFlowableflatMapparallel()FlowableParallelFlowable

它不像常规 ,因为并发性会给Rx合约带来许多问题,但是您有基本的,还有更多,这在大多数情况下应该足够了。Flowablemap()filter()

因此,与其说这是来自@LordRaydenMK答案:

Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
        .subscribeOn(Schedulers.computation())
        .map(i -> intenseCalculation(i))
    ).subscribe(val -> System.out.println(val));

现在,您可以执行以下操作:

Flowable<Integer> vals = Flowable.range(1, 10);

vals.parallel()
        .runOn(Schedulers.computation())
        .map(i -> intenseCalculation(i))
        .sequential()
        .subscribe(val -> System.out.println(val));

推荐