异步迭代 #
一、异步迭代概述 #
异步迭代允许你遍历异步产生的数据序列,使用 for await...of 语法和异步生成器函数。
二、for await…of #
2.1 基本语法 #
typescript
async function processStream() {
const stream = Deno.stdin.readable;
for await (const chunk of stream) {
console.log(new TextDecoder().decode(chunk));
}
}
2.2 遍历异步可迭代对象 #
typescript
async function* asyncGenerator() {
yield 1;
yield 2;
yield 3;
}
async function example() {
for await (const value of asyncGenerator()) {
console.log(value); // 1, 2, 3
}
}
2.3 读取文件行 #
typescript
import { readLines } from "https://deno.land/std@0.208.0/io/mod.ts";
async function readFileLines(path: string) {
const file = await Deno.open(path);
for await (const line of readLines(file)) {
console.log(line);
}
file.close();
}
三、异步生成器 #
3.1 基本语法 #
typescript
async function* asyncGenerator() {
yield await Promise.resolve(1);
yield await Promise.resolve(2);
yield await Promise.resolve(3);
}
3.2 生成异步序列 #
typescript
async function* countdown(from: number) {
while (from > 0) {
await delay(1000);
yield from--;
}
}
async function example() {
for await (const num of countdown(5)) {
console.log(num); // 5, 4, 3, 2, 1 (每秒一个)
}
}
function delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
3.3 分页数据 #
typescript
interface Page<T> {
items: T[];
nextCursor?: string;
}
async function* paginate<T>(
fetchPage: (cursor?: string) => Promise<Page<T>>
): AsyncGenerator<T> {
let cursor: string | undefined;
do {
const page = await fetchPage(cursor);
for (const item of page.items) {
yield item;
}
cursor = page.nextCursor;
} while (cursor);
}
async function fetchUsers(cursor?: string): Promise<Page<User>> {
const url = cursor
? `/api/users?cursor=${cursor}`
: "/api/users";
const response = await fetch(url);
return response.json();
}
async function getAllUsers() {
const users: User[] = [];
for await (const user of paginate(fetchUsers)) {
users.push(user);
}
return users;
}
四、异步迭代器 #
4.1 实现异步迭代器 #
typescript
class AsyncRange {
constructor(
private start: number,
private end: number,
private delay: number = 0
) {}
[Symbol.asyncIterator](): AsyncIterator<number> {
let current = this.start;
return {
next: async () => {
if (current <= this.end) {
if (this.delay > 0) {
await new Promise(resolve => setTimeout(resolve, this.delay));
}
return { done: false, value: current++ };
}
return { done: true, value: undefined };
}
};
}
}
async function example() {
const range = new AsyncRange(1, 5, 1000);
for await (const num of range) {
console.log(num); // 1, 2, 3, 4, 5 (每秒一个)
}
}
4.2 可迭代异步生成器 #
typescript
class AsyncQueue<T> {
private queue: T[] = [];
private resolvers: Array<(value: IteratorResult<T>) => void> = [];
push(value: T): void {
if (this.resolvers.length > 0) {
const resolve = this.resolvers.shift()!;
resolve({ done: false, value });
} else {
this.queue.push(value);
}
}
end(): void {
for (const resolve of this.resolvers) {
resolve({ done: true, value: undefined });
}
this.resolvers = [];
}
[Symbol.asyncIterator](): AsyncIterator<T> {
return {
next: (): Promise<IteratorResult<T>> => {
if (this.queue.length > 0) {
return Promise.resolve({ done: false, value: this.queue.shift()! });
}
return new Promise(resolve => this.resolvers.push(resolve));
}
};
}
}
async function example() {
const queue = new AsyncQueue<number>();
// 生产者
(async () => {
for (let i = 0; i < 5; i++) {
await delay(100);
queue.push(i);
}
queue.end();
})();
// 消费者
for await (const value of queue) {
console.log(value); // 0, 1, 2, 3, 4
}
}
五、流处理 #
5.1 处理ReadableStream #
typescript
async function processStream(stream: ReadableStream<Uint8Array>) {
const reader = stream.getReader();
const decoder = new TextDecoder();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log(decoder.decode(value));
}
} finally {
reader.releaseLock();
}
}
5.2 转换流 #
typescript
async function* transformStream(
stream: ReadableStream<Uint8Array>,
transformer: (chunk: string) => string
): AsyncGenerator<string> {
const reader = stream.getReader();
const decoder = new TextDecoder();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
yield transformer(decoder.decode(value));
}
} finally {
reader.releaseLock();
}
}
六、实际应用 #
6.1 日志文件处理 #
typescript
async function* readLogLines(path: string): AsyncGenerator<string> {
const file = await Deno.open(path);
const reader = file.readable
.pipeThrough(new TextDecoderStream())
.getReader();
let buffer = "";
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += value;
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
yield line;
}
}
if (buffer) {
yield buffer;
}
} finally {
reader.releaseLock();
file.close();
}
}
async function processLogs(path: string) {
for await (const line of readLogLines(path)) {
if (line.includes("ERROR")) {
console.log(line);
}
}
}
6.2 实时数据处理 #
typescript
async function* watchFile(path: string): AsyncGenerator<string> {
const watcher = Deno.watchFs(path);
for await (const event of watcher) {
if (event.kind === "modify") {
const content = await Deno.readTextFile(path);
yield content;
}
}
}
async function monitorFile(path: string) {
for await (const content of watchFile(path)) {
console.log("文件已更新:", content.length, "字节");
}
}
七、总结 #
本章学习了:
- for await…of语法
- 异步生成器函数
- 异步迭代器实现
- 流处理
- 实际应用场景
下一章,我们将学习并发控制。
最后更新:2026-03-28