Kotlin 协程如何知道在进行网络调用时何时屈服?

2022-09-03 15:19:56

我是 Kotlin 协程的新手,有一件事我没有弄清楚,协程如何知道在进行网络调用时何时向他人屈服。

如果我理解正确,协程是先发制人的,这意味着当它有一些耗时的任务(通常是I / O操作)要执行时,它知道何时屈服于其他协程。

例如,假设我们想要绘制一些将显示来自远程服务器的数据的 UI,并且我们只有一个线程来调度协程。我们可以启动一个协程来调用 REST API 来获取数据,同时让另一个协程绘制 UI 的其余部分,这些协程对数据没有依赖性。但是,由于我们只有一个线程,因此一次只能运行一个协程。除非用于获取数据的协程在等待数据到达时抢占式结果,否则两个协程将按顺序执行。

据我所知,Kotlin 的协程实现不会修补任何现有的 JVM 实现或 JDK 网络库。因此,如果协程调用 REST API,它应该像使用 Java 线程完成的那样进行阻塞。我之所以这么说,是因为我在python中似乎有类似的概念,这些概念被称为绿色线程。为了让它与python的内置网络库一起使用,必须首先“猴子修补”网络库。对我来说,这是有道理的,因为只有网络库本身知道何时屈服。

那么,谁能解释一下 Kotlin 协程在调用阻塞 Java 网络 API 时如何知道何时屈服呢?或者,如果没有,那么这是否意味着上面示例中提到的任务不能同时执行,给出单个线程?

谢谢!


答案 1

协程先发制人地工作

不。使用协程,您只能实现协作式多线程处理,其中使用显式方法调用挂起和恢复协程。协程仅列出按需挂起和恢复的问题,而协程调度程序负责确保它在适当的线程上启动和恢复。

研究此代码将帮助您了解 Kotlin 协程的本质:

import kotlinx.coroutines.experimental.*
import kotlin.coroutines.experimental.*

fun main(args: Array<String>) {
    var continuation: Continuation<Unit>? = null
    println("main(): launch")
    GlobalScope.launch(Dispatchers.Unconfined) {
        println("Coroutine: started")
        suspendCoroutine<Unit> {
            println("Coroutine: suspended")
            continuation = it
        }
        println("Coroutine: resumed")
    }
    println("main(): resume continuation")
    continuation!!.resume(Unit)
    println("main(): back after resume")
}

在这里,我们使用最琐碎的调度程序,它不执行任何调度,它在你调用和的地方运行协程。协程通过调用 来挂起自身。此函数通过向它传递稍后可用于恢复协程的对象来运行您提供的块。我们的代码将其保存到 .控件返回到 之后的代码,其中我们使用延续对象来恢复协程。Unconfinedlaunch { ... }continuation.resume()suspendCoroutinevar continuationlaunch

整个程序在主线程上执行并打印以下内容:

main(): launch
Coroutine: started
Coroutine: suspended
main(): resume continuation
Coroutine: resumed
main(): back after resume

我们可以启动一个协程来调用 REST API 来获取数据,同时让另一个协程绘制 UI 的其余部分,这些协程对数据没有依赖性。

这实际上描述了您将使用纯线程执行的操作。协程的优点是,您可以在 GUI 绑定代码的中间进行“阻塞”调用,并且不会冻结 GUI。在您的示例中,您将编写一个用于进行网络调用然后更新 GUI 的协程。当网络请求正在进行时,协程将挂起,其他事件处理程序将运行,从而使 GUI 保持活动状态。处理程序不是协程,它们只是常规的 GUI 回调。

用最简单的术语来说,你可以写这个Android代码:

activity.launch(Dispatchers.Main) {
    textView.text = requestStringFromNetwork()
}

...

suspend fun requestStringFromNetwork() = suspendCancellableCoroutine<String> {
    ...
}

requestStringFromNetwork相当于“修补IO层”,但你实际上并没有修补任何东西,你只是围绕IO库的公共API编写包装器。几乎所有的 Kotlin IO 库都在添加这些包装器,并且还有用于 Java IO 库的扩展库。如果您按照这些说明进行操作,编写自己的也非常简单。


答案 2

答案是:协程不知道网络调用或 I/O 操作。您必须根据自己的需求编写代码,将繁重的工作包含在不同的协程中,以便可以同时执行它们,因为默认行为是按顺序执行的。

例如:

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here (maybe I/O)
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here (maybe I/O), too
    return 29
}

fun main(args: Array<String>) = runBlocking<Unit> {
        val time = measureTimeMillis {
            val one = doSomethingUsefulOne()
            val two = doSomethingUsefulTwo()
            println("The answer is ${one + two}")
        }
    println("Completed in $time ms")
}

将产生类似如下的结果:

The answer is 42
Completed in 2017 ms

和 doSomethingUsefulOne() 和 doSomethingUsefulTwo() 将按顺序执行。如果要并发执行,则必须改为编写:

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async { doSomethingUsefulOne() }
        val two = async { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

这将产生:

The answer is 42
Completed in 1017 ms

as doSomethingUsefulOne() 和 doSomethingUsefulTwo() 将同时执行。

资料来源:https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md#composing-suspending-functions

更新:关于协程的执行位置,我们可以在github项目指南中阅读 https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md#thread-local-data

有时,能够传递一些线程本地数据是很方便的,但是,对于不绑定到任何特定线程的协程,如果不编写大量样板,就很难手动实现它。

对于ThreadLocal,asContextElement扩展函数在这里用于救援。它创建一个附加的上下文元素,该元素保留给定 ThreadLocal 的值,并在每次协程切换其上下文时还原它。

在实际操作中很容易证明它:

val threadLocal = ThreadLocal<String?>() // declare thread-local variable
fun main(args: Array<String>) = runBlocking<Unit> {
    threadLocal.set("main")
    println("Pre-main, current thread: ${Thread.currentThread()}, threadlocal value: '${threadLocal.get()}'")
    val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
        println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        yield()
        println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    }
    job.join()
    println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
}

在此示例中,我们使用 Dispatchers.Default 在后台线程池中启动新的协程,因此它适用于与线程池不同的线程,但它仍然具有线程局部变量的值,我们使用 threadLocal.asContextElement(value = “launch”) 指定了该值,无论协程在哪个线程上执行。因此,输出(带调试)为:

Pre-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'
Launch start, current thread: Thread[CommonPool-worker-1 @coroutine#2,5,main], thread local value: 'launch'
After yield, current thread: Thread[CommonPool-worker-2 @coroutine#2,5,main], thread local value: 'launch'
Post-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'

推荐