Kotlin学习系列之:协程的取消和超时

x33g5p2x  于2022-03-08 转载在 其他  
字(9.2k)|赞(0)|评价(0)|浏览(549)

​ 通过前面的三篇文章,我们已经讨论了协程的创建。有的时候,我们在启动了一个协程之后,并不需要该协程执行完毕,这个时候我们可以取消该协程的执行。比如在Android开发中,我们打开了一个页面,我们在进入页面的时候启动了一个协程来发起了网络请求,但是用户立马就关闭了页面,这个时候我们就可以取消这个协程的执行,因为我们已经不需要它的执行结果了。

  1. 我们先来回顾一下CoroutineScope.launch{}的方法签名:
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit): Job

可以看到,它有一个Job类型的返回值,它有对应的cancel()方法来取消执行的协程:

fun main() = runBlocking {

    val job = launch {
        repeat(200) {
            println("hello : $it")
            delay(500)
        }
    }

    delay(1100)
    println("world")

    job.cancel()
    job.join()

    println("welcome")
}

运行结果为:

hello : 0
hello : 1
hello : 2
world
welcome

在delay 1100毫秒之后,由于在runBlocking协程(姑且称之)中调用了job.cancel()之后,launch协程(姑且称之)中原本会repeat 200次的执行,如今只计数到了2,说明的的确确被取消了。cancel()一般会和join()方法一起使用,因为cancel可能不会立马取消对应的协程(下面我们会提到,协程能够被取消,是需要一定条件的),所以会需要join()来协调两个协程。故而有个更加简便的方法:Job.cancelAndJoin(),可以用来替换上面的两行代码。

public suspend fun Job.cancelAndJoin() {
    cancel()
    return join()
}
  • 协程能够被取消的前提条件

只有协程代码是可取消的,cancel()才能起作用。
Coroutine cancellation is cooperative. A coroutine code has to cooperate to be cancellable.

这是官方的描述。我们来直接看一段代码:

fun main() = runBlocking {
    
    val job = launch(context = Dispatchers.Default) {

        println("Current Thread : ${Thread.currentThread()}")
        var nextActionTime = System.currentTimeMillis()
        var i = 0

        while (i < 20) {
            if (System.currentTimeMillis() >= nextActionTime) {
                println("Job: ${i++}")
                nextActionTime += 500
            }
        }
    }

    delay(1300)
    println("hello")

    job.cancelAndJoin()

    println("welcome")
}

这段代码我们要注意两点:

  • 调用launch方法时,我们给其形参context多传递了一个Dispatcheres.Default参数。在这里我只告诉大家,这样使得launch启动的协程代码运行在一个新的子线程中,而不是和runBlocking协程一样(它是运行在主线程中)。(下一篇我们再来详细阐述这个地方)
  • 理解一下launch协程中的循环计算代码:
    第一次循环:i=0,同时 if条件肯定满足,输出”Job:0“,nextActionTime += 500

第二次循环:由于nextActionTime在第一次循环中加了500,而且if中两行代码的执行时间肯定远远 不足500毫秒

第…次循环:…

直到等足了500毫秒,才第二次进入if条件,使用i++,nextActionTime += 500

最终当i=20时,循环条件不满足,退出循环,至此launch协程代码执行完毕。

在空等500毫秒中,实际上可以看做是死循环了500毫秒,并且一直占用着cpu。

我们来看运行结果:

按照我们本来的认知,在delay 1300毫秒之后,由于我们调用了cancelAndJoin方法,应该会取消launch子协程的运行才对(换句话说i最大值为2,而不会加到20才退出)。也就是说,取消没有成功。现在,我们再回过头来,理解”只有协程代码是可取消的,cancel()才能起作用“。那也就是说,这个示例中的launch协程的代码是不可取消的。那么什么样的代码才可以视为可取消的呢

  • kotlinx.coroutines包下的所有挂起函数都是可取消的。这些挂起函数会检查协程的取消状态,当取消时就会抛出CancellationException异常
  • 如果协程正在处于某个计算过程当中,并且没有检查取消状态,那么它就是无法被取消的

很显然,我们上面示例中的代码就是计算过程中,所以它是无法被取消的。那么有没有什么方式使得这样的计算代码也变为可取消的呢?

  • 可以周期性地调用一个挂起函数,因为该挂起函数会取检查取消状态。
  • 显式地去检查取消状态

下面我们就对刚刚的代码做一下改进:

fun main() = runBlocking {
    val job = launch(Dispatchers.Default) {

        println("Current Thread : ${Thread.currentThread()}")
        var nextActionTime = System.currentTimeMillis()
        var i = 0

        while (isActive) {
            if (System.currentTimeMillis() >= nextActionTime) {
                println("Job: ${i++}")
                nextActionTime += 500
            }
        }
    }

    delay(1300)
    println("hello")

    job.cancelAndJoin()

    println("welcome")
}

输出结果:

Current Thread : Thread[DefaultDispatcher-worker-1,5,main]
Job: 0
Job: 1
Job: 2
hello
welcome

这样我们就能成功的取消了计算过程中的协程。

最后,我们对协程取消条件做一下总结:从某种角度上讲,是否能够取消是主动的;外部调用了cancel方法后,相当于是发起了一条取消信号;被取消协程内部如果自身检测到自身状态的变化,比如isActive的判断以及所有的kotlinx.coroutines包下挂起函数,都会检测协程自身的状态变化,如果检测到通知被取消,就会抛出一个CancellationException的异常。

  • 下面看一波这样的示例代码:
fun main() = runBlocking {
    val job = launch {
        try {
            repeat(200) {
                println("job: I am sleeping $it")
                delay(500)
            }
        }catch (e: CancellationException){
            println("canceled")
        }
        finally {
            println("finally块")
        }
    }

    delay(1300)
    println("hello")

    job.cancelAndJoin()
    println("welcome")

}

job: I am sleeping 0
job: I am sleeping 1
job: I am sleeping 2
hello
canceled
finally块
welcome

这块可以说明两个问题:

  • 就是前面提到的CancellationException
  • 我们可以在finally代码块中对于一些资源的关闭和回收
  • 现在有一个问题:对于大多数资源的关闭和回收(比如关闭文件、取消job等),都是瞬间的动作,都不会是阻塞的行为。可能在极少数情况下,关闭和回收的操作是阻塞的,是需要调用挂起函数的,但是在finally中,如果协程已经被取消,那么此时对于挂起函数的调用,都会抛出一个CancellationException的异常。那么这种情况下,我们又该如何去处理:
fun main() = runBlocking {
    val job = launch {
        try {
            repeat(200) {
                println("job: I am sleeping $it")
                delay(500)
            }
        } finally {
            withContext(NonCancellable){
                println("finally块")
                delay(1000)
                println("after delay in finally block.")
            }
        }
    }

    delay(1300)
    println("hello")

    job.cancelAndJoin()
    println("welcome")

}
/*
Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns the result.
*/
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T
public object NonCancellable : AbstractCoroutineContextElement(Job), Job {
  .....
}
  • withContext: 在给定的协程上下文下调用指定的挂起代码块,会一直挂起,直到结果返回,后面在介绍协程在Android开发的应用时,会时常看到它的身影。
  • NonCancellable:它是一个object对象,并且它是不会被取消的,它的状态一直是active的。
A non-cancelable job that is always [active][Job.isActive]. It is designed for [withContext] function
* to prevent cancellation of code blocks that need to be executed without cancellation.
  • CancellationException。既然当协程处于取消状态时,对于挂起函数的调用,会导致该异常的抛出,那么我们为什么没有在输出终端见到它的身影呢?因为kotlin的协程是这样规定的:
    That is because inside a cancelled coroutine CancellationException is considered to be a normal reason for coroutine completion.

也就是说,CancellationException这个异常是被视为正常现象的取消。

  • 父子协程的取消。

前面我们已经讨论了协程的取消自身的种种,那么如果父协程取消,对子协程有什么影响呢?同样地,子协程的取消,会对父协程有什么影响呢?

/* Jobs can be arranged into parent-child hierarchies where cancellation
* of a parent leads to immediate cancellation of all its [children]. Failure or cancellation of a child
* with an exception other than [CancellationException] immediately cancels its parent. This way, a parent
* can [cancel] its own children (including all their children recursively) without cancelling itself.
*
*/

这一段是Job这个接口的文档注释,我截取了一部分出来。我们一起来看下这段文档说明:

Job可以被组织在父子层次结构下,当父协程被取消后,会导致它的子协程立即被取消。一个子协程失败或取消的异常(除了CancellationException),它也会立即导致父协程的取消。

下面我们就通过代码来证明这一点:

a. 父协程取消对于子协程的影响:

fun main() = runBlocking {

    val parentJob = launch {
        launch {
            println("child Job: before delay")
            delay(2000)
            println("child Job: after delay")
        }

        println("parent Job: before delay")
        delay(1000)
        println("parent Job: after delay")
    }

    delay(500)
    println("hello")

}

这是没调用cancel的代码,输出结果如下:

parent Job: before delay
child Job: before delay
hello
parent Job: after delay
child Job: after delay

做一下变动:

fun main() = runBlocking {

    val parentJob = launch {
        launch {
            println("child Job: before delay")
            delay(2000)
            println("child Job: after delay")
        }

        println("parent Job: before delay")
        delay(1000)
        println("parent Job: after delay")
    }

    delay(500)
    parentJob.cancelAndJoin()
    println("hello")

}

我们在delay(500)之后添加一行:parentJob.cancelAndJoin(),再看输出结果:

parent Job: before delay
child Job: before delay
hello

可以看到,我们一旦取消父协程对应的Job之后,子协程的执行也被取消了,那么也就验证父协程的取消对于子协程的影响。

b. 子协程正常的CancellationException取消:

fun main() = runBlocking {

    val parentJob = launch {
        val childJob = launch {
            println("child Job: before delay")
            delay(2000)
            println("child Job: after delay")

        }

        println("parent Job: before delay")
        delay(1000)
        childJob.cancelAndJoin()
        println("parent Job: after delay")
    }

    delay(500)
    println("hello")

}

输出结果为:

parent Job: before delay
child Job: before delay
hello
parent Job: after delay

可以看到,如果子协程是正常的取消(即CancellationException),那么对于父协程是没有影响的。

c. 子协程的非CancellationException取消

fun main() = runBlocking {

    val parentJob = launch {
        val childJob = launch {
            println("child Job: before delay")
            delay(800)
            throw RuntimeException("cause to cancel child job")
        }

        println("parent Job: before delay")
        delay(1000)
        childJob.cancelAndJoin()
        println("parent Job: after delay")
    }

    delay(500)
    println("hello")

}

输出结果:

parent Job: before delay
child Job: before delay
hello
Exception in thread “main” java.lang.RuntimeException: cause to cancel child job

这样非CancellationException导致的子协程地取消,也会导致父协程的取消。

  • 提问:A协程有两个子协程B、C,如果B由于非CancellationException导致被取消,那么C会受到影响吗?

这个也不难得出答案,B的非CancellationException导致的取消,自然会导致父协程A被取消,那么C作为A的子协程也会被取消。

  • 说明:以上的讨论是返回Job的协程且不考虑SupervisorJob的存在,后面还会学习到返回Deferred的协程以及SupervisorJob(它和我们在Android开发中使用协程息息相关)。
  • 协程的超时取消。

如果用于执行某个任务的协程,我们设定,如果它超过某个时间后,还未完成,那么我们就需要取消该协程。我们可以使用withTimeout轻松实现这一功能:

fun main() = runBlocking {

   val result =  withTimeout(1900) {
        repeat(3) {
            println("hello: $it")
            delay(400)
        }
      "hello world"
    }
		println(result)
}

这种情况下没有超时,输出结果为:

hello: 0
hello: 1
hello: 2

“hello world”

我们修改一下超时时间为1100,这时的输出结果为:

hello: 0
hello: 1
hello: 2
Exception in thread “main” kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1100 ms

这样就把超时转换成了普通的异常,我们可以对异常进行捕获:

fun main() = runBlocking {

    try {
       val result =   withTimeout(1100) {
            repeat(3) {
                println("hello: $it")
                delay(400)
            }
            "hello world"
        }
        println(result)
    } catch (e: TimeoutCancellationException) {
        println("超时取消")
    }
}

hello: 0
hello: 1
hello: 2
超时取消

与之类似地还有withTimeoutOrNull:

fun main() = runBlocking {

    val result = withTimeoutOrNull(1900) {
        repeat(3) {
            println("hello: $it")
            delay(400)
        }

        "hello world"
    }

    println("the result is : $result")
}

输出结果为:

hello: 0
hello: 1
hello: 2
the result is : hello world

再次修改超时时间:

fun main() = runBlocking {

    val result = withTimeoutOrNull(1100) {
        repeat(3) {
            println("hello: $it")
            delay(400)
        }

        "hello world"
    }

    println("the result is : $result")
}

运行结果如下:

hello: 0
hello: 1
hello: 2
the result is : null

可以看到,withTimeoutOrNull与withTimeout的区别在于,当发生超时取消后,withTimeoutOrNull的返回为null,而withTimeout会抛出一个TimeoutCancellationException。

public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T? {
    if (timeMillis <= 0L) return null

    var coroutine: TimeoutCoroutine<T?, T?>? = null
    try {
        return suspendCoroutineUninterceptedOrReturn { uCont ->
            val timeoutCoroutine = TimeoutCoroutine(timeMillis, uCont)
            coroutine = timeoutCoroutine
            setupTimeout<T?, T?>(timeoutCoroutine, block)
        }
    } catch (e: TimeoutCancellationException) {
        // Return null if it's our exception, otherwise propagate it upstream (e.g. in case of nested withTimeouts)
        if (e.coroutine === coroutine) {
            return null
        }
        throw e
    }
}

之所以有这样的区别,我们可以从withTimeoutOrNul的源码中得出答案:它对TimeoutCancellationException进行了捕获。

相关文章