Kotlin学习系列之:协程上下文与分发器

x33g5p2x  于2022-03-08 转载在 其他  
字(5.9k)|赞(0)|评价(0)|浏览(401)

我们在协程的第一篇就已经提过,协程的运行是依赖于线程的。那么协程与线程之间的关系到底是怎样的呢?

  1. 协程上下文(Coroutine Context):多种元素的集合,包括Job、分发器等。协程总是会在某个上下文中执行的,这个上下文是由CoroutineContext类型的一个实例来决定的。
  2. 协程分发器(Dispatcher):决定协程运行在哪个线程或者线程池上,对应的类就是CoroutineDispatcher。

再看launch{}和aysnc{}:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T>

会存在一个context参数,我们目前还没有介绍过,我们可以通过这个参数,来指定使用哪个协程分发器,从而决定协程运行在哪个线程或者线程池上。

  • CoroutineDispatcher类。
/**
 * Base class that shall be extended by all coroutine dispatcher implementations.
 *
 * The following standard implementations are provided by `kotlinx.coroutines` as properties on
 * [Dispatchers] objects:
 *
 * * [Dispatchers.Default] -- is used by all standard builder if no dispatcher nor any other [ContinuationInterceptor]
 *   is specified in their context. It uses a common pool of shared background threads.
 *   This is an appropriate choice for compute-intensive coroutines that consume CPU resources.
 * * [Dispatchers.IO] -- uses a shared pool of on-demand created threads and is designed for offloading of IO-intensive _blocking_
 *   operations (like file I/O and blocking socket I/O).
 * * [Dispatchers.Unconfined] -- starts coroutine execution in the current call-frame until the first suspension.
 *   On first  suspension the coroutine builder function returns.
 *   The coroutine resumes in whatever thread that is used by the
 *   corresponding suspending function, without confining it to any specific thread or pool.
 *   **Unconfined dispatcher should not be normally used in code**.
 * * Private thread pools can be created with [newSingleThreadContext] and [newFixedThreadPoolContext].
 * * An arbitrary [Executor][java.util.concurrent.Executor] can be converted to dispatcher with [asCoroutineDispatcher] extension function.
 *
 * This class ensures that debugging facilities in [newCoroutineContext] function work properly.
 */

这是CoroutineDispatcher类的文档说明:CoroutineDispatcher类是所有协程分发器实现的基类。它有如下标准实现(作为Dispatchers类的属性存在):

  • Dispatchers.Default:被用于所有没有指定分发器或者ContinuationInterceptor的标准协程构建器,简而言之,就是默认的分发器。它会使用一个共享的后台线程池。
  • Dispatchers.IO:也会使用一个共享线程池,主要用于IO操作。
  • Dispatchers.MAIN:主线程分发器,即启动的协程会在主线程中运行
  • Dispatchers.Unconfined:不指定某个特定的线程或者线程池来运行协程
  • 自定义的私有线程池

下面我们就通过一个例子来说明这些协程分发器的作用:

fun main() = runBlocking<Unit> {

    launch {
        println("No params, thread : ${Thread.currentThread().name}")
    }

    launch(Dispatchers.Unconfined) {
        println("Unconfined, thread: ${Thread.currentThread().name}")
    }

    launch(Dispatchers.Default) {
        println("Default, thread: ${Thread.currentThread().name}")
    }

    GlobalScope.launch {
        println("GlobalScope, thread: ${Thread.currentThread().name}")
    }
}

先看运行结果:

Unconfined, thread: main
Default, thread: DefaultDispatcher-worker-1
GlobalScope, thread: DefaultDispatcher-worker-3
No params, thread : main

可能每个人的输出结果的顺序都不太一样,这是正常的,但是每行的结果是一样的。

  • 不指定参数:main线程
  • 使用Dispathcers.Unconfined:此时显示的是在main线程,但是它的运行机制没有这么简单,我们在下面详述
  • 使用Dispathcers.Default:DefaultDispatcher-worker-1,很明显,就不在主线程了,这个就是我们所说的后台的共享线程池
  • GlobalScope.launch{}:会发现它也不在主线程,并且和Dispathcers.Default是同一个线程池。
  • 如果我们不显式指定,也就是不带参数的启动,那么它一定会运行在主线程吗?我们给出的答案是:否。我们不妨在launch(Dispatchers.Default){}中再添加几行代码:
launch(Dispatchers.Default) {
    println("Default, thread: ${Thread.currentThread().name}")
    launch {
        println("In Default, no params, thread: ${Thread.currentThread().name}")
    }
}

现在的运行结果为:

Unconfined, thread: main
Default, thread: DefaultDispatcher-worker-1
In Default, no params, thread: DefaultDispatcher-worker-3
GlobalScope, thread: DefaultDispatcher-worker-2
No params, thread : main

注意第三行和第五行,同样的不带参数的launch,它所运行的线程是不一样的。

好,我们现在来总结协程的运行线程的判定:如果我们没有显式指定分发器,那么它会考虑从启动它的协程上下文去继承;如果我们显式指定了分发器,那么就使用指定的分发器来运行线程。如这里在Default中由launch{}启动的协程,它就会从外层launch(Dispatchers.Default)继承过来,再考虑外层的launch{}(输出结果为No params, thread : main),它是由runBlocking{}继承过来,由于runBlocking是运行在主线程中,所以它也是运行在主线程中。如果你还疑问runBlocking为什么运行在主线程,我们来看看runBlocking的实现:

public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
    val currentThread = Thread.currentThread()
    val contextInterceptor = context[ContinuationInterceptor]
    val eventLoop: EventLoop?
    val newContext: CoroutineContext
    if (contextInterceptor == null) {
        // create or use private event loop if no dispatcher is specified
        eventLoop = ThreadLocalEventLoop.eventLoop
        newContext = GlobalScope.newCoroutineContext(context + eventLoop)
    } else {
        // See if context's interceptor is an event loop that we shall use (to support TestContext)
        // or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
        eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
            ?: ThreadLocalEventLoop.currentOrNull()
        newContext = GlobalScope.newCoroutineContext(context)
    }
    val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
    return coroutine.joinBlocking()
}

可以看到,它会使用 Thread.currentThread()获取当前它所在的线程,由于我们这里阻塞的是main线程,所以它自然会运行在main线程上。

  • 我们还剩下一个Dispatchers.Unconfined没有讲解:
fun main() = runBlocking<Unit> {

    launch(Dispatchers.Unconfined) {
        println("before delay: thread -> " + Thread.currentThread().name)
        delay(100)
        println("after delay: thread -> " + Thread.currentThread().name)
    }
}

输出结果为:

before delay: thread -> main
after delay: thread -> kotlinx.coroutines.DefaultExecutor

会发现在调用delay方法前后,它所运行的线程是不一样的。那么它的运行机制到底是怎样的呢?使用Dispatchers.Unconfined分发器的协程,它会在运行在启动它的协程上下文中去继承,这里也就是main线程,直到遇到第一个挂起点(也就是这里的delay挂起函数);当它从挂起函数中恢复执行后,它所运行的线程就变成了挂起函数所在的线程。

  • 如果我们想要指定自己创建的线程池来运行协程,那么我们该怎么做?
fun main() = runBlocking<Unit> {

    val executorDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    launch(executorDispatcher) {
        println("Executors, thread: ${Thread.currentThread().name}")
        executorDispatcher.close()
    }
}

Executors, thread: pool-1-thread-1

通过asCoroutineDispatcher()这个扩展方法,我们可以将newSingleThreadExecutor的线程池转换成一个分发器,然后使用这个分发器去启动我们的协程。这里有一个注意点,那就是一定要调用close关闭这个分发器。大家可以尝试注释掉executorDispatcher.close()这行代码,然后运行程序,你会发现,虽然控制台有结果输出,但是我们的程序并没有退出,就是由于我们自己创建的线程池一直在占用着资源。

相关文章