RxJava 在调用/订阅线程时观察
我很难理解subloadOn/observeOn在RxJava中是如何工作的。我创建了带有可观测的简单应用程序,该应用程序会发出太阳系行星名称,进行一些映射和过滤并打印结果。
据我所知,将工作调度到后台线程是通过运算符完成的(它似乎工作正常)。subscribeOn
在后台线程上观察也适用于运算符。observeOn
但是我很难理解,如何观察调用线程(如果它是主线程或任何其他线程)。它很容易在Android上使用运算符完成,但我不知道如何在纯java中实现这一点。AndroidSchedulers.mainThread()
这是我的代码:
public class Main {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
System.out.println("Main thread: " + getCurrentThreadInfo());
Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
.map(in -> {
System.out.println("map on: " + getCurrentThreadInfo());
return in.toUpperCase();
})
.filter(in -> {
System.out.println("filter on: " + getCurrentThreadInfo());
return in.contains("A");
})
.subscribeOn(Schedulers.from(executor));
for (int i = 0; i < 5; i++) {
Thread thread = new Thread("Thread-" + i) {
@Override
public void run() {
stringObservable
.buffer(5)
.subscribe(s -> System.out.println("Result " + s + " on: " + getCurrentThreadInfo()));
}
};
thread.start();
}
}
private static String getCurrentThreadInfo() {
return Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")";
}
}
在 created 中可观察,并且工作在执行器的三个线程之一上订阅。这按预期工作。但是如何在 for 循环中观察那些动态创建的线程的结果呢?有没有办法从当前线程创建调度程序?
另外,我发现运行此代码后,它永远不会终止,我不知道为什么?:(