如何创建 rx 的缓存/热版本。单?

2022-09-02 10:58:50

RxJava v1.0.13 引入了一种新型的 Observable:rx。单身。它非常适合请求 - 响应模型,但缺乏引入doOnNext()等运算符的标准副作用。因此,因此,让多件事发生要困难得多。

我的想法是将doOnNext()替换为对同一单个实例的多个订阅。但这可能会导致底层工作多次完成:每个订阅一次。

示例 rx.单一实现:

private class WorkerSubscribe<SomeData>() : Single.OnSubscribe<SomeData> {
    override fun call(sub: SingleSubscriber<in SomeData>) {
        try {
            val result = fetchSomeData()
            sub.onSuccess(result)
        } catch(t: Throwable) {
            sub.onError(t)
        }
    }
}

val single = Single.create<SomeData>(WorkerSubscribe())

用法:

single.subscribe({}, {})
single.subscribe({}, {})   // Data is fetched for the second time

是否可以创建一个 Single 实例,即使多次调用 single.subscribe() 也不会多次获取SomeData(),但缓存并返回相同的结果?


答案 1

您需要 RxJava Subject:或BehaviorSubjectAsyncSubject


答案 2

我只是需要类似的行为,并找到了一些解决方案。

您可以转换为应用,然后将其转换回 。SingleObservablecache()Single

yourSingle.toObservable().cacheWithInitialCapacity(1).toSingle()

我使用而不是仅仅作为优化 - 永远不会发出多个项目。cacheWithInitialCapacity(1)cache()Single

 

提供实施也是一个好主意Transformer

public class SingleUtils {

    public static <T> Single.Transformer<T, T> cached() {
        return single -> single.toObservable()
            .cacheWithInitialCapacity(1)
            .toSingle();
    }
}

因此,只需调用即可在任何地方使用缓存

yourSingle.compose(SingleUtils.cached())

编辑:从rxJava 1.2.2开始,它已被添加(https://github.com/ReactiveX/RxJava/releases/tag/v1.2.2)

完全以这种方式实现(https://github.com/ReactiveX/RxJava/pull/4757)


推荐