Dart异步编程 #

一、异步编程概述 #

1.1 为什么需要异步 #

在开发中,很多操作是耗时的:

  • 网络请求
  • 文件读写
  • 数据库操作
  • 定时任务

如果使用同步方式,会阻塞主线程,导致应用卡顿。异步编程可以让这些操作在后台执行,不阻塞主线程。

1.2 Dart异步模型 #

Dart是单线程模型,使用事件循环处理异步操作:

text
┌─────────────────────────────────────┐
│           事件循环                   │
├─────────────────────────────────────┤
│  ┌─────────────┐ ┌─────────────┐   │
│  │ 微任务队列   │ │ 事件队列    │   │
│  │ Microtask   │ │ Event Queue │   │
│  └─────────────┘ └─────────────┘   │
│          ↓              ↓          │
│  ┌─────────────────────────────┐   │
│  │         执行代码             │   │
│  └─────────────────────────────┘   │
└─────────────────────────────────────┘

二、Future #

2.1 Future基础 #

Future表示一个异步操作的结果,它会在将来某个时刻完成。

dart
Future<String> fetchUser() {
  return Future.delayed(
    Duration(seconds: 2),
    () => 'User data',
  );
}

fetchUser().then((data) {
  print(data);
});

2.2 创建Future #

dart
Future<String> getValue() {
  return Future.value('Immediate value');
}

Future<String> getDelayedValue() {
  return Future.delayed(Duration(seconds: 1), () => 'Delayed value');
}

Future<String> getError() {
  return Future.error('Something went wrong');
}

Future<String> fromAsync() {
  return Future(() {
    return 'Computed value';
  });
}

2.3 then和catchError #

dart
fetchUser()
  .then((data) {
    print('Success: $data');
    return data.toUpperCase();
  })
  .then((upperData) {
    print('Upper: $upperData');
  })
  .catchError((error) {
    print('Error: $error');
  });

2.4 whenComplete #

dart
fetchUser()
  .then((data) => print(data))
  .catchError((error) => print(error))
  .whenComplete(() {
    print('Operation completed');
  });

2.5 Future静态方法 #

dart
Future.wait([
  fetchUser(),
  fetchPosts(),
  fetchComments(),
]).then((results) {
  print('All completed: $results');
});

Future.any([
  fetchFromServer1(),
  fetchFromServer2(),
]).then((firstResult) {
  print('First completed: $firstResult');
});

Future.delayed(Duration(seconds: 2), () {
  print('Delayed execution');
});

三、async/await #

3.1 基本用法 #

async/await是处理Future的语法糖,让异步代码看起来像同步代码。

dart
Future<String> fetchUser() async {
  await Future.delayed(Duration(seconds: 1));
  return 'User data';
}

Future<void> main() async {
  print('Start');
  String data = await fetchUser();
  print(data);
  print('End');
}

3.2 错误处理 #

dart
Future<void> fetchAndPrint() async {
  try {
    String data = await fetchUser();
    print(data);
  } catch (e) {
    print('Error: $e');
  } finally {
    print('Completed');
  }
}

3.3 并行执行 #

dart
Future<void> fetchAll() async {
  var userFuture = fetchUser();
  var postsFuture = fetchPosts();
  
  var user = await userFuture;
  var posts = await postsFuture;
  
  print('User: $user');
  print('Posts: $posts');
}

Future<void> fetchAllParallel() async {
  var results = await Future.wait([
    fetchUser(),
    fetchPosts(),
  ]);
  
  print('User: ${results[0]}');
  print('Posts: ${results[1]}');
}

3.4 超时处理 #

dart
Future<String> fetchWithTimeout() async {
  try {
    return await fetchUser()
      .timeout(Duration(seconds: 5));
  } on TimeoutException {
    return 'Request timed out';
  }
}

四、Stream #

4.1 Stream基础 #

Stream是一系列异步事件的序列,可以理解为异步的Iterable。

dart
Stream<int> countStream(int max) async* {
  for (int i = 1; i <= max; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i;
  }
}

countStream(5).listen((event) {
  print(event);
});

4.2 创建Stream #

dart
Stream<int> periodicStream() {
  return Stream.periodic(Duration(seconds: 1), (count) => count)
    .take(5);
}

Stream<String> fromIterable() {
  return Stream.fromIterable(['a', 'b', 'c']);
}

Stream<String> fromFuture() {
  return Stream.fromFuture(fetchUser());
}

Stream<int> controllerStream() {
  var controller = StreamController<int>();
  
  Timer.periodic(Duration(seconds: 1), (timer) {
    controller.add(timer.tick);
    if (timer.tick >= 5) {
      timer.cancel();
      controller.close();
    }
  });
  
  return controller.stream;
}

4.3 监听Stream #

dart
countStream(5).listen(
  (event) {
    print('Data: $event');
  },
  onError: (error) {
    print('Error: $error');
  },
  onDone: () {
    print('Stream closed');
  },
  cancelOnError: false,
);

4.4 await for #

dart
Future<void> processStream() async {
  await for (var value in countStream(5)) {
    print('Value: $value');
  }
  print('Stream completed');
}

4.5 Stream操作 #

dart
var stream = Stream.fromIterable([1, 2, 3, 4, 5]);

stream
  .where((n) => n > 2)
  .map((n) => n * 2)
  .listen((event) {
    print(event);
  });

var stream2 = Stream.fromIterable([1, 2, 3, 4, 5]);
var first = await stream2.first;
var last = await stream2.last;
var list = await stream2.toList();
var isEmpty = await stream2.isEmpty;

4.6 StreamController #

dart
class CounterBloc {
  final _controller = StreamController<int>();
  
  Stream<int> get stream => _controller.stream;
  
  void increment(int value) {
    _controller.add(value);
  }
  
  void dispose() {
    _controller.close();
  }
}

var bloc = CounterBloc();
bloc.stream.listen((count) {
  print('Count: $count');
});

bloc.increment(1);
bloc.increment(2);
bloc.dispose();

4.7 广播Stream #

dart
var controller = StreamController<int>.broadcast();

controller.stream.listen((event) {
  print('Listener 1: $event');
});

controller.stream.listen((event) {
  print('Listener 2: $event');
});

controller.add(1);
controller.add(2);
controller.close();

五、Completer #

5.1 基本用法 #

Completer用于手动控制Future的完成。

dart
Future<String> waitForCondition() {
  var completer = Completer<String>();
  
  Timer(Duration(seconds: 2), () {
    completer.complete('Condition met');
  });
  
  return completer.future;
}

Future<String> waitForResult() {
  var completer = Completer<String>();
  
  someAsyncOperation((result, error) {
    if (error != null) {
      completer.completeError(error);
    } else {
      completer.complete(result);
    }
  });
  
  return completer.future;
}

5.2 超时控制 #

dart
Future<String> withTimeout(Future<String> future, Duration timeout) {
  var completer = Completer<String>();
  
  future.then(completer.complete).catchError(completer.completeError);
  
  Timer(timeout, () {
    if (!completer.isCompleted) {
      completer.completeError(TimeoutException('Operation timed out'));
    }
  });
  
  return completer.future;
}

六、Zone #

6.1 Zone基础 #

Zone提供了一个独立的执行环境,可以捕获和处理异步错误。

dart
void main() {
  runZonedGuarded(() {
    Future.delayed(Duration(seconds: 1), () {
      throw Exception('Async error');
    });
  }, (error, stackTrace) {
    print('Caught error: $error');
  });
}

6.2 Zone值 #

dart
void main() {
  runZoned(
    () {
      print(Zone.current[#key]);
      someAsyncOperation();
    },
    zoneValues: {#key: 'zone value'},
  );
}

void someAsyncOperation() {
  print(Zone.current[#key]);
}

七、Isolate #

7.1 为什么需要Isolate #

Dart是单线程的,CPU密集型任务会阻塞主线程。Isolate提供了真正的并行执行能力。

7.2 创建Isolate #

dart
Future<int> heavyComputation(int value) async {
  final receivePort = ReceivePort();
  
  await Isolate.spawn(
    _isolateEntry,
    [receivePort.sendPort, value],
  );
  
  return await receivePort.first as int;
}

void _isolateEntry(List<dynamic> args) {
  final sendPort = args[0] as SendPort;
  final value = args[1] as int;
  
  int result = 0;
  for (int i = 0; i < value; i++) {
    result += i;
  }
  
  sendPort.send(result);
}

var result = await heavyComputation(10000000);
print(result);

7.3 compute函数 #

Flutter提供了compute函数简化Isolate的使用:

dart
int heavyTask(int value) {
  int result = 0;
  for (int i = 0; i < value; i++) {
    result += i;
  }
  return result;
}

var result = await compute(heavyTask, 10000000);

八、最佳实践 #

8.1 错误处理 #

dart
Future<void> goodPractice() async {
  try {
    final result = await fetchData();
    processData(result);
  } on NetworkException catch (e) {
    handleNetworkError(e);
  } on FormatException catch (e) {
    handleFormatError(e);
  } catch (e) {
    handleUnknownError(e);
  } finally {
    cleanup();
  }
}

8.2 取消操作 #

dart
class CancellableOperation {
  bool _cancelled = false;
  
  Future<String> execute() async {
    for (int i = 0; i < 10; i++) {
      if (_cancelled) {
        throw Exception('Operation cancelled');
      }
      await Future.delayed(Duration(seconds: 1));
    }
    return 'Completed';
  }
  
  void cancel() {
    _cancelled = true;
  }
}

8.3 并发控制 #

dart
Future<List<T>> runWithConcurrency<T>(
  List<Future<T> Function()> tasks,
  int concurrency,
) async {
  var results = <T>[];
  var queue = Queue<Future<T> Function()>.from(tasks);
  
  Future<void> worker() async {
    while (queue.isNotEmpty) {
      var task = queue.removeFirst();
      results.add(await task());
    }
  }
  
  await Future.wait(
    List.generate(concurrency, (_) => worker()),
  );
  
  return results;
}

九、总结 #

9.1 核心要点 #

要点 说明
Future 表示异步操作的结果
async/await 异步代码的同步写法
Stream 异步事件序列
Completer 手动控制Future完成
Isolate 真正的并行执行

9.2 下一步 #

掌握了异步编程后,让我们开始学习 Widget概述

最后更新:2026-03-28