Kotlin 协程取消与超时 #

一、协程取消 #

1.1 基本取消 #

kotlin
/**
 * Starts a worker that prints every 500 ms, lets it run for 1.3 s,
 * then cancels it and waits for it to finish.
 */
fun main() = runBlocking {
    val worker = launch {
        for (i in 0 until 1000) {
            println("Working $i")
            delay(500)
        }
    }

    delay(1300)
    println("Cancelling...")
    worker.cancel()  // request cancellation
    worker.join()    // suspend until the worker has actually completed
    println("Cancelled")
}

1.2 cancelAndJoin #

kotlin
/** Cancels the worker and awaits its completion with a single call. */
fun main() = runBlocking {
    val worker = launch {
        for (i in 0 until 1000) {
            println("Working $i")
            delay(500)
        }
    }

    delay(1300)
    worker.cancelAndJoin()  // cancel, then wait for completion
    println("Done")
}

1.3 检查取消状态 #

kotlin
/**
 * Cooperative cancellation for code that never suspends.
 *
 * The worker blocks its thread with Thread.sleep, so it must poll `isActive`
 * itself. It is launched on Dispatchers.Default: on runBlocking's single
 * thread the blocking loop would never give the parent a chance to resume
 * from delay() and call cancelAndJoin().
 */
fun main() = runBlocking {
    val job = launch(Dispatchers.Default) {
        repeat(1000) { i ->
            if (!isActive) {
                println("Cancelled at $i")
                return@launch
            }
            println("Working $i")
            Thread.sleep(500)  // blocking call: not a suspension point, so it is never cancelled automatically
        }
    }

    delay(1300)
    job.cancelAndJoin()
}

1.4 ensureActive #

kotlin
/**
 * Same blocking worker as the isActive example, but using ensureActive(),
 * which throws CancellationException instead of requiring a manual return.
 *
 * The worker runs on Dispatchers.Default so that the blocking loop does not
 * occupy runBlocking's only thread and starve the code that cancels it.
 */
fun main() = runBlocking {
    val job = launch(Dispatchers.Default) {
        repeat(1000) { i ->
            ensureActive()  // throws CancellationException once the job has been cancelled
            println("Working $i")
            Thread.sleep(500)
        }
    }

    delay(1300)
    job.cancelAndJoin()
}

1.5 yield #

kotlin
/** A non-suspending loop made cancellable by calling yield() on every iteration. */
fun main() = runBlocking {
    val printer = launch {
        for (i in 0 until 1000) {
            yield()  // checks for cancellation and lets other coroutines run
            println("Working $i")
        }
    }

    delay(10)
    printer.cancelAndJoin()
}

二、可取消的挂起函数 #

2.1 delay #

kotlin
/** Shows that delay() is a cancellable suspension point. */
fun main() = runBlocking {
    val ticker = launch {
        for (tick in 0 until 10) {
            delay(500)  // cancellable
            println("Working $tick")
        }
    }

    delay(1200)
    ticker.cancel()
}

2.2 自定义可取消函数 #

kotlin
/**
 * Repeats a unit of work every 100 ms for as long as the calling coroutine is active.
 *
 * A top-level suspend function has no CoroutineScope receiver, so the scope
 * extension `isActive` cannot be used directly; the state is read from the
 * caller's coroutine context instead.
 */
suspend fun cancellableWork() {
    while (currentCoroutineContext().isActive) {
        // do the work
        delay(100)
    }
}

2.3 try-finally 清理 #

kotlin
/** Runs cleanup code in a finally block when the worker is cancelled. */
fun main() = runBlocking {
    val worker = launch {
        try {
            for (i in 0 until 1000) {
                println("Working $i")
                delay(500)
            }
        } finally {
            // reached when delay() throws CancellationException
            println("Cleanup...")
        }
    }

    delay(1300)
    worker.cancelAndJoin()
    println("Done")
}

2.4 不可取消的清理 #

kotlin
/** Cleanup that needs to suspend is wrapped in withContext(NonCancellable). */
fun main() = runBlocking {
    val worker = launch {
        try {
            for (i in 0 until 1000) {
                println("Working $i")
                delay(500)
            }
        } finally {
            withContext(NonCancellable) {
                println("Cleanup (non-cancellable)")
                delay(1000)  // suspending is possible inside NonCancellable
                println("Cleanup done")
            }
        }
    }

    delay(1300)
    worker.cancelAndJoin()
}

三、超时处理 #

3.1 withTimeout #

kotlin
/** withTimeout throws TimeoutCancellationException when the block exceeds 1000 ms. */
fun main() = runBlocking {
    try {
        withTimeout(1000) {
            for (step in 0 until 10) {
                delay(300)
                println("Working $step")
            }
        }
    } catch (timeout: TimeoutCancellationException) {
        println("Timed out!")
    }
}

3.2 withTimeoutOrNull #

kotlin
/** withTimeoutOrNull returns null instead of throwing when the time limit is exceeded. */
fun main() = runBlocking {
    val outcome = withTimeoutOrNull(1000) {
        for (step in 0 until 10) {
            delay(300)
            println("Working $step")
        }
        "Completed"
    }

    // outcome is null when the block timed out
    println("Result: $outcome")
}

3.3 超时重试 #

kotlin
/**
 * Runs [block] under a timeout and retries it when the attempt times out.
 *
 * @param time timeout for each attempt, in milliseconds
 * @param retries total number of attempts; must be positive
 * @param block the operation to run
 * @return the result of the first attempt that completes in time
 * @throws TimeoutCancellationException if the last attempt also times out
 * @throws IllegalArgumentException if [retries] is not positive
 */
suspend fun <T> withTimeoutRetry(
    time: Long,
    retries: Int,
    block: suspend () -> T
): T {
    require(retries > 0) { "retries must be positive, was $retries" }

    repeat(retries) { attempt ->
        try {
            return withTimeout(time) { block() }
        } catch (e: TimeoutCancellationException) {
            // If this coroutine itself was cancelled (e.g. by an enclosing
            // timeout), propagate the cancellation instead of retrying.
            currentCoroutineContext().ensureActive()
            if (attempt == retries - 1) throw e
            println("Retry $attempt")
        }
    }
    // TimeoutCancellationException has no public constructor, so it cannot be
    // created here; the loop above always returns or throws on its last pass.
    error("unreachable: the final attempt either returns or rethrows")
}

四、异常处理 #

4.1 try-catch #

kotlin
/** Handles an exception locally, inside the coroutine that throws it. */
fun main() = runBlocking {
    launch {
        try {
            throw Exception("Error!")
        } catch (failure: Exception) {
            println("Caught: $failure")
        }
    }.join()
}

4.2 CoroutineExceptionHandler #

kotlin
/**
 * Handles an uncaught coroutine exception with a CoroutineExceptionHandler.
 *
 * The failing coroutine is launched inside supervisorScope: a handler attached
 * to an ordinary child of runBlocking is never invoked, because such a child
 * hands its failure to the parent, which then fails main().
 * runBlocking<Unit> keeps main()'s return type Unit so it remains a valid
 * entry point even though the last expression is not Unit.
 */
fun main() = runBlocking<Unit> {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught: $exception")
    }

    supervisorScope {
        launch(handler) {
            throw Exception("Error!")
        }
    }
}

4.3 SupervisorJob #

kotlin
/**
 * Uses a SupervisorJob so that one failing child does not cancel its sibling.
 *
 * The scope's job replaces runBlocking's job in the context, so these
 * coroutines are not children of runBlocking and must be joined explicitly;
 * otherwise main() returns before the second child gets to run.
 * runBlocking<Unit> keeps main() a valid entry point.
 */
fun main() = runBlocking<Unit> {
    val supervisor = SupervisorJob()
    // Children of a supervisor report their own failures, so give the failing one a handler
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught: $exception")
    }

    with(CoroutineScope(coroutineContext + supervisor)) {
        val failing = launch(handler) {
            throw Exception("Child 1 failed")
        }

        val surviving = launch {
            delay(100)
            println("Child 2 completed")  // still runs
        }

        joinAll(failing, surviving)
    }
}

4.4 supervisorScope #

kotlin
/**
 * supervisorScope: a failing child does not cancel its sibling.
 *
 * runBlocking<Unit> keeps main()'s return type Unit (the last expression
 * would otherwise make it Job, which is not a valid entry point). The failing
 * child gets a handler because children of a supervisor scope report their
 * own uncaught exceptions.
 */
fun main() = runBlocking<Unit> {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught: $exception")
    }

    supervisorScope {
        launch(handler) {
            throw Exception("Child 1 failed")
        }

        launch {
            delay(100)
            println("Child 2 completed")
        }
    }
}

五、取消传播 #

5.1 父取消影响子 #

kotlin
/** Cancelling a parent job cancels every child launched inside it. */
fun main() = runBlocking {
    val parentJob = launch {
        // each entry is (delay in ms, message printed afterwards)
        listOf(1000L to "Child 1", 2000L to "Child 2").forEach { (waitMillis, message) ->
            launch {
                delay(waitMillis)
                println(message)
            }
        }
    }

    delay(500)
    parentJob.cancel()  // all child coroutines are cancelled too
}

5.2 子失败影响父 #

kotlin
/**
 * A failing child cancels its coroutineScope, including the sibling, and the
 * exception is rethrown from coroutineScope where it can be caught.
 *
 * runBlocking<Unit> keeps main()'s return type Unit; without it the try
 * expression makes the return type Any and main() is not an entry point.
 */
fun main() = runBlocking<Unit> {
    try {
        coroutineScope {
            launch {
                delay(100)
                throw Exception("Child failed")
            }

            launch {
                delay(200)
                println("This won't run")
            }
        }
    } catch (e: Exception) {
        println("Caught: $e")
    }
}

5.3 独立失败 #

kotlin
/**
 * Independent failure: inside supervisorScope the first child's exception
 * does not cancel the second child.
 *
 * runBlocking<Unit> keeps main()'s return type Unit (otherwise it would be
 * Job). The failing child gets a handler because children of a supervisor
 * scope report their own uncaught exceptions.
 */
fun main() = runBlocking<Unit> {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught: $exception")
    }

    supervisorScope {
        launch(handler) {
            delay(100)
            throw Exception("Child 1 failed")
        }

        launch {
            delay(200)
            println("Child 2 completed")  // still runs
        }
    }
}

六、实战示例 #

6.1 可取消的网络请求 #

kotlin
class ApiClient {
    /**
     * Downloads [url] as text, returning null when the request times out.
     *
     * Blocking socket I/O does not react to coroutine cancellation, and
     * withContext does not return until its block has finished, so
     * withTimeoutOrNull alone cannot bound this call. The connection therefore
     * also gets connect/read timeouts so the blocking call itself gives up.
     * NOTE(review): the read timeout applies per read, so a slowly trickling
     * response may still take longer than [timeoutMillis] overall.
     *
     * @param url address to fetch
     * @param timeoutMillis time limit in milliseconds (default 5000)
     * @return the response body, or null on timeout
     */
    suspend fun fetchWithTimeout(url: String, timeoutMillis: Long = 5000): String? {
        // URLConnection takes Int milliseconds and treats 0 as "no timeout"
        val socketTimeout = timeoutMillis.coerceIn(1, Int.MAX_VALUE.toLong()).toInt()
        return try {
            withTimeoutOrNull(timeoutMillis) {
                withContext(Dispatchers.IO) {
                    // network request
                    val connection = URL(url).openConnection().apply {
                        connectTimeout = socketTimeout
                        readTimeout = socketTimeout
                    }
                    connection.getInputStream().bufferedReader().use { it.readText() }
                }
            }
        } catch (e: java.net.SocketTimeoutException) {
            // The socket gave up before the coroutine timeout was observed: same outcome
            null
        }
    }
}

6.2 资源清理 #

kotlin
/**
 * Opens the file at [path], processes it and always closes the stream.
 *
 * Opening, processing and closing all happen on Dispatchers.IO, so the
 * function can be called from any dispatcher. `use` closes the stream on
 * normal completion, on failure and on cancellation; close() does not
 * suspend, so no NonCancellable wrapper is needed.
 */
suspend fun processFile(path: String) {
    withContext(Dispatchers.IO) {
        File(path).inputStream().use { input ->
            // process the file
        }
    }
}

6.3 超时重试策略 #

kotlin
/**
 * Runs [block] up to [times] times, each attempt limited to [timeout]
 * milliseconds, and returns the first successful result.
 *
 * Timeouts and ordinary exceptions trigger a retry. Cancellation of the
 * calling coroutine is never retried: it is propagated immediately.
 *
 * @param times maximum number of attempts
 * @param timeout time limit per attempt, in milliseconds
 * @param block the operation to run
 * @return the result of the first attempt that succeeds
 * @throws Exception the failure of the last attempt, or a generic exception
 *   if no attempt was made
 */
suspend fun <T> retryWithTimeout(
    times: Int,
    timeout: Long,
    block: suspend () -> T
): T {
    var lastException: Exception? = null

    repeat(times) {
        try {
            return withTimeout(timeout) { block() }
        } catch (e: Exception) {
            // Throws if this coroutine itself was cancelled, so the caller's
            // cancellation is not swallowed; a timeout of this attempt alone
            // leaves the coroutine active and is retried.
            currentCoroutineContext().ensureActive()
            lastException = e
        }
    }

    throw lastException ?: Exception("Unknown error")
}

七、最佳实践 #

7.1 使用 withTimeoutOrNull #

kotlin
// Recommended: a timeout yields null, which the elvis operator turns into the default
val result = withTimeoutOrNull(1000) {
    fetchData()
} ?: "Default"

// Not recommended: the same fallback expressed by catching the timeout exception
try {
    withTimeout(1000) { fetchData() }
} catch (e: TimeoutCancellationException) {
    "Default"
}

7.2 正确清理资源 #

kotlin
/**
 * Acquires a resource, works with it and releases it in a finally block.
 * The release is wrapped in NonCancellable so it can still run after this
 * coroutine has been cancelled.
 */
suspend fun work() {
    val handle = acquireResource()
    try {
        doWork(handle)
    } finally {
        withContext(NonCancellable) { releaseResource(handle) }
    }
}

7.3 使用 supervisorScope 隔离失败 #

kotlin
/**
 * Runs the three tasks concurrently and isolates their failures.
 *
 * Each task runs in its own async child of a supervisorScope; its outcome is
 * wrapped in a Result so one failing task does not affect the others. The
 * results are returned in task order. Building the list from the awaited
 * values avoids mutating a shared list from several coroutines.
 *
 * @return one Result per task, in the order task1, task2, task3
 */
suspend fun parallelTasks(): List<Result<Any?>> = supervisorScope {
    listOf("task1", "task2", "task3")
        .map { task ->
            async<Result<Any?>> {
                try {
                    Result.success(execute(task))
                } catch (e: Exception) {
                    // Throws if this coroutine was cancelled, so cancellation
                    // is propagated rather than recorded as a task failure
                    ensureActive()
                    Result.failure(e)
                }
            }
        }
        .awaitAll()
}

八、总结 #

取消与超时要点:

| 操作 | 方法 |
| --- | --- |
| 取消 | `cancel()`、`cancelAndJoin()` |
| 检查 | `isActive`、`ensureActive()` |
| 超时 | `withTimeout`、`withTimeoutOrNull` |
| 清理 | `finally`、`NonCancellable` |
| 异常 | `CoroutineExceptionHandler` |

下一步,让我们学习协程通道。

最后更新:2026-03-27