Dart异步编程 #
一、异步编程概述 #
1.1 为什么需要异步 #
在开发中,很多操作是耗时的:
- 网络请求
- 文件读写
- 数据库操作
- 定时任务
如果使用同步方式,会阻塞主线程,导致应用卡顿。异步编程可以让这些操作在后台执行,不阻塞主线程。
1.2 Dart异步模型 #
Dart是单线程模型,使用事件循环处理异步操作:
text
┌─────────────────────────────────────┐
│ 事件循环 │
├─────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ │
│ │ 微任务队列 │ │ 事件队列 │ │
│ │ Microtask │ │ Event Queue │ │
│ └─────────────┘ └─────────────┘ │
│ ↓ ↓ │
│ ┌─────────────────────────────┐ │
│ │ 执行代码 │ │
│ └─────────────────────────────┘ │
└─────────────────────────────────────┘
二、Future #
2.1 Future基础 #
Future表示一个异步操作的结果,它会在将来某个时刻完成。
dart
Future<String> fetchUser() {
return Future.delayed(
Duration(seconds: 2),
() => 'User data',
);
}
fetchUser().then((data) {
print(data);
});
2.2 创建Future #
dart
Future<String> getValue() {
return Future.value('Immediate value');
}
Future<String> getDelayedValue() {
return Future.delayed(Duration(seconds: 1), () => 'Delayed value');
}
Future<String> getError() {
return Future.error('Something went wrong');
}
Future<String> fromAsync() {
return Future(() {
return 'Computed value';
});
}
2.3 then和catchError #
dart
fetchUser()
.then((data) {
print('Success: $data');
return data.toUpperCase();
})
.then((upperData) {
print('Upper: $upperData');
})
.catchError((error) {
print('Error: $error');
});
2.4 whenComplete #
dart
fetchUser()
.then((data) => print(data))
.catchError((error) => print(error))
.whenComplete(() {
print('Operation completed');
});
2.5 Future静态方法 #
dart
Future.wait([
fetchUser(),
fetchPosts(),
fetchComments(),
]).then((results) {
print('All completed: $results');
});
Future.any([
fetchFromServer1(),
fetchFromServer2(),
]).then((firstResult) {
print('First completed: $firstResult');
});
Future.delayed(Duration(seconds: 2), () {
print('Delayed execution');
});
三、async/await #
3.1 基本用法 #
async/await是处理Future的语法糖,让异步代码看起来像同步代码。
dart
Future<String> fetchUser() async {
await Future.delayed(Duration(seconds: 1));
return 'User data';
}
Future<void> main() async {
print('Start');
String data = await fetchUser();
print(data);
print('End');
}
3.2 错误处理 #
dart
Future<void> fetchAndPrint() async {
try {
String data = await fetchUser();
print(data);
} catch (e) {
print('Error: $e');
} finally {
print('Completed');
}
}
3.3 并行执行 #
dart
Future<void> fetchAll() async {
var userFuture = fetchUser();
var postsFuture = fetchPosts();
var user = await userFuture;
var posts = await postsFuture;
print('User: $user');
print('Posts: $posts');
}
Future<void> fetchAllParallel() async {
var results = await Future.wait([
fetchUser(),
fetchPosts(),
]);
print('User: ${results[0]}');
print('Posts: ${results[1]}');
}
3.4 超时处理 #
dart
Future<String> fetchWithTimeout() async {
try {
return await fetchUser()
.timeout(Duration(seconds: 5));
} on TimeoutException {
return 'Request timed out';
}
}
四、Stream #
4.1 Stream基础 #
Stream是一系列异步事件的序列,可以理解为异步的Iterable。
dart
Stream<int> countStream(int max) async* {
for (int i = 1; i <= max; i++) {
await Future.delayed(Duration(seconds: 1));
yield i;
}
}
countStream(5).listen((event) {
print(event);
});
4.2 创建Stream #
dart
Stream<int> periodicStream() {
return Stream.periodic(Duration(seconds: 1), (count) => count)
.take(5);
}
Stream<String> fromIterable() {
return Stream.fromIterable(['a', 'b', 'c']);
}
Stream<String> fromFuture() {
return Stream.fromFuture(fetchUser());
}
Stream<int> controllerStream() {
var controller = StreamController<int>();
Timer.periodic(Duration(seconds: 1), (timer) {
controller.add(timer.tick);
if (timer.tick >= 5) {
timer.cancel();
controller.close();
}
});
return controller.stream;
}
4.3 监听Stream #
dart
countStream(5).listen(
(event) {
print('Data: $event');
},
onError: (error) {
print('Error: $error');
},
onDone: () {
print('Stream closed');
},
cancelOnError: false,
);
4.4 await for #
dart
Future<void> processStream() async {
await for (var value in countStream(5)) {
print('Value: $value');
}
print('Stream completed');
}
4.5 Stream操作 #
dart
var stream = Stream.fromIterable([1, 2, 3, 4, 5]);
stream
.where((n) => n > 2)
.map((n) => n * 2)
.listen((event) {
print(event);
});
var stream2 = Stream.fromIterable([1, 2, 3, 4, 5]);
var first = await stream2.first;
var last = await stream2.last;
var list = await stream2.toList();
var isEmpty = await stream2.isEmpty;
4.6 StreamController #
dart
class CounterBloc {
final _controller = StreamController<int>();
Stream<int> get stream => _controller.stream;
void increment(int value) {
_controller.add(value);
}
void dispose() {
_controller.close();
}
}
var bloc = CounterBloc();
bloc.stream.listen((count) {
print('Count: $count');
});
bloc.increment(1);
bloc.increment(2);
bloc.dispose();
4.7 广播Stream #
dart
var controller = StreamController<int>.broadcast();
controller.stream.listen((event) {
print('Listener 1: $event');
});
controller.stream.listen((event) {
print('Listener 2: $event');
});
controller.add(1);
controller.add(2);
controller.close();
五、Completer #
5.1 基本用法 #
Completer用于手动控制Future的完成。
dart
Future<String> waitForCondition() {
var completer = Completer<String>();
Timer(Duration(seconds: 2), () {
completer.complete('Condition met');
});
return completer.future;
}
Future<String> waitForResult() {
var completer = Completer<String>();
someAsyncOperation((result, error) {
if (error != null) {
completer.completeError(error);
} else {
completer.complete(result);
}
});
return completer.future;
}
5.2 超时控制 #
dart
Future<String> withTimeout(Future<String> future, Duration timeout) {
var completer = Completer<String>();
future.then(completer.complete).catchError(completer.completeError);
Timer(timeout, () {
if (!completer.isCompleted) {
completer.completeError(TimeoutException('Operation timed out'));
}
});
return completer.future;
}
六、Zone #
6.1 Zone基础 #
Zone提供了一个独立的执行环境,可以捕获和处理异步错误。
dart
void main() {
runZonedGuarded(() {
Future.delayed(Duration(seconds: 1), () {
throw Exception('Async error');
});
}, (error, stackTrace) {
print('Caught error: $error');
});
}
6.2 Zone值 #
dart
void main() {
runZoned(
() {
print(Zone.current[#key]);
someAsyncOperation();
},
zoneValues: {#key: 'zone value'},
);
}
void someAsyncOperation() {
print(Zone.current[#key]);
}
七、Isolate #
7.1 为什么需要Isolate #
Dart是单线程的,CPU密集型任务会阻塞主线程。Isolate提供了真正的并行执行能力。
7.2 创建Isolate #
dart
Future<int> heavyComputation(int value) async {
final receivePort = ReceivePort();
await Isolate.spawn(
_isolateEntry,
[receivePort.sendPort, value],
);
return await receivePort.first as int;
}
void _isolateEntry(List<dynamic> args) {
final sendPort = args[0] as SendPort;
final value = args[1] as int;
int result = 0;
for (int i = 0; i < value; i++) {
result += i;
}
sendPort.send(result);
}
var result = await heavyComputation(10000000);
print(result);
7.3 compute函数 #
Flutter提供了compute函数简化Isolate的使用:
dart
int heavyTask(int value) {
int result = 0;
for (int i = 0; i < value; i++) {
result += i;
}
return result;
}
var result = await compute(heavyTask, 10000000);
八、最佳实践 #
8.1 错误处理 #
dart
Future<void> goodPractice() async {
try {
final result = await fetchData();
processData(result);
} on NetworkException catch (e) {
handleNetworkError(e);
} on FormatException catch (e) {
handleFormatError(e);
} catch (e) {
handleUnknownError(e);
} finally {
cleanup();
}
}
8.2 取消操作 #
dart
class CancellableOperation {
bool _cancelled = false;
Future<String> execute() async {
for (int i = 0; i < 10; i++) {
if (_cancelled) {
throw Exception('Operation cancelled');
}
await Future.delayed(Duration(seconds: 1));
}
return 'Completed';
}
void cancel() {
_cancelled = true;
}
}
8.3 并发控制 #
dart
Future<List<T>> runWithConcurrency<T>(
List<Future<T> Function()> tasks,
int concurrency,
) async {
var results = <T>[];
var queue = Queue<Future<T> Function()>.from(tasks);
Future<void> worker() async {
while (queue.isNotEmpty) {
var task = queue.removeFirst();
results.add(await task());
}
}
await Future.wait(
List.generate(concurrency, (_) => worker()),
);
return results;
}
九、总结 #
9.1 核心要点 #
| 要点 | 说明 |
|---|---|
| Future | 表示异步操作的结果 |
| async/await | 异步代码的同步写法 |
| Stream | 异步事件序列 |
| Completer | 手动控制Future完成 |
| Isolate | 真正的并行执行 |
9.2 下一步 #
掌握了异步编程后,让我们开始学习 Widget概述!
最后更新:2026-03-28