了解冷与热可观察量非常重要。如果您的可观察量是冷的,那么如果没有订阅者,则不会执行它们的操作。因此,要“取消”,只需确保所有观察者取消订阅源可观察。
但是,如果源的一个观察者取消订阅,并且仍有其他观察者订阅该源,则不会导致“取消”。在这种情况下,您可以使用(但这不是唯一的解决方案)ConnectableObservables。另请参阅有关 Rx.NET 的链接。
使用ConnectableObservables的一种实用方法是简单地调用任何冷的Observable。这样做是创建一个单一的“代理”观察者,它将事件从源中继到实际的观察者。代理观察者在最后一个实际观察者取消订阅时取消订阅。.publish().refCount()
要手动控制 ConnectableObservable,请调用 just,您将获得一个 ConnectableObservable 的实例。然后,您可以调用这将返回“代理”观察者的订阅。要手动“取消”源,您只需取消订阅代理观察者的订阅即可。coldSource.publish()
.connect()
对于您的特定问题,您还可以使用运算符。.takeUntil()
假设您的“最终未来”像在RxJava中一样被移植,并且假设“取消事件”是可观察量,等,那么“取消”操作变得相当简单:finalStream
cancelStream1
cancelStream2
finalStream
Observable<FooBar> finalAndCancelableStream = finalStream
.takeUntil( Observable.merge(cancelStream1, cancelStream2) );
在图中,takeUntil 就是这样工作的,这就是 merge 的工作原理。
用简单的英语,你可以把它读成“finalAndCancelableStream是finalStream,直到 cancelStream1 或 cancelStream2 发出一个事件”。