Kotlin 协程 Flow #
一、Flow 概述 #
Flow 是 Kotlin 协程中的响应式流,用于处理异步数据流。
1.1 Flow vs Sequence #
| 特性 | Flow | Sequence |
|---|---|---|
| 执行 | 异步 | 同步 |
| 挂起 | 支持 | 不支持 |
| 上下文 | 可切换 | 单一 |
1.2 冷流 vs 热流 #
- 冷流:只有被收集时才开始执行并发射数据,每个收集者都会触发一次独立的执行(例如用 flow { } 构建的 Flow)
- 热流:无论是否有收集者都保持活跃并发射数据,多个收集者共享同一个数据源(例如 StateFlow、SharedFlow)
二、创建 Flow #
2.1 flow 构建器 #
kotlin
/**
 * Builds a cold flow that emits the integers 1 through 5,
 * suspending for 100 ms before each emission.
 */
fun simpleFlow(): Flow<Int> = flow {
    (1..5).forEach { number ->
        delay(100)
        emit(number)
    }
}
/** Collects [simpleFlow] and prints each emitted value on its own line. */
fun main() = runBlocking {
    simpleFlow().collect { value -> println(value) }
}
2.2 flowOf #
kotlin
// A flow over a fixed set of values.
val flow = flowOf(1, 2, 3, 4, 5)

/** Prints every value of [flow], one per line. */
fun main() = runBlocking {
    flow.collect { value -> println(value) }
}
2.3 asFlow #
kotlin
/** Converts a list and an integer range to flows with `asFlow()` and prints the elements of each. */
fun main() = runBlocking {
    val fromList = listOf(1, 2, 3, 4, 5).asFlow()
    fromList.collect { value -> println(value) }

    val fromRange = (1..5).asFlow()
    fromRange.collect { value -> println(value) }
}
2.4 channelFlow #
kotlin
/**
 * Returns a flow built with the `channelFlow` builder that sends 0 through 4
 * from a child coroutine, pausing 100 ms after each send.
 *
 * Note: this function shares its name with the builder it calls. The call in the body
 * passes a lambda, which this zero-argument function cannot accept, so it resolves to
 * the builder rather than recursing.
 */
fun channelFlow(): Flow<Int> = channelFlow {
    launch {
        for (index in 0 until 5) {
            send(index)
            delay(100)
        }
    }
}
三、收集 Flow #
3.1 collect #
kotlin
/** Terminal operator demo: `collect` prints 1, 2 and 3 in order. */
fun main() = runBlocking {
    val numbers = flowOf(1, 2, 3)
    numbers.collect { value -> println(value) }
}
3.2 collectLatest #
kotlin
/**
 * Demonstrates `collectLatest`: values arrive 100 ms apart while each handler needs 150 ms,
 * so a handler is cancelled whenever a newer value arrives before it finishes.
 */
fun main() = runBlocking {
    val source = flow {
        emit(1)
        delay(100)
        emit(2)
        delay(100)
        emit(3)
    }
    source.collectLatest { value ->
        println("Processing $value")
        delay(150) // longer than the 100 ms gap between emissions
        println("Done $value")
    }
}
// Only the handler for the last value runs to completion
3.3 toList / toSet #
kotlin
/** Gathers flow elements into collections with the `toList` and `toSet` terminal operators. */
fun main() = runBlocking {
    val list: List<Int> = flowOf(1, 2, 3).toList()
    val set: Set<Int> = flowOf(1, 2, 2, 3).toSet()
}
3.4 first / single #
kotlin
/** Reads one value from a flow with `first` (first element) and `single` (the only element). */
fun main() = runBlocking {
    val first: Int = flowOf(1, 2, 3).first()
    val single: Int = flowOf(1).single()
}
四、中间操作符 #
4.1 map #
kotlin
/** Doubles every value with `map` before printing it. */
fun main() = runBlocking {
    val doubled = flowOf(1, 2, 3).map { value -> value * 2 }
    doubled.collect { value -> println(value) }
    // Prints 2, 4, 6
}
4.2 filter #
kotlin
/** Keeps only the even values with `filter` and prints them. */
fun main() = runBlocking {
    val evens = flowOf(1, 2, 3, 4, 5).filter { number -> number % 2 == 0 }
    evens.collect { value -> println(value) }
    // Prints 2, 4
}
4.3 transform #
kotlin
/** Uses `transform` to emit two strings for every upstream value. */
fun main() = runBlocking {
    val described = flowOf(1, 2, 3).transform {
        emit("Value: $it")
        emit("Double: ${it * 2}")
    }
    described.collect { line -> println(line) }
}
4.4 take / drop #
kotlin
/** Shows `take` (keep the first n values) and `drop` (skip the first n values). */
fun main() = runBlocking {
    val firstThree = flowOf(1, 2, 3, 4, 5).take(3)
    firstThree.collect { value -> println(value) }
    // Prints 1, 2, 3

    val afterTwo = flowOf(1, 2, 3, 4, 5).drop(2)
    afterTwo.collect { value -> println(value) }
    // Prints 3, 4, 5
}
4.5 debounce #
kotlin
/**
 * Demonstrates `debounce(75)`: a value is passed on only if no newer value
 * arrives within 75 ms of it.
 */
fun main() = runBlocking {
    val bursts = flow {
        emit(1)
        delay(50) // 2 arrives inside 1's 75 ms window, so 1 is dropped
        emit(2)
        delay(100)
        emit(3)
    }
    bursts
        .debounce(75)
        .collect { value -> println(value) }
    // Prints 2, 3
}
4.6 distinctUntilChanged #
kotlin
/** Suppresses consecutive repeats with `distinctUntilChanged`; later non-adjacent repeats pass through. */
fun main() = runBlocking {
    val changes = flowOf(1, 1, 2, 2, 3, 2, 2).distinctUntilChanged()
    changes.collect { value -> println(value) }
    // Prints 1, 2, 3, 2
}
五、组合操作符 #
5.1 zip #
kotlin
/** Pairs the n-th value of one flow with the n-th value of another using `zip`. */
fun main() = runBlocking {
    val numbers = flowOf(1, 2, 3)
    val letters = flowOf("A", "B", "C")
    val pairs = numbers.zip(letters) { a, b -> "$a$b" }
    pairs.collect { value -> println(value) }
    // Prints 1A, 2B, 3C
}
5.2 combine #
kotlin
/**
 * Demonstrates `combine`: whenever either flow emits, the latest values of both are merged.
 * The numbers arrive every 100 ms and the letters every 150 ms.
 */
fun main() = runBlocking {
    val numbers = flowOf(1, 2, 3).onEach { delay(100) }
    val letters = flowOf("A", "B").onEach { delay(150) }
    val merged = numbers.combine(letters) { a, b -> "$a$b" }
    merged.collect { value -> println(value) }
    // Typically prints 1A, 2A, 2B, 3B. NOTE(review): 3 and B both arrive at about 300 ms,
    // so the third line depends on which one is processed first.
}
5.3 flattenConcat #
kotlin
/** Flattens a flow of flows sequentially with `flattenConcat`, one inner flow after another. */
fun main() = runBlocking {
    val nested = flowOf(flowOf(1, 2), flowOf(3, 4))
    nested
        .flattenConcat()
        .collect { value -> println(value) }
    // Prints 1, 2, 3, 4
}
5.4 flattenMerge #
kotlin
/** Flattens a flow of flows with `flattenMerge`. */
fun main() = runBlocking {
    val nested = flowOf(flowOf(1, 2), flowOf(3, 4))
    nested
        .flattenMerge()
        .collect { value -> println(value) }
    // Inner flows are collected concurrently
}
六、异常处理 #
6.1 catch #
kotlin
/** Handles an upstream failure with `catch`: prints 1, then reports the exception. */
fun main() = runBlocking {
    val failing = flow {
        emit(1)
        throw Exception("Error!")
    }
    failing
        .catch { e -> println("Caught: $e") }
        .collect { value -> println(value) }
}
6.2 onCompletion #
kotlin
/** Prints every value and then "Completed" once the flow has finished, via `onCompletion`. */
fun main() = runBlocking {
    val numbers = flowOf(1, 2, 3).onCompletion { println("Completed") }
    numbers.collect { value -> println(value) }
}
6.3 retry #
kotlin
/**
 * Retries a failing flow up to three times with `retry(3)`.
 * Each attempt restarts the flow from the beginning; `catch` reports the final failure.
 */
fun main() = runBlocking {
    val failing = flow {
        emit(1)
        throw Exception("Error!")
    }
    failing
        .retry(3)
        .catch { println("Failed after retries") }
        .collect { value -> println(value) }
}
七、上下文切换 #
7.1 flowOn #
kotlin
/**
 * Demonstrates `flowOn`: the producer block runs on [Dispatchers.IO] while the
 * collector stays in the `runBlocking` context. Thread names are printed on both sides.
 */
fun main() = runBlocking {
    val producer = flow {
        (1..5).forEach { i ->
            println("Emitting $i on ${Thread.currentThread().name}")
            emit(i)
        }
    }
    producer
        .flowOn(Dispatchers.IO) // applies to everything upstream of this call
        .collect {
            println("Collecting $it on ${Thread.currentThread().name}")
        }
}
7.2 withContext #
kotlin
/**
 * Uses `withContext` inside a `flow { }` builder without breaking context preservation.
 *
 * Only the call to `fetchData()` is moved to [Dispatchers.IO]; the value is emitted after
 * `withContext` returns. Calling `emit` from inside `withContext` makes the flow fail at
 * runtime with "Flow invariant is violated", because a `flow { }` builder must emit in the
 * collector's context. Use `flowOn` when the whole producer should run on another dispatcher.
 */
fun main() = runBlocking {
    flow {
        // Do the blocking work on IO, then come back before emitting.
        val data = withContext(Dispatchers.IO) { fetchData() }
        emit(data)
    }.collect { println(it) }
}
八、StateFlow #
8.1 创建 StateFlow #
kotlin
/**
 * Holds an integer counter as observable state.
 *
 * The mutable flow stays private; callers observe the read-only [state] and change it
 * only through [increment].
 */
class ViewModel {
    private val _state = MutableStateFlow(0)
    val state: StateFlow<Int> = _state.asStateFlow()

    /** Adds one to the counter atomically, so concurrent calls do not lose increments. */
    fun increment() {
        // `value++` is a separate read and write; `update` retries with compare-and-set.
        _state.update { it + 1 }
    }
}
8.2 收集 StateFlow #
kotlin
/**
 * Collects a [MutableStateFlow] while its value changes, then stops the collector.
 *
 * A StateFlow never completes, so the collecting coroutine has to be cancelled explicitly;
 * otherwise `runBlocking` waits for it forever. The pauses give the collector time to run
 * between updates: without them both assignments happen before the collector starts, and
 * because a StateFlow keeps only its latest value, only 2 would be printed.
 */
fun main() = runBlocking {
    val stateFlow = MutableStateFlow(0)
    val collector = launch {
        stateFlow.collect { println("Value: $it") }
    }
    delay(100) // let the collector start and print the initial value 0
    stateFlow.value = 1
    delay(100)
    stateFlow.value = 2
    delay(100)
    collector.cancel() // end the otherwise endless collection so main can return
}
九、SharedFlow #
9.1 创建 SharedFlow #
kotlin
/**
 * A minimal event bus built on a [MutableSharedFlow] of strings.
 *
 * Publishers call [send]; subscribers collect the read-only [events] flow.
 */
class EventBus {
    private val _events = MutableSharedFlow<String>()

    /** Read-only view of the event stream. */
    val events: SharedFlow<String> = _events.asSharedFlow()

    /** Emits [event] into the shared flow. */
    suspend fun send(event: String) = _events.emit(event)
}
9.2 配置 SharedFlow #
kotlin
// A shared flow configured with replay, extra buffering and an overflow policy.
val sharedFlow = MutableSharedFlow<String>(
    replay = 10, // number of values replayed to new subscribers
    extraBufferCapacity = 10, // buffer capacity in addition to replay
    onBufferOverflow = BufferOverflow.DROP_OLDEST, // on overflow, drop the oldest buffered value
)
十、实战示例 #
10.1 搜索建议 #
kotlin
/**
 * Turns a stream of raw query strings into a stream of search results.
 *
 * Pipeline: wait for 300 ms without new input, ignore queries shorter than two characters,
 * skip consecutive duplicates, then run the search, cancelling a search that is still in
 * flight when a newer query arrives.
 *
 * A failing search yields an empty list for that query and the flow keeps running, so one
 * failed request does not end suggestions for later queries. A failure in [queryFlow]
 * itself still ends the flow after one final empty list.
 *
 * @param queryFlow the query text as it changes
 * @return result lists for the most recent accepted query
 */
fun searchQuery(queryFlow: Flow<String>): Flow<List<Result>> {
    return queryFlow
        .debounce(300)
        .filter { it.length >= 2 }
        .distinctUntilChanged()
        .mapLatest { query ->
            try {
                api.search(query)
            } catch (e: kotlin.coroutines.cancellation.CancellationException) {
                // mapLatest cancels the previous search; cancellation must propagate.
                throw e
            } catch (e: Exception) {
                emptyList<Result>()
            }
        }
        // Last resort for failures upstream of the search (terminates the flow).
        .catch { emit(emptyList()) }
}
10.2 倒计时 #
kotlin
/**
 * Emits [start], start - 1, ..., 0 with one second between consecutive values.
 *
 * The flow completes as soon as 0 has been emitted; there is no trailing one-second wait.
 * A negative [start] produces an empty flow.
 *
 * @param start the first value to emit
 */
fun countdown(start: Int): Flow<Int> = flow {
    for (i in start downTo 0) {
        emit(i)
        // Wait only between values, not after the final 0.
        if (i > 0) delay(1000)
    }
}
/** Runs a ten-second countdown, printing the remaining seconds and then "Done!". */
fun main() = runBlocking {
    val timer = countdown(10).onCompletion { println("Done!") }
    timer.collect { println("$it seconds remaining") }
}
10.3 网络状态监听 #
kotlin
/**
 * Exposes the device's connectivity as a [StateFlow].
 *
 * [isConnected] starts as `false` and is updated by a [ConnectivityManager.NetworkCallback]
 * registered for the default network. Call [release] when the monitor is no longer needed.
 *
 * NOTE(review): `registerDefaultNetworkCallback` requires API 24+ and the
 * ACCESS_NETWORK_STATE permission — confirm against the project's minSdk and manifest.
 *
 * @param context used only to obtain the application-wide [ConnectivityManager]
 */
class NetworkMonitor(context: Context) {
    private val _isConnected = MutableStateFlow(false)
    val isConnected: StateFlow<Boolean> = _isConnected.asStateFlow()

    // Application context, so the long-lived callback does not keep an Activity alive.
    private val connectivityManager =
        context.applicationContext.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager

    // Mirrors network availability into the state flow.
    private val callback = object : ConnectivityManager.NetworkCallback() {
        override fun onAvailable(network: Network) {
            _isConnected.value = true
        }

        override fun onLost(network: Network) {
            _isConnected.value = false
        }
    }

    init {
        // Without registration the callback never fires and the state stays false.
        connectivityManager.registerDefaultNetworkCallback(callback)
    }

    /** Unregisters the network callback; [isConnected] stops updating afterwards. */
    fun release() {
        connectivityManager.unregisterNetworkCallback(callback)
    }
}
十一、最佳实践 #
11.1 使用 flowOn 切换上下文 #
kotlin
// Schematic fragment: move the producer to another dispatcher with flowOn
// instead of switching context inside the flow builder.
flow {
// Runs on the IO dispatcher because of the flowOn below
emit(fetchData())
}
.flowOn(Dispatchers.IO)
.collect { }
11.2 处理异常 #
kotlin
// Schematic fragment: put catch before the terminal collect so that
// failures from the upstream flow are handled.
flow { }
.catch { }
.collect { }
11.3 使用 StateFlow 管理状态 #
kotlin
/**
 * Keeps the screen's UI state in a private [MutableStateFlow] and exposes it
 * to observers as a read-only [StateFlow].
 */
class ViewModel {
    // Writable only from inside this class.
    private val _uiState = MutableStateFlow(UiState())

    /** Read-only view of the current UI state. */
    val uiState: StateFlow<UiState> = _uiState.asStateFlow()
}
十二、总结 #
Flow 要点:
| 类型 | 说明 |
|---|---|
| Flow | 冷流 |
| StateFlow | 状态流(热流,只读,始终持有并向新收集者提供最新值) |
| SharedFlow | 共享流(热流,只读,向所有收集者广播,可配置重放数量) |
| MutableStateFlow | 可变状态流(可通过 value 更新的 StateFlow) |
| MutableSharedFlow | 可变共享流(可通过 emit 发送数据的 SharedFlow) |
恭喜你完成了 Kotlin 完全指南的学习!
最后更新:2026-03-27