RxJava 调度程序的用例

在 RxJava 中,有 5 种不同的调度程序可供选择:

  1. instant():创建并返回一个调度程序,该调度程序立即在当前线程上执行工作。

  2. trampoline():创建并返回一个调度程序,该调度程序将当前线程上的工作排队,以便在当前工作完成后执行。

  3. newThread():创建并返回一个调度程序,该调度程序为每个工作单元创建一个新的线程。

  4. computing():创建并返回用于计算工作的调度程序。这可用于事件循环、处理回调和其他计算工作。不要在此计划程序上执行 IO 绑定的工作。使用计划程序。io() 代替。

  5. io():创建并返回用于 IO 绑定工作的计划程序。该实现由执行器线程池提供支持,该线程池将根据需要增长。这可用于异步执行阻塞 IO。不要在此调度程序上执行计算工作。使用计划程序。计算() 代替。

问题:

前3个调度程序非常不言自明;但是,我对计算io有点困惑。

  1. 究竟什么是“IO绑定工作”?它是否用于处理流 () 和文件 ()?它是否用于数据库查询?它是否用于下载文件或访问 REST API?java.iojava.nio.files
  2. computing() 与 newThread() 有何不同?是不是所有计算()调用都位于单个(后台)线程上,而不是每次都位于新的(后台)线程上?
  3. 为什么在执行 IO 工作时调用 computing() 不好?
  4. 为什么在做计算工作时调用io()不好?

答案 1

好问题,我认为文档可以做一些更详细的事情。

  1. io()由一个无限的线程池支持,并且是你用于非计算密集型任务的那种东西,即不会给CPU带来太多负载的东西。因此,与文件系统的交互,与不同主机上的数据库或服务的交互就是很好的例子。
  2. computation()由大小等于可用处理器数的有界线程池支持。如果您尝试在超过可用处理器(例如使用)的处理器上并行安排CPU密集型工作,那么您将面临线程创建开销和上下文切换开销,因为线程争夺处理器,这可能是一个很大的性能打击。newThread()
  3. 最好只留给 CPU 密集型工作,否则您将无法获得良好的 CPU 利用率。computation()
  4. 出于2中讨论的原因,要求计算工作是不好的。 是无限的,如果您并行安排一千个计算任务,那么这一千个任务中的每一个都将有自己的线程,并争夺CPU,从而产生上下文切换成本。io()io()io()

答案 2

最重要的一点是,Schedulers.ioSchedulers.computing 都由无限线程池支持,而不是问题中提到的其他线程池。此特征仅由 Schedulers.from(Executor) 共享,以防使用 newCachedThreadPool(使用自动回收线程池不受限制)创建执行器。

正如在之前的回复和网络上的多篇文章中充分解释的那样,Schedulers.ioDispenders.computing应该谨慎使用,因为它们针对其名称中的工作类型进行了优化。但是,在我看来,它们最重要的角色是为响应流提供真正的并发性

与新手的看法相反,反应流本质上不是并发的,而是固有的异步和顺序的。正是出于这个原因,Schedulers.io 只能在I / O操作被阻塞时使用(例如:使用阻塞命令,如Apache IOUtils FileUtils FileUtils.readFileAsString(...)),因此会冻结调用线程,直到操作完成。

使用异步方法(如 Java AsynchronousFileChannel(...))不会在操作期间阻塞调用线程,因此使用单独的线程没有意义。实际上,Schedulers.io 线程并不适合异步操作,因为它们不运行事件循环,并且回调永远不会...名叫。

相同的逻辑适用于数据库访问或远程 API 调用。如果可以使用异步或反应式 API 进行调用,请不要使用 Schedulers.io

回到并发。您可能无权访问异步或反应式 API 来异步或并发执行 I/O 操作,因此唯一的替代方法是在单独的线程上调度多个调用。唉,反应式流在其末端是连续的,但好消息是flatMap()运算符可以在其核心引入并发性

必须在流构造中构建并发性,通常使用 flatMap() 运算符。这个功能强大的运算符可以配置为在内部为flatMap()嵌入式函数<T,R>提供多线程上下文。该上下文由多线程计划程序(如 Scheduler.ioScheduler.computing)提供。

在有关 RxJava2 计划程序并发的文章中查找更多详细信息,您可以在其中找到有关如何按顺序和并发使用计划程序的代码示例和详细说明。

希望这有帮助,

Softjake