Kotlin 协程通道 #

一、Channel 概述 #

Channel 是协程之间传递数据的管道,类似于 BlockingQueue,但它的 send 和 receive 是挂起函数:无法立即完成时挂起协程,而不是阻塞线程。

二、创建 Channel #

2.1 基本创建 #

kotlin
/** Sends two values over a rendezvous channel and prints each one as it is received. */
fun main() = runBlocking {
    val pipe = Channel<Int>()

    // Sender coroutine: emits 1 and 2, then closes the channel.
    launch {
        for (n in 1..2) pipe.send(n)
        pipe.close()
    }

    val first = pipe.receive()
    println(first)   // 1
    val second = pipe.receive()
    println(second)  // 2
}

2.2 Channel 容量 #

kotlin
// Unlimited capacity
val unlimited = Channel<Int>(Channel.UNLIMITED)

// Fixed capacity: buffers up to 10 elements
val buffered = Channel<Int>(10)

// No buffer (rendezvous)
val rendezvous = Channel<Int>(Channel.RENDEZVOUS)

// Conflated (overflow strategy)
val conflated = Channel<Int>(Channel.CONFLATED)  // keeps only the latest value

三、发送与接收 #

3.1 send 和 receive #

kotlin
/** Sends 0..4 from a child coroutine and receives exactly five values with `receive()`. */
fun main() = runBlocking {
    val pipe = Channel<Int>()

    // Sender: five values, then close.
    launch {
        for (n in 0 until 5) pipe.send(n)
        pipe.close()
    }

    // Receiver: one receive() call per expected value.
    for (round in 1..5) {
        val received = pipe.receive()
        println(received)
    }
}

3.2 for 循环接收 #

kotlin
/** Prints every value sent on the channel, stopping once the channel is closed. */
fun main() = runBlocking {
    val pipe = Channel<Int>()

    launch {
        for (n in 0 until 5) pipe.send(n)
        pipe.close()
    }

    // Explicit iterator form of `for (value in pipe)`.
    val incoming = pipe.iterator()
    while (incoming.hasNext()) {
        println(incoming.next())
    }
}

3.3 receiveCatching #

kotlin
/**
 * Receives one element with `receiveCatching()` and reports whether it was a
 * value or a "channel closed" signal.
 *
 * `ChannelResult` is a value class without public `Success`/`Closed` subtypes,
 * so it cannot be matched with `is`; it is inspected through its `isClosed`
 * property and `getOrThrow()` accessor instead.
 */
fun main() = runBlocking {
    val channel = Channel<Int>()

    launch {
        channel.send(1)
        channel.close()
    }

    val result = channel.receiveCatching()
    // receiveCatching() suspends until it has either an element or a close
    // signal, so a result that is not closed holds a value.
    if (result.isClosed) {
        println("Channel closed")
    } else {
        println("Received: ${result.getOrThrow()}")
    }
}

3.4 trySend 和 tryReceive #

kotlin
/** Shows the non-suspending `trySend` / `tryReceive` calls on a channel with a one-element buffer. */
fun main() = runBlocking {
    val buffer = Channel<Int>(capacity = 1)

    buffer.trySend(1)  // non-suspending send
    buffer.trySend(2)  // may fail; the result is ignored here

    // Non-suspending receive; getOrNull() yields null when nothing was received.
    val head = buffer.tryReceive().getOrNull()
    println(head)
}

四、关闭 Channel #

4.1 关闭通道 #

kotlin
/** Drains a channel that the sender closes, then prints the channel's closed-for-receive flag. */
fun main() = runBlocking {
    val channel = Channel<Int>()

    launch {
        for (n in 0 until 5) channel.send(n)
        channel.close()  // close the channel once all values are sent
    }

    // The iterator stops once the channel is closed and empty.
    val incoming = channel.iterator()
    while (incoming.hasNext()) {
        println(incoming.next())
    }
    println("Channel is closed: ${channel.isClosedForReceive}")
}

4.2 检查关闭状态 #

kotlin
// Channel whose closed state is inspected below
val channel = Channel<Int>()

channel.isClosedForReceive  // whether the channel is closed for receiving
channel.isClosedForSend     // whether the channel is closed for sending

五、生产者消费者 #

5.1 produce #

kotlin
/** Starts a producer coroutine that sends the numbers 0 through 9. */
fun CoroutineScope.produceNumbers() = produce {
    for (n in 0 until 10) {
        send(n)
    }
}

/** Prints every number emitted by `produceNumbers()`. */
fun main() = runBlocking {
    val source = produceNumbers()

    // Explicit iterator form of `for (num in source)`.
    val incoming = source.iterator()
    while (incoming.hasNext()) {
        println(incoming.next())
    }
}

5.2 消费者 #

kotlin
/**
 * Launches a coroutine that prints every element received from [channel].
 *
 * @param channel the channel to read from
 * @return the job of the launched consumer coroutine
 */
fun CoroutineScope.consumeNumbers(channel: ReceiveChannel<Int>) = launch {
    val incoming = channel.iterator()
    while (incoming.hasNext()) {
        val num = incoming.next()
        println("Consuming: $num")
    }
}

/** Wires the producer to the consumer and waits for the consumer to finish. */
fun main() = runBlocking {
    consumeNumbers(produceNumbers()).join()
}

5.3 多消费者 #

kotlin
/** One producer feeding three workers that all read from the same channel. */
fun main() = runBlocking {
    val channel = Channel<Int>()

    // Producer: sends 0..9, then closes the channel.
    launch {
        for (n in 0 until 10) channel.send(n)
        channel.close()
    }

    // Three consumers, each iterating over the shared channel.
    for (workerId in 0 until 3) {
        launch {
            val incoming = channel.iterator()
            while (incoming.hasNext()) {
                val value = incoming.next()
                println("Worker $workerId: $value")
            }
        }
    }
}

六、Channel 操作 #

6.1 map #

kotlin
/**
 * Doubles every element of a channel.
 *
 * The `map` operator on ReceiveChannel is deprecated and no longer resolves in
 * current kotlinx.coroutines releases, so the transformation is written as a
 * `produce` stage that reads from the source channel.
 */
fun main() = runBlocking {
    val channel = Channel<Int>()

    launch {
        repeat(5) { channel.send(it) }
        channel.close()
    }

    // The produce block ends when `channel` is closed and drained, which in
    // turn closes `mapped` and ends the loop below.
    val mapped = produce<Int> {
        for (value in channel) send(value * 2)
    }

    for (value in mapped) {
        println(value)
    }
}

6.2 filter #

kotlin
/**
 * Keeps only the even elements of a channel.
 *
 * The `filter` operator on ReceiveChannel is deprecated and no longer resolves
 * in current kotlinx.coroutines releases, so the filtering is written as a
 * `produce` stage that reads from the source channel.
 */
fun main() = runBlocking {
    val channel = Channel<Int>()

    launch {
        repeat(10) { channel.send(it) }
        channel.close()
    }

    // The produce block ends when `channel` is closed and drained, which in
    // turn closes `filtered` and ends the loop below.
    val filtered = produce<Int> {
        for (value in channel) {
            if (value % 2 == 0) send(value)
        }
    }

    for (value in filtered) {
        println(value)
    }
}

6.3 toList #

kotlin
/** Collects everything sent on the channel into a list and prints it. */
fun main() = runBlocking {
    val pipe = Channel<Int>()

    launch {
        for (n in 0 until 5) pipe.send(n)
        pipe.close()
    }

    // toList() returns once the channel has been closed by the sender.
    println(pipe.toList())
}

七、Select #

7.1 选择第一个结果 #

kotlin
/**
 * Uses `select` to take whichever of two channels delivers a value first.
 *
 * After `select` returns, the slower sender is still waiting to hand over its
 * value on a rendezvous channel that nobody reads any more. Its coroutine is
 * cancelled explicitly; otherwise `runBlocking` would wait for it forever.
 */
fun main() = runBlocking {
    val channel1 = Channel<Int>()
    val channel2 = Channel<Int>()

    launch {
        delay(100)
        channel1.send(1)
    }

    launch {
        delay(50)
        channel2.send(2)
    }

    val result = select<Int> {
        channel1.onReceive { it }
        channel2.onReceive { it }
    }

    println("Result: $result")

    // Cancel the sender that lost the race so the program can terminate.
    coroutineContext.cancelChildren()
}

7.2 选择发送 #

kotlin
/**
 * Uses `select` to send to whichever channel can accept a value first.
 *
 * Both channels are rendezvous channels, so a send completes only when a
 * receiver is waiting. A receiver is started on `channel2`; without any
 * receiver the `select` would suspend forever.
 */
fun main() = runBlocking {
    val channel1 = Channel<Int>()
    val channel2 = Channel<Int>()

    // Receiver for channel2: this makes the channel2 clause selectable.
    launch { channel2.receive() }

    select<Unit> {
        channel1.onSend(1) { println("Sent to channel1") }
        channel2.onSend(2) { println("Sent to channel2") }
    }
}

八、实战示例 #

8.1 工作队列 #

kotlin
/**
 * Work queue: one producer, three workers, and a collector that prints results.
 *
 * The results channel is closed once every worker has finished, rather than by
 * counting results against a hard-coded number that has to match the number of
 * jobs produced.
 */
fun main() = runBlocking {
    val jobs = Channel<String>()
    val results = Channel<String>()

    // Producer: submits ten jobs, then closes the queue.
    launch {
        repeat(10) { jobs.send("Job $it") }
        jobs.close()
    }

    // Workers: each loop ends when `jobs` is closed and drained.
    val workers = List(3) { workerId ->
        launch {
            for (job in jobs) {
                results.send("Worker $workerId processed $job")
            }
        }
    }

    // Close `results` only after every worker is done, so no result is lost
    // and the collecting loop below terminates.
    launch {
        workers.joinAll()
        results.close()
    }

    // Collect results until the channel is closed.
    for (result in results) {
        println(result)
    }
}

8.2 扇入扇出 #

kotlin
// 扇入:多个生产者,一个消费者
/**
 * Sends five numbers, `id * 10` through `id * 10 + 4`, to [channel].
 *
 * @param channel the channel to send to
 * @param id producer id, used as the tens part of each value
 */
suspend fun produceNumbers(channel: SendChannel<Int>, id: Int) {
    for (offset in 0 until 5) {
        val value = id * 10 + offset
        channel.send(value)
    }
}

/**
 * Fan-in: three producers send into one channel read by a single consumer.
 *
 * The channel is closed once all producers have finished. Without that close
 * the consumer loop never ends and the program hangs.
 */
fun main() = runBlocking {
    val channel = Channel<Int>()

    // Several producers sharing one channel.
    val producers = List(3) { id ->
        launch { produceNumbers(channel, id) }
    }

    // No single producer may close the channel while the others are still
    // sending, so it is closed after all of them complete.
    launch {
        producers.joinAll()
        channel.close()
    }

    // Single consumer: the loop ends when the channel is closed and drained.
    for (num in channel) {
        println("Received: $num")
    }
}

8.3 管道 #

kotlin
/** Starts a producer that sends 1, 2, 3, ... without end, pausing 100 ms after each send. */
fun CoroutineScope.produceNumbers() = produce {
    for (x in generateSequence(1) { it + 1 }) {
        send(x)
        delay(100)
    }
}

/**
 * Pipeline stage that forwards only the elements of [input] accepted by [filter].
 *
 * @param input upstream channel to read from
 * @param filter predicate deciding which numbers are forwarded
 */
fun CoroutineScope.filterNumbers(
    input: ReceiveChannel<Int>,
    filter: (Int) -> Boolean
) = produce {
    for (candidate in input) {
        if (!filter(candidate)) continue
        send(candidate)
    }
}

/**
 * Runs the pipeline and prints the first five even numbers.
 *
 * `take` and `collect` are Flow operators and are not available on a
 * ReceiveChannel, so the five values are read with `receive()`. The endless
 * producer stages are then stopped with `cancelChildren()`, which cancels only
 * the child coroutines; `cancel()` would cancel the runBlocking scope itself.
 */
fun main() = runBlocking {
    val numbers = produceNumbers()
    val evens = filterNumbers(numbers) { it % 2 == 0 }

    repeat(5) { println(evens.receive()) }

    // Stop both pipeline stages so runBlocking can complete.
    coroutineContext.cancelChildren()
}

九、最佳实践 #

9.1 关闭 Channel #

kotlin
// 推荐:使用 produce 自动关闭
/** Starts a producer coroutine that sends the numbers 0 through 9. */
fun CoroutineScope.producer() = produce {
    for (n in 0 until 10) {
        send(n)
    }
}

// When closing manually, use try/finally: Channel does not implement
// Closeable, so the stdlib `use` extension is not available on it.
try {
    channel.send(1)
} finally {
    // Runs even if send() throws or the coroutine is cancelled.
    channel.close()
}

9.2 处理关闭 #

kotlin
// Iterating with `for` handles closing automatically.
for (item in channel) process(item)

9.3 选择合适的容量 #

kotlin
// High throughput: UNLIMITED
// NOTE(review): an unlimited buffer presumably keeps growing if the consumer falls behind — confirm memory limits before using it
val highThroughput = Channel<Int>(Channel.UNLIMITED)

// Real-time (only the latest value matters): CONFLATED
val realtime = Channel<Int>(Channel.CONFLATED)

// Default: RENDEZVOUS
val default = Channel<Int>()

十、总结 #

Channel 要点:

| 操作 | 方法 |
| --- | --- |
| 创建 | `Channel()` |
| 发送 | `send()`, `trySend()` |
| 接收 | `receive()`, `tryReceive()` |
| 关闭 | `close()` |
| 遍历 | `for (x in channel)` |
| 生产者 | `produce { }` |

下一步,让我们学习 协程Flow

最后更新:2026-03-27