获取可观察量的最新值并立即发出

我试图获取给定值的最新值,并在调用后立即将其发出。以下面的代码为例:Observable

return Observable.just(myObservable.last())
    .flatMap(myObservable1 -> {
        return myObservable1;
    })
    .map(o -> o.x) // Here I want to end up with a T object instead of Observable<T> object

这不起作用,因为通过这样做,将发出,而这些意志又必须发出才能到达.我不知道做这样的事情是否有可能。有没有人知道如何实现这一目标?谢谢flatMapmyObservable1map


答案 1

last()方法在这里没有任何帮助,因为它等待可观察量终止,为您提供发出的最后一项。

假设您无法控制发射可观察量,您可以简单地创建一个并将其订阅到发出要侦听的数据的可观察量,然后订阅创建的主体。因为是两者兼而有之,你会得到你想要的。BehaviorSubjectSubjectObservableSubscriber

我认为(现在没有时间检查它)您可能必须手动取消订阅原始可观察量,因为一旦他的所有订阅者取消订阅,就不会自动取消订阅。BehaviorSubject

像这样:

BehaviorSubject subject = new BehaviorSubject();
hotObservable.subscribe(subject);
subject.subscribe(thing -> {
    // Here just after subscribing 
    // you will receive the last emitted item, if there was any.
    // You can also always supply the first item to the behavior subject
});

http://reactivex.io/RxJava/javadoc/rx/subjects/BehaviorSubject.html


答案 2

在 RxJava 中,subscriber.onXXX 称为异步。这意味着,如果您的 Observable 在新线程中发出项目,则永远无法在返回之前获取最后一个项目,除非您阻止线程并等待该项目。但是,如果 Observable 同步发出项,而您没有通过 subscribeOn 和 observOn 更改它的线程,例如代码:

Observable.just(1,2,3).subscribe();

在这种情况下,您可以通过执行以下操作来获取最后一项:

Integer getLast(Observable<Integer> o){
    final int[] ret = new int[1];
    Observable.last().subscribe(i -> ret[0] = i);
    return ret[0];
}

这样做是个坏主意。RxJava更喜欢你通过它来做异步工作。


推荐