RxJava:如何在不中断链的情况下有条件地将运算符应用于可观察量

2022-09-03 06:33:58

我在RxJava可观察量上有一个运算符链。我希望能够根据布尔值应用两个运算符之一,而不会“破坏链”。

我对Rx(Java)相对较新,我觉得可能有一种比我目前引入临时变量的方法更习惯和可读的方法。

下面是一个具体示例,如果批大小字段为非 null,则从可观察对象中缓冲项目,否则发出一批无界大小::toList()

Observable<Item> source = Observable.from(newItems);
Observable<List<Item>> batchedSource = batchSize == null ?
                source.toList() :
                source.buffer(batchSize);
return batchedSource.flatMap(...).map(...)

这样的事情可能吗?(伪 lambdas,因为 Java):

Observable.from(newItems)
    .applyIf(batchSize == null,
             { o.toList() },
             { o.buffer(batchSize) })
    .flatMap(...).map(...)

答案 1

您可以使用 保持顺序,但执行自定义行为compose(Func1)

source
.compose(o -> condition ? o.map(v -> v + 1) : o.map(v -> v * v))
.filter(...)
.subscribe(...)

答案 2

如果您希望发出单个值,或者希望使用另一个 Observable 发出多个值,也可以使用运算符 with。filterdefaultIfEmptyswitchIfEmpty

val item = Observable.just("ABC")

item.filter { s -> s.startsWith("Z") }
    .defaultIfEmpty("None")
    .subscribe { println(it) }

推荐