RxJava 在调用/订阅线程时观察

我很难理解subloadOn/observeOn在RxJava中是如何工作的。我创建了带有可观测的简单应用程序,该应用程序会发出太阳系行星名称,进行一些映射和过滤并打印结果。

据我所知,将工作调度到后台线程是通过运算符完成的(它似乎工作正常)。subscribeOn

在后台线程上观察也适用于运算符。observeOn

但是我很难理解,如何观察调用线程(如果它是主线程或任何其他线程)。它很容易在Android上使用运算符完成,但我不知道如何在纯java中实现这一点。AndroidSchedulers.mainThread()

这是我的代码:

public class Main {

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        System.out.println("Main thread: " + getCurrentThreadInfo());

        Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
                .map(in -> {
                    System.out.println("map on: " + getCurrentThreadInfo());
                    return in.toUpperCase();
                })
                .filter(in -> {
                    System.out.println("filter on: " + getCurrentThreadInfo());
                    return in.contains("A");
                })
                .subscribeOn(Schedulers.from(executor));

        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread("Thread-" + i) {
                @Override
                public void run() {
                    stringObservable
                            .buffer(5)
                            .subscribe(s -> System.out.println("Result " + s + " on: " + getCurrentThreadInfo()));
                }
            };
            thread.start();
        }

    }

    private static String getCurrentThreadInfo() {
        return Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")";
    }
}

在 created 中可观察,并且工作在执行器的三个线程之一上订阅。这按预期工作。但是如何在 for 循环中观察那些动态创建的线程的结果呢?有没有办法从当前线程创建调度程序?

另外,我发现运行此代码后,它永远不会终止,我不知道为什么?:(


答案 1

为了回答你的问题,让我从头开始,这可以让其他人理解你已经知道的东西。

调度程序

调度程序与 Java 的执行程序扮演着相同的角色。简而言之 - 他们决定执行哪些线程操作。

通常是一个可观察量和运算符在当前线程中执行。有时您可以将调度程序作为参数传递给 Observable 或 operator(例如 Observable.timer())。

此外,RxJava 还提供了 2 个运算符来指定调度程序:

  • subscribeOn - 指定可观察对象将在其上运行的计划程序
  • observeOn - 指定观察者将在其上观察此可观察的调度程序

为了快速理解它们,我使用了一个示例代码:

在所有示例中,我将使用 helper createObservable,它发出可观察运行线程的线程名称:

 public static Observable<String> createObservable(){
        return Observable.create((Subscriber<? super String> subscriber) -> {
                subscriber.onNext(Thread.currentThread().getName());
                subscriber.onCompleted();
            }
        );
    }

不带调度程序:

createObservable().subscribe(message -> {
        System.out.println("Case 1 Observable thread " + message);
        System.out.println("Case 1 Observer thread " + Thread.currentThread().getName());
    });
    //will print:
    //Case 1 Observable thread main
    //Case 1 Observer thread main

订阅:

createObservable()
            .subscribeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 2 Observable thread " + message);
                System.out.println("Case 2 Observer thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 2 Observable thread RxNewThreadScheduler-1
            //Case 2 Observer thread RxNewThreadScheduler-1

订阅和观察:

reateObservable()
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 3 Observable thread " + message);
                System.out.println("Case 3 Observer thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 3 Observable thread RxNewThreadScheduler-2
            //Case 3 Observer thread RxNewThreadScheduler-1

ObserveOn:

createObservable()
            .observeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 4 Observable thread " + message);
                System.out.println("Case 4 Observer thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 4 Observable thread main
            //Case 4 Observer thread RxNewThreadScheduler-1

答:

AndroidSchedulers.mainThread() 返回一个 sheduler,它将工作委托给与主线程关联的 MessageQueue。
为此,它使用android.os.Looper.getMainLooper()和android.os.Handler。

换句话说,如果要指定特定线程,则必须提供在线程上计划和执行任务的方法。

在它下面,可以使用任何类型的 MQ 来存储任务和逻辑,这些任务和逻辑循环 Quee 并执行任务。

在java中,我们有为此类任务指定的执行器。RxJava可以很容易地从这样的执行器创建调度程序。

下面是一个示例,它显示了如何在主线程上进行观察(不是特别有用,但显示了所有必需的部分)。

public class RunCurrentThread implements Executor {

    private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    public static void main(String[] args) throws InterruptedException {
        RunCurrentThread sample = new RunCurrentThread();
        sample.observerOnMain();
        sample.runLoop();
    }

    private void observerOnMain() {
        createObservable()
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.from(this))
                .subscribe(message -> {
                    System.out.println("Observable thread " + message);
                    System.out.println("Observer thread " + Thread.currentThread().getName());
                });
        ;
    }

    public Observable<String> createObservable() {
        return Observable.create((Subscriber<? super String> subscriber) -> {
                    subscriber.onNext(Thread.currentThread().getName());
                    subscriber.onCompleted();
                }
        );
    }

    private void runLoop() throws InterruptedException {
        while(!Thread.interrupted()){
            tasks.take().run();
        }
    }

    @Override
    public void execute(Runnable command) {
        tasks.add(command);
    }
}

最后一个问题,为什么你的代码不会终止:

ThreadPoolExecutor 通过 defult 使用非 deamon 线程,因此您的程序在它们存在之前不会结束。应使用 shutdown 方法来关闭线程。


答案 2

下面是针对 RxJava 2 更新的简化示例。这与Marek的答案的概念相同:一个执行器,它将可运行项添加到在调用方线程上使用的BlocklingQueue中。

public class ThreadTest {

    @Test
    public void test() throws InterruptedException {

        final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

        System.out.println("Caller thread: " + Thread.currentThread().getName());

        Observable.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Observable thread: " + Thread.currentThread().getName());
                return 1;
            }
        })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.from(new Executor() {
                @Override
                public void execute(@NonNull Runnable runnable) {
                    tasks.add(runnable);
                }
            }))
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    System.out.println("Observer thread: " + Thread.currentThread().getName());
                }
            });
        tasks.take().run();
    }

}

// Output: 
// Caller thread main
// Observable thread RxCachedThreadScheduler-1
// Observer thread main