任务取消在 RxJava 中是如何工作的?

2022-09-04 02:47:42

我不清楚如何在RXJava中实现任务取消。

我有兴趣移植使用Guava构建的现有API。我的用例如下:ListenableFuture

  • 我有一个单一的操作,由一系列期货组成,由Futures.transform()
  • 多个订阅者观察操作的最终未来。
  • 每个观察者可以取消最终的未来,并且所有观察者都会见证取消事件。
  • 取消最终的未来会导致取消其依赖关系,例如,在序列 ->->中,取消将传播到 ,等等。12332

在RxJava维基中关于这个的信息很少;我能找到的唯一参考资料取消提及为等同于.NET的,但据我所知,订阅仅提供取消订阅序列中后续值的功能。SubscriptionDisposable

我不清楚如何通过这个API实现“任何订阅者都可以取消”语义。我是否以错误的方式思考这个问题?

任何意见将不胜感激。


答案 1

了解冷与热可观察量非常重要。如果您的可观察量是冷的,那么如果没有订阅者,则不会执行它们的操作。因此,要“取消”,只需确保所有观察者取消订阅源可观察。

但是,如果源的一个观察者取消订阅,并且仍有其他观察者订阅该源,则不会导致“取消”。在这种情况下,您可以使用(但这不是唯一的解决方案)ConnectableObservables。另请参阅有关 Rx.NET 的链接

使用ConnectableObservables的一种实用方法是简单地调用任何冷的Observable。这样做是创建一个单一的“代理”观察者,它将事件从源中继到实际的观察者。代理观察者在最后一个实际观察者取消订阅时取消订阅。.publish().refCount()

要手动控制 ConnectableObservable,请调用 just,您将获得一个 ConnectableObservable 的实例。然后,您可以调用这将返回“代理”观察者的订阅。要手动“取消”源,您只需取消订阅代理观察者的订阅即可。coldSource.publish().connect()


对于您的特定问题,您还可以使用运算符。.takeUntil()

假设您的“最终未来”像在RxJava中一样被移植,并且假设“取消事件”是可观察量,等,那么“取消”操作变得相当简单:finalStreamcancelStream1cancelStream2finalStream

Observable<FooBar> finalAndCancelableStream = finalStream
    .takeUntil( Observable.merge(cancelStream1, cancelStream2) );

在图中,takeUntil 就是这样工作的这就是 merge 的工作原理

用简单的英语,你可以把它读成“finalAndCancelableStream是finalStream,直到 cancelStream1 或 cancelStream2 发出一个事件”。


答案 2

推荐