Kotlin 协程通道 #
一、Channel 概述 #
Channel 是协程之间传递数据的管道，类似于 BlockingQueue，但它的 send/receive 是挂起（suspend）操作，只会挂起协程，不会阻塞线程。
二、创建 Channel #
2.1 基本创建 #
kotlin
/**
 * Minimal channel example: a child coroutine sends two values and closes
 * the channel while the main coroutine receives and prints them.
 */
fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (value in 1..2) {
            channel.send(value)
        }
        channel.close()
    }
    val first = channel.receive()
    println(first) // 1
    val second = channel.receive()
    println(second) // 2
}
2.2 Channel 容量 #
kotlin
// Unlimited capacity
val unlimited = Channel<Int>(capacity = Channel.UNLIMITED)
// Limited capacity (buffer of 10)
val buffered = Channel<Int>(capacity = 10)
// No buffer (rendezvous)
val rendezvous = Channel<Int>(capacity = Channel.RENDEZVOUS)
// Conflated: only the most recent value is kept
val conflated = Channel<Int>(capacity = Channel.CONFLATED)
三、发送与接收 #
3.1 send 和 receive #
kotlin
/**
 * Sends the numbers 0..4 from a child coroutine and receives exactly five
 * values in the main coroutine using explicit send/receive calls.
 */
fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (i in 0 until 5) {
            channel.send(i)
        }
        channel.close()
    }
    for (i in 0 until 5) {
        val received = channel.receive()
        println(received)
    }
}
3.2 for 循环接收 #
kotlin
/**
 * Receives with a `for` loop: iteration stops on its own once the sender
 * has closed the channel and every element has been received.
 */
fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (i in 0..4) {
            channel.send(i)
        }
        channel.close()
    }
    for (value in channel) {
        println(value)
    }
}
3.3 receiveCatching #
kotlin
/**
 * Demonstrates receiveCatching(), which returns a ChannelResult instead of
 * throwing when the channel is closed.
 *
 * ChannelResult is a value class without public Success/Closed subtypes, so
 * it is inspected through its properties (isSuccess / isClosed) and accessors
 * (getOrNull / getOrThrow) rather than with `is` checks.
 */
fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        channel.send(1)
        channel.close()
    }
    val result = channel.receiveCatching()
    when {
        result.isClosed -> println("Channel closed")
        // receiveCatching() only fails when the channel is closed, so anything else is a success
        else -> println("Received: ${result.getOrThrow()}")
    }
}
3.4 trySend 和 tryReceive #
kotlin
/**
 * Shows the non-suspending trySend/tryReceive calls on a channel with a
 * one-element buffer.
 */
fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 1)
    channel.trySend(1) // non-suspending send
    channel.trySend(2) // may fail
    // non-suspending receive; getOrNull() yields null when nothing was received
    val received = channel.tryReceive().getOrNull()
    println(received)
}
四、关闭 Channel #
4.1 关闭通道 #
kotlin
/**
 * Closes the channel from the sending side, drains it with a `for` loop and
 * then prints the channel's isClosedForReceive flag.
 */
fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (i in 0 until 5) {
            channel.send(i)
        }
        channel.close() // close the channel
    }
    for (value in channel) {
        println(value)
    }
    println("Channel is closed: ${channel.isClosedForReceive}")
}
4.2 检查关闭状态 #
kotlin
val channel = Channel<Int>(capacity = Channel.RENDEZVOUS)
channel.isClosedForReceive // whether the channel is closed for receiving
channel.isClosedForSend // whether the channel is closed for sending
五、生产者消费者 #
5.1 produce #
kotlin
/** Starts a producer coroutine that sends the numbers 0 through 9. */
fun CoroutineScope.produceNumbers() = produce {
    for (n in 0 until 10) {
        send(n)
    }
}
/** Prints every number emitted by produceNumbers(). */
fun main() = runBlocking {
    val incoming = produceNumbers().iterator()
    while (incoming.hasNext()) {
        println(incoming.next())
    }
}
5.2 消费者 #
kotlin
/**
 * Launches a consumer coroutine that prints every element received from
 * [channel] until the channel is closed.
 */
fun CoroutineScope.consumeNumbers(channel: ReceiveChannel<Int>) = launch {
    val incoming = channel.iterator()
    while (incoming.hasNext()) {
        val num = incoming.next()
        println("Consuming: $num")
    }
}
/** Wires the producer to the consumer and waits for the consumer to finish. */
fun main() = runBlocking {
    consumeNumbers(produceNumbers()).join()
}
5.3 多消费者 #
kotlin
/**
 * One producer, three consumers sharing the same channel: each element is
 * delivered to exactly one of the workers.
 */
fun main() = runBlocking {
    val channel = Channel<Int>()
    // Producer
    launch {
        for (n in 0 until 10) {
            channel.send(n)
        }
        channel.close()
    }
    // Several consumers
    for (workerId in 0 until 3) {
        launch {
            for (value in channel) {
                println("Worker $workerId: $value")
            }
        }
    }
}
六、Channel 操作 #
6.1 map #
kotlin
/**
 * Transforms channel elements with map.
 *
 * The operator extensions on ReceiveChannel (map, filter, ...) are deprecated
 * and hidden in current kotlinx.coroutines, so the channel is converted to a
 * Flow first and the Flow operator is used instead.
 *
 * Requires: import kotlinx.coroutines.flow.*
 */
fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        repeat(5) { channel.send(it) }
        channel.close()
    }
    // consumeAsFlow() is for a single collector
    channel.consumeAsFlow()
        .map { it * 2 }
        .collect { println(it) }
}
6.2 filter #
kotlin
/**
 * Filters channel elements, keeping only even numbers.
 *
 * ReceiveChannel.filter is deprecated and hidden in current
 * kotlinx.coroutines, so the channel is converted to a Flow and the Flow
 * operator is used instead.
 *
 * Requires: import kotlinx.coroutines.flow.*
 */
fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        repeat(10) { channel.send(it) }
        channel.close()
    }
    channel.consumeAsFlow()
        .filter { it % 2 == 0 }
        .collect { println(it) }
}
6.3 toList #
kotlin
/** Collects every element of the channel into a list and prints it. */
fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (i in 0 until 5) {
            channel.send(i)
        }
        channel.close()
    }
    println(channel.toList())
}
七、Select #
7.1 选择第一个结果 #
kotlin
/**
 * Uses select to take whichever channel delivers a value first.
 *
 * After select returns, the slower sender is still going to call send() on a
 * rendezvous channel that nobody will ever receive from. Without cancelling
 * it, that coroutine stays suspended forever and runBlocking never returns.
 */
fun main() = runBlocking {
    val channel1 = Channel<Int>()
    val channel2 = Channel<Int>()
    launch {
        delay(100)
        channel1.send(1)
    }
    launch {
        delay(50)
        channel2.send(2)
    }
    val result = select<Int> {
        channel1.onReceive { it }
        channel2.onReceive { it }
    }
    println("Result: $result")
    // Cancel the losing sender so the program can terminate
    coroutineContext.cancelChildren()
}
7.2 选择发送 #
kotlin
/**
 * Uses select to send to whichever channel is ready to accept a value first.
 *
 * A send on a rendezvous channel completes only when a receiver is waiting,
 * so each channel gets a receiver coroutine; without them select would
 * suspend forever. Only one onSend clause is chosen, so the receiver of the
 * other channel is cancelled afterwards to let runBlocking finish.
 */
fun main() = runBlocking {
    val channel1 = Channel<Int>()
    val channel2 = Channel<Int>()
    launch { println("channel1 received: ${channel1.receive()}") }
    launch { println("channel2 received: ${channel2.receive()}") }
    select<Unit> {
        channel1.onSend(1) { println("Sent to channel1") }
        channel2.onSend(2) { println("Sent to channel2") }
    }
    // The receiver on the channel that was not selected would wait forever
    coroutineContext.cancelChildren()
}
八、实战示例 #
8.1 工作队列 #
kotlin
/**
 * Work queue: one producer feeds jobs to three workers, which publish their
 * results on a second channel that the main coroutine prints.
 *
 * The results channel is closed once every worker has finished, instead of
 * having the collector count to a hard-coded number and close the channel
 * from the receiving side.
 */
fun main() = runBlocking {
    val jobs = Channel<String>()
    val results = Channel<String>()
    // Producer
    launch {
        repeat(10) { jobs.send("Job $it") }
        jobs.close()
    }
    // Workers
    val workers = List(3) { workerId ->
        launch {
            for (job in jobs) {
                results.send("Worker $workerId processed $job")
            }
        }
    }
    // Close the results channel once all workers are done
    launch {
        workers.joinAll()
        results.close()
    }
    // Collect results until the channel is closed
    for (result in results) {
        println(result)
    }
}
8.2 扇入扇出 #
kotlin
// 扇入:多个生产者,一个消费者
/**
 * Sends five consecutive numbers starting at `id * 10` to [channel].
 * The channel is not closed here.
 */
suspend fun produceNumbers(channel: SendChannel<Int>, id: Int) {
    val base = id * 10
    for (offset in 0 until 5) {
        channel.send(base + offset)
    }
}
/**
 * Fan-in: three producers write to one channel that a single consumer reads.
 *
 * None of the producers closes the shared channel, so it is closed here once
 * all of them have finished. Without that, the consumer's `for` loop would
 * never end and the program would hang.
 */
fun main() = runBlocking {
    val channel = Channel<Int>()
    // Several producers
    val producers = List(3) { id ->
        launch { produceNumbers(channel, id) }
    }
    // Close the channel after the last producer is done
    launch {
        producers.joinAll()
        channel.close()
    }
    // Single consumer
    for (num in channel) {
        println("Received: $num")
    }
}
8.3 管道 #
kotlin
/**
 * Starts a producer that emits 1, 2, 3, ... without end, pausing 100 ms
 * after each element. It runs until it is cancelled.
 */
fun CoroutineScope.produceNumbers() = produce {
    var next = 1
    while (true) {
        send(next)
        next += 1
        delay(100)
    }
}
/**
 * Pipeline stage: forwards only the elements of [input] for which [filter]
 * returns true.
 */
fun CoroutineScope.filterNumbers(
    input: ReceiveChannel<Int>,
    filter: (Int) -> Boolean
) = produce {
    for (num in input) {
        if (!filter(num)) continue
        send(num)
    }
}
/**
 * Runs the pipeline and prints the first five even numbers.
 *
 * ReceiveChannel has no collect(), and its take() operator is deprecated and
 * hidden, so the values are read with receive(). The infinite producers are
 * stopped with cancelChildren(); calling cancel() here would cancel the
 * runBlocking scope itself and make runBlocking throw.
 */
fun main() = runBlocking {
    val numbers = produceNumbers()
    val evens = filterNumbers(numbers) { it % 2 == 0 }
    repeat(5) {
        println(evens.receive())
    }
    // Stop the pipeline stages, which would otherwise run forever
    coroutineContext.cancelChildren()
}
九、最佳实践 #
9.1 关闭 Channel #
kotlin
// Recommended: use produce, which closes the channel automatically
fun CoroutineScope.producer() = produce {
    for (i in 0 until 10) {
        send(i)
    }
}
// When closing manually, use try/finally so the channel is closed even if
// sending fails. Channel does not implement Closeable, so use { } is not available.
try {
    channel.send(1)
} finally {
    channel.close()
}
9.2 处理关闭 #
kotlin
// Iterate with a for loop so that channel closing needs no explicit check.
for (value in channel) {
// Closing is handled automatically
process(value)
}
9.3 选择合适的容量 #
kotlin
// High throughput: UNLIMITED
val highThroughput = Channel<Int>(capacity = Channel.UNLIMITED)
// Real-time (latest value only): CONFLATED
val realtime = Channel<Int>(capacity = Channel.CONFLATED)
// Default: RENDEZVOUS
val default = Channel<Int>()
十、总结 #
Channel 要点:
| 操作 | 方法 |
|---|---|
| 创建 | Channel() |
| 发送 | send(), trySend() |
| 接收 | receive(), tryReceive() |
| 关闭 | close() |
| 遍历 | for (x in channel) |
| 生产者 | produce { } |
下一步，让我们学习协程 Flow！
最后更新:2026-03-27