Kotlin 协程 Flow #

一、Flow 概述 #

Flow 是 Kotlin 协程中的响应式流,用于处理异步数据流。

1.1 Flow vs Sequence #

| 特性 | Flow | Sequence |
| --- | --- | --- |
| 执行 | 异步 | 同步 |
| 挂起 | 支持 | 不支持 |
| 上下文 | 可切换 | 单一 |

1.2 冷流 vs 热流 #

  • 冷流:只有被收集时才发射数据
  • 热流:无论是否有收集者都发射数据

二、创建 Flow #

2.1 flow 构建器 #

kotlin
/**
 * Builds a cold flow that emits the integers 1 through 5 in order,
 * suspending for 100 ms before each emission.
 */
fun simpleFlow(): Flow<Int> = flow {
    (1..5).forEach { value ->
        delay(100)
        emit(value)
    }
}

/** Collects [simpleFlow] inside `runBlocking` and prints every emitted value. */
fun main() = runBlocking {
    val numbers = simpleFlow()
    numbers.collect { value -> println(value) }
}

2.2 flowOf #

kotlin
/** A flow built from a fixed set of values: 1, 2, 3, 4, 5. */
val flow = flowOf(1, 2, 3, 4, 5)

/** Collects the top-level [flow] and prints each of its values. */
fun main() = runBlocking {
    flow.collect { value -> println(value) }
}

2.3 asFlow #

kotlin
/**
 * Converts a list and then an integer range to flows with `asFlow()`,
 * printing every element of each in turn.
 */
fun main() = runBlocking {
    val fromList = listOf(1, 2, 3, 4, 5).asFlow()
    fromList.collect { item -> println(item) }

    val fromRange = (1..5).asFlow()
    fromRange.collect { item -> println(item) }
}

2.4 channelFlow #

kotlin
/**
 * Builds a channel-backed flow that sends 0 through 4 from a launched child
 * coroutine, pausing 100 ms after each send.
 *
 * The call in the body takes a lambda, so it resolves to the library
 * `channelFlow` builder rather than to this zero-argument function.
 */
fun channelFlow(): Flow<Int> = channelFlow {
    launch {
        for (index in 0 until 5) {
            send(index)
            delay(100)
        }
    }
}

三、收集 Flow #

3.1 collect #

kotlin
/** Collects a three-element flow with the terminal `collect` operator and prints each value. */
fun main() = runBlocking {
    val source = flowOf(1, 2, 3)
    source.collect { value -> println(value) }
}

3.2 collectLatest #

kotlin
/**
 * Demonstrates `collectLatest`: values arrive every 100 ms while handling one
 * value takes 150 ms, so the handler for an earlier value is cancelled when
 * the next value arrives.
 */
fun main() = runBlocking {
    val source = flow {
        emit(1)
        delay(100)
        emit(2)
        delay(100)
        emit(3)
    }

    source.collectLatest { value ->
        println("Processing $value")
        delay(150)
        println("Done $value")
    }
}
// Only the processing of the last value runs to completion.

3.3 toList / toSet #

kotlin
/** Gathers flow values into a `List` and into a `Set` using the terminal operators `toList` / `toSet`. */
fun main() = runBlocking {
    val numbers = flowOf(1, 2, 3)
    val list = numbers.toList()

    val numbersWithDuplicate = flowOf(1, 2, 2, 3)
    val set = numbersWithDuplicate.toSet()
}

3.4 first / single #

kotlin
/** Reads the first value of a multi-element flow and the only value of a one-element flow. */
fun main() = runBlocking {
    val several = flowOf(1, 2, 3)
    val first = several.first()

    val exactlyOne = flowOf(1)
    val single = exactlyOne.single()
}

四、中间操作符 #

4.1 map #

kotlin
/** Doubles every value of the flow with `map` and prints the results. */
fun main() = runBlocking {
    val doubled = flowOf(1, 2, 3).map { number -> number * 2 }
    doubled.collect { value -> println(value) }
    // Prints 2, 4, 6
}

4.2 filter #

kotlin
/** Keeps only the even values of the flow with `filter` and prints them. */
fun main() = runBlocking {
    val evens = flowOf(1, 2, 3, 4, 5).filter { number -> number % 2 == 0 }
    evens.collect { value -> println(value) }
    // Prints 2, 4
}

4.3 transform #

kotlin
/** Uses `transform` to emit two strings for every upstream integer, then prints them. */
fun main() = runBlocking {
    val described = flowOf(1, 2, 3).transform {
        // Each input value produces two downstream emissions.
        emit("Value: $it")
        emit("Double: ${it * 2}")
    }
    described.collect { line -> println(line) }
}

4.4 take / drop #

kotlin
/** Shows `take` (keep the first n values) and `drop` (skip the first n values). */
fun main() = runBlocking {
    val firstThree = flowOf(1, 2, 3, 4, 5).take(3)
    firstThree.collect { value -> println(value) }
    // Prints 1, 2, 3

    val afterTwo = flowOf(1, 2, 3, 4, 5).drop(2)
    afterTwo.collect { value -> println(value) }
    // Prints 3, 4, 5
}

4.5 debounce #

kotlin
/**
 * Demonstrates `debounce(75)`: the value 1 is followed by 2 after only 50 ms
 * and is dropped, while 2 (followed after 100 ms) and the final value 3 pass.
 */
fun main() = runBlocking {
    val bursts = flow {
        emit(1)
        delay(50)
        emit(2)
        delay(100)
        emit(3)
    }

    bursts.debounce(75).collect { value -> println(value) }
    // Prints 2, 3
}

4.6 distinctUntilChanged #

kotlin
/** Suppresses consecutive duplicate values with `distinctUntilChanged` and prints the rest. */
fun main() = runBlocking {
    val changesOnly = flowOf(1, 1, 2, 2, 3, 2, 2).distinctUntilChanged()
    changesOnly.collect { value -> println(value) }
    // Prints 1, 2, 3, 2
}

五、组合操作符 #

5.1 zip #

kotlin
/** Pairs the values of two flows position by position with `zip` and prints each pair. */
fun main() = runBlocking {
    val numbers = flowOf(1, 2, 3)
    val letters = flowOf("A", "B", "C")

    val pairs = numbers.zip(letters) { a, b -> "$a$b" }
    pairs.collect { pair -> println(pair) }
    // Prints 1A, 2B, 3C
}

5.2 combine #

kotlin
/**
 * Demonstrates `combine`: whenever either flow emits, the newest value of each
 * flow is combined into one string and printed.
 */
fun main() = runBlocking {
    val numbers = flowOf(1, 2, 3).onEach { delay(100) }
    val letters = flowOf("A", "B").onEach { delay(150) }

    val latestPairs = numbers.combine(letters) { a, b -> "$a$b" }
    latestPairs.collect { pair -> println(pair) }
    // Prints 1A, 2A, 2B, 3B
    // NOTE(review): 3 and B are both due at roughly the 300 ms mark, so the
    // order of the last lines looks timing-dependent -- confirm before relying on it.
}

5.3 flattenConcat #

kotlin
/** Flattens a flow of flows sequentially with `flattenConcat` and prints the values. */
fun main() = runBlocking {
    val nested = flowOf(flowOf(1, 2), flowOf(3, 4))
    nested.flattenConcat().collect { value -> println(value) }
    // Prints 1, 2, 3, 4
}

5.4 flattenMerge #

kotlin
/** Flattens a flow of flows with `flattenMerge` and prints the values. */
fun main() = runBlocking {
    val nested = flowOf(flowOf(1, 2), flowOf(3, 4))
    nested.flattenMerge().collect { value -> println(value) }
    // The inner flows are collected concurrently.
}

六、异常处理 #

6.1 catch #

kotlin
/**
 * Emits one value, then fails; the `catch` operator intercepts the upstream
 * exception and prints it instead of letting it reach the collector.
 */
fun main() = runBlocking {
    val failing = flow {
        emit(1)
        throw Exception("Error!")
    }

    failing
        .catch { e -> println("Caught: $e") }
        .collect { value -> println(value) }
}

6.2 onCompletion #

kotlin
/** Prints each value of the flow, then prints "Completed" from `onCompletion`. */
fun main() = runBlocking {
    val numbers = flowOf(1, 2, 3).onCompletion { println("Completed") }
    numbers.collect { value -> println(value) }
}

6.3 retry #

kotlin
/**
 * Re-runs a flow that always fails after its first emission with `retry(3)`;
 * the trailing `catch` reports the failure that remains after the retries.
 */
fun main() = runBlocking {
    val alwaysFailing = flow {
        emit(1)
        throw Exception("Error!")
    }

    alwaysFailing
        .retry(3)
        .catch { println("Failed after retries") }
        .collect { value -> println(value) }
}

七、上下文切换 #

7.1 flowOn #

kotlin
/**
 * Demonstrates `flowOn`: the builder above the operator runs on
 * `Dispatchers.IO`, while collection stays in the `runBlocking` context.
 * Both sides print the name of the thread they run on.
 */
fun main() = runBlocking {
    val upstream = flow {
        (1..5).forEach { i ->
            println("Emitting $i on ${Thread.currentThread().name}")
            emit(i)
        }
    }.flowOn(Dispatchers.IO)

    upstream.collect {
        println("Collecting $it on ${Thread.currentThread().name}")
    }
}

7.2 withContext #

kotlin
/**
 * Fetches data on `Dispatchers.IO` inside a flow builder and prints it.
 *
 * Only the blocking fetch is wrapped in `withContext`; the value is emitted
 * after returning to the collector's context. Calling `emit` from inside
 * `withContext` changes the emission context, which breaks Flow's
 * context-preservation rule and fails at runtime with an
 * `IllegalStateException` ("Flow invariant is violated").
 */
fun main() = runBlocking {
    flow {
        // Switch context for the work only, never for the emission itself.
        val data = withContext(Dispatchers.IO) { fetchData() }
        emit(data)
    }.collect { println(it) }
}

八、StateFlow #

8.1 创建 StateFlow #

kotlin
/**
 * Holds an integer counter in a private [MutableStateFlow] and exposes it to
 * observers as a read-only [StateFlow].
 */
class ViewModel {
    // Mutable backing property; only this class writes to it.
    private val _state = MutableStateFlow(0)

    /** Read-only view of the current counter value. */
    val state: StateFlow<Int> = _state.asStateFlow()

    /** Increases the counter by one. */
    fun increment() {
        _state.value = _state.value + 1
    }
}

8.2 收集 StateFlow #

kotlin
/**
 * Collects a [MutableStateFlow] in a child coroutine while the parent updates it.
 *
 * Two details matter here:
 * - Collecting a StateFlow never completes on its own, so the collector job is
 *   cancelled explicitly; otherwise `runBlocking` would wait for it forever.
 * - StateFlow is conflated, so the parent suspends after each update to give
 *   the collector a chance to observe the value before it is overwritten.
 */
fun main() = runBlocking {
    val stateFlow = MutableStateFlow(0)

    val collector = launch {
        stateFlow.collect { println("Value: $it") }
    }

    delay(100)           // let the collector start and print the initial value 0
    stateFlow.value = 1
    delay(100)           // let the collector print 1 before the next update
    stateFlow.value = 2
    delay(100)           // let the collector print 2

    collector.cancel()   // StateFlow collection never ends by itself
}

九、SharedFlow #

9.1 创建 SharedFlow #

kotlin
/**
 * A minimal event bus: events are published through a private
 * [MutableSharedFlow] (created with its default configuration) and observed
 * through a read-only [SharedFlow].
 */
class EventBus {
    // Mutable backing property; only this class emits into it.
    private val _events = MutableSharedFlow<String>()

    /** Read-only stream of published events. */
    val events: SharedFlow<String> = _events.asSharedFlow()

    /** Publishes [event] to the shared flow; suspends as `emit` requires. */
    suspend fun send(event: String) = _events.emit(event)
}

9.2 配置 SharedFlow #

kotlin
/** A shared flow configured with a replay cache, extra buffer space, and a drop-oldest overflow policy. */
val sharedFlow = MutableSharedFlow<String>(
    replay = 10,        // number of values replayed to new collectors
    extraBufferCapacity = 10,  // buffer capacity in addition to replay
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)

十、实战示例 #

10.1 搜索建议 #

kotlin
/**
 * Turns a stream of raw query strings into a stream of search results.
 *
 * The pipeline waits for 300 ms of input silence, ignores queries shorter
 * than two characters, skips a query identical to the previous one, and
 * cancels an in-flight search when a newer query arrives.
 *
 * A failed search is handled per query inside `mapLatest` and yields an empty
 * list, so the stream keeps serving later queries. With only a trailing
 * `catch`, the first failure would end the whole flow and later queries
 * would be ignored.
 *
 * @param queryFlow the user's query text as it changes
 * @return a flow with one result list per accepted query; an empty list when
 *   that search fails
 */
fun searchQuery(queryFlow: Flow<String>): Flow<List<Result>> {
    return queryFlow
        .debounce(300)
        .filter { it.length >= 2 }
        .distinctUntilChanged()
        .mapLatest { query ->
            try {
                api.search(query)
            } catch (e: kotlin.coroutines.cancellation.CancellationException) {
                // mapLatest cancels the previous search by design; never swallow that.
                throw e
            } catch (e: Exception) {
                emptyList<Result>()
            }
        }
        // Safety net for failures of queryFlow itself, kept from the original pipeline.
        .catch { emit(emptyList()) }
}

10.2 倒计时 #

kotlin
/**
 * Builds a flow that counts down from [start] to 0, one value per second.
 *
 * The one-second pause is applied only between values: after the final 0 is
 * emitted the flow completes immediately rather than idling another second.
 * A negative [start] yields an empty flow.
 *
 * @param start the first value to emit
 * @return a cold flow emitting `start, start - 1, ..., 0`
 */
fun countdown(start: Int): Flow<Int> = flow {
    for (i in start downTo 0) {
        emit(i)
        // No trailing wait after the last value.
        if (i > 0) delay(1000)
    }
}

/** Runs a ten-second countdown, printing each remaining second and "Done!" on completion. */
fun main() = runBlocking {
    val timer = countdown(10).onCompletion { println("Done!") }
    timer.collect { println("$it seconds remaining") }
}

10.3 网络状态监听 #

kotlin
/**
 * Exposes the device's connectivity as a [StateFlow] of booleans.
 *
 * The callback is registered with the system [ConnectivityManager] on
 * construction; without that registration the callback would never fire and
 * [isConnected] would stay `false` forever. Call [unregister] when the
 * monitor is no longer needed.
 *
 * Note: `registerDefaultNetworkCallback` requires API level 24+ and the
 * `ACCESS_NETWORK_STATE` permission.
 *
 * @param context used only to look up the [ConnectivityManager]; the
 *   application context is used so no shorter-lived context is retained
 */
class NetworkMonitor(context: Context) {
    private val _isConnected = MutableStateFlow(false)

    /** `true` after the default network became available, `false` after it was lost. */
    val isConnected: StateFlow<Boolean> = _isConnected.asStateFlow()

    private val connectivityManager =
        context.applicationContext.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager

    // Kept as a property so the same instance can be unregistered later.
    private val callback = object : ConnectivityManager.NetworkCallback() {
        override fun onAvailable(network: Network) {
            _isConnected.value = true
        }

        override fun onLost(network: Network) {
            _isConnected.value = false
        }
    }

    init {
        // Start listening for network state changes.
        connectivityManager.registerDefaultNetworkCallback(callback)
    }

    /** Stops listening for network changes; [isConnected] keeps its last value. */
    fun unregister() {
        connectivityManager.unregisterNetworkCallback(callback)
    }
}

十一、最佳实践 #

11.1 使用 flowOn 切换上下文 #

kotlin
// Fragment: move the upstream work to Dispatchers.IO with flowOn and collect
// in the caller's context. Must run inside a coroutine.
flow {
    // Runs on an IO thread
    emit(fetchData())
}
    .flowOn(Dispatchers.IO)
    .collect { }

11.2 处理异常 #

kotlin
// Skeleton: place catch before collect so upstream exceptions are handled
// before they reach the collector. Must run inside a coroutine.
flow { }
    .catch { }
    .collect { }

11.3 使用 StateFlow 管理状态 #

kotlin
/**
 * Keeps UI state in a private [MutableStateFlow] and exposes it to observers
 * as a read-only [StateFlow].
 */
class ViewModel {
    // Mutable backing property; only this class writes to it.
    private val _uiState = MutableStateFlow(UiState())
    // Read-only view handed out to observers.
    val uiState: StateFlow<UiState> = _uiState.asStateFlow()
}

十二、总结 #

Flow 要点:

| 类型 | 说明 |
| --- | --- |
| Flow | 冷流 |
| StateFlow | 状态流 |
| SharedFlow | 共享流 |
| MutableStateFlow | 可变状态流 |
| MutableSharedFlow | 可变共享流 |

恭喜你完成了 Kotlin 完全指南的学习!

最后更新:2026-03-27