异步迭代 #

一、异步迭代概述 #

异步迭代允许你使用 for await...of 语法遍历异步产生的数据序列;这类序列通常由异步生成器函数或实现了 Symbol.asyncIterator 的对象提供。

二、for await...of #

2.1 基本语法 #

typescript
/**
 * Reads standard input chunk by chunk and logs the decoded text.
 *
 * One `TextDecoder` is reused in streaming mode so that a multi-byte UTF-8
 * character split across two chunks is decoded correctly instead of being
 * replaced by U+FFFD characters.
 */
async function processStream() {
  const stream = Deno.stdin.readable;
  const decoder = new TextDecoder();

  for await (const chunk of stream) {
    // `stream: true` keeps an incomplete trailing byte sequence buffered in
    // the decoder until the next chunk completes it.
    const text = decoder.decode(chunk, { stream: true });
    // A chunk that only holds part of a character decodes to "".
    if (text) console.log(text);
  }

  // Flush the decoder; this is only non-empty when the input ended in the
  // middle of a character.
  const rest = decoder.decode();
  if (rest) console.log(rest);
}

2.2 遍历异步可迭代对象 #

typescript
/** Async generator that produces 1, 2 and 3, one value per `next()` call. */
async function* asyncGenerator() {
  for (const value of [1, 2, 3] as const) {
    yield value;
  }
}

/** Consumes `asyncGenerator()` with `for await...of` and logs each value. */
async function example() {
  const values = asyncGenerator();

  for await (const current of values) {
    // Prints 1, then 2, then 3.
    console.log(current);
  }
}

2.3 读取文件行 #

typescript
import { readLines } from "https://deno.land/std@0.208.0/io/mod.ts";

/**
 * Logs every line of the file at `path`.
 *
 * @param path Path of the file to open and read.
 */
async function readFileLines(path: string) {
  const file = await Deno.open(path);

  try {
    for await (const line of readLines(file)) {
      console.log(line);
    }
  } finally {
    // Release the file handle even when reading or logging throws;
    // otherwise the descriptor would leak on the error path.
    file.close();
  }
}

三、异步生成器 #

3.1 基本语法 #

typescript
/**
 * Async generator that awaits an already-resolved promise for each of
 * 1, 2 and 3 and yields the settled value.
 */
async function* asyncGenerator() {
  for (const n of [1, 2, 3]) {
    const settled = await Promise.resolve(n);
    yield settled;
  }
}

3.2 生成异步序列 #

typescript
/**
 * Counts down from `from` in steps of 1, waiting `delay(1000)` before each
 * value, and stops once the counter is no longer greater than 0.
 *
 * @param from Starting value; nothing is yielded when it is not above 0.
 */
async function* countdown(from: number) {
  for (let remaining = from; remaining > 0; remaining--) {
    await delay(1000);
    yield remaining;
  }
}

/** Runs a five-step countdown and logs each number as it arrives. */
async function example() {
  const ticks = countdown(5);

  for await (const tick of ticks) {
    // Prints 5, 4, 3, 2, 1 (one per second).
    console.log(tick);
  }
}

/**
 * Returns a promise that is resolved by a `setTimeout` timer.
 *
 * @param ms Timer duration in milliseconds.
 */
function delay(ms: number): Promise<void> {
  return new Promise<void>((wake) => {
    setTimeout(wake, ms);
  });
}

3.3 分页数据 #

typescript
/** One page of results from a cursor-based source. */
interface Page<T> {
  /** Items contained in this page. */
  items: T[];
  /** Cursor of the following page; `paginate` stops when it is missing or empty. */
  nextCursor?: string;
}

/**
 * Flattens a cursor-paginated source into a single async sequence of items.
 *
 * The first page is requested with no cursor. After all items of a page have
 * been yielded, its `nextCursor` is used to request the next page, until a
 * page comes back with a falsy `nextCursor`.
 *
 * @param fetchPage Loads the page identified by `cursor` (first page when omitted).
 */
async function* paginate<T>(
  fetchPage: (cursor?: string) => Promise<Page<T>>
): AsyncGenerator<T> {
  let cursor: string | undefined = undefined;

  for (;;) {
    const page = await fetchPage(cursor);

    for (const entry of page.items) {
      yield entry;
    }

    cursor = page.nextCursor;
    if (!cursor) {
      return;
    }
  }
}

/**
 * Fetches one page of users from `/api/users`.
 *
 * @param cursor Cursor of the page to fetch; the first page is requested
 *   when it is omitted or empty.
 * @returns The parsed JSON body.
 *   NOTE(review): the payload is not validated against `Page<User>` at runtime.
 * @throws {Error} When the server answers with a non-2xx status.
 */
async function fetchUsers(cursor?: string): Promise<Page<User>> {
  // Percent-encode the cursor so characters such as `&`, `=`, `+` or `#`
  // cannot corrupt the query string.
  const url = cursor
    ? `/api/users?cursor=${encodeURIComponent(cursor)}`
    : "/api/users";
  const response = await fetch(url);
  // Fail loudly on HTTP errors instead of treating an error body as a page.
  if (!response.ok) {
    throw new Error(`Request to ${url} failed with status ${response.status}`);
  }
  return response.json();
}

/**
 * Walks every page produced by `paginate(fetchUsers)` and gathers the users
 * into a single array, in the order they were yielded.
 */
async function getAllUsers() {
  const collected: User[] = [];
  const userStream = paginate(fetchUsers);

  for await (const entry of userStream) {
    collected.push(entry);
  }

  return collected;
}

四、异步迭代器 #

4.1 实现异步迭代器 #

typescript
/**
 * Async-iterable sequence of numbers from `start` up to and including `end`,
 * increasing by 1. When `delay` is greater than 0, each value is preceded by
 * a pause of that many milliseconds.
 */
class AsyncRange {
  constructor(
    private start: number,
    private end: number,
    private delay: number = 0
  ) {}

  /** Creates a fresh iterator; every call starts again from `start`. */
  [Symbol.asyncIterator](): AsyncIterator<number> {
    let upcoming = this.start;

    const next = async (): Promise<IteratorResult<number>> => {
      const inRange = upcoming <= this.end;
      if (!inRange) {
        return { done: true, value: undefined };
      }

      if (this.delay > 0) {
        await new Promise((wake) => setTimeout(wake, this.delay));
      }

      // Advance only after the optional pause has finished.
      const value = upcoming;
      upcoming += 1;
      return { done: false, value };
    };

    return { next };
  }
}

/** Iterates an `AsyncRange` from 1 to 5 with a 1000 ms pause and logs each number. */
async function example() {
  const oneToFive = new AsyncRange(1, 5, 1000);

  for await (const current of oneToFive) {
    // Prints 1, 2, 3, 4, 5 (one per second).
    console.log(current);
  }
}

4.2 基于队列的异步可迭代对象 #

typescript
/**
 * Unbounded async queue connecting producers (`push` / `end`) to consumers
 * that read it with `for await...of`.
 *
 * Values pushed while no consumer is waiting are buffered. Once `end()` has
 * been called, buffered values are still delivered and every later read
 * reports completion.
 */
class AsyncQueue<T> {
  /** Values pushed while no consumer was waiting. */
  private queue: T[] = [];
  /** Resolve callbacks of consumers currently waiting for a value. */
  private resolvers: Array<(value: IteratorResult<T>) => void> = [];
  /** Set by `end()`; without it, reads issued after `end()` would wait forever. */
  private ended = false;

  /**
   * Hands `value` to the longest-waiting consumer, or buffers it.
   *
   * @throws {Error} If the queue has already been ended.
   */
  push(value: T): void {
    if (this.ended) {
      throw new Error("AsyncQueue: cannot push after end()");
    }
    const resolve = this.resolvers.shift();
    if (resolve) {
      resolve({ done: false, value });
    } else {
      this.queue.push(value);
    }
  }

  /** Marks the queue as finished and completes every waiting consumer. */
  end(): void {
    this.ended = true;
    for (const resolve of this.resolvers) {
      resolve({ done: true, value: undefined });
    }
    this.resolvers = [];
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: (): Promise<IteratorResult<T>> => {
        if (this.queue.length > 0) {
          // Length was checked above; `!` is needed because T may itself
          // include `undefined`.
          return Promise.resolve({ done: false, value: this.queue.shift()! });
        }
        if (this.ended) {
          // Drained and finished: complete immediately instead of parking a
          // resolver that nobody will ever call.
          return Promise.resolve({ done: true, value: undefined });
        }
        return new Promise(resolve => this.resolvers.push(resolve));
      }
    };
  }
}

/**
 * Producer/consumer demo: a background task pushes 0..4 into an `AsyncQueue`
 * while the main flow consumes and logs them.
 */
async function example() {
  const queue = new AsyncQueue<number>();

  // Producer: pushes one number every 100 ms, then ends the queue.
  const produce = async (): Promise<void> => {
    for (let i = 0; i < 5; i++) {
      await delay(100);
      queue.push(i);
    }
    queue.end();
  };
  // Started without awaiting so that it runs alongside the consumer below.
  void produce();

  // Consumer: prints 0, 1, 2, 3, 4.
  for await (const received of queue) {
    console.log(received);
  }
}

五、流处理 #

5.1 处理ReadableStream #

typescript
/**
 * Reads a byte stream to the end and logs its content as UTF-8 text.
 *
 * Decoding is done in streaming mode so that a multi-byte character split
 * across two chunks is not turned into U+FFFD replacement characters.
 *
 * @param stream Byte stream to consume; its reader lock is released afterwards.
 */
async function processStream(stream: ReadableStream<Uint8Array>) {
  const reader = stream.getReader();
  const decoder = new TextDecoder();

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // `stream: true` buffers an incomplete trailing byte sequence until
      // the next chunk completes it.
      const text = decoder.decode(value, { stream: true });
      // A chunk that only holds part of a character decodes to "".
      if (text) console.log(text);
    }

    // Flush the decoder; only non-empty when the input ended mid-character.
    const rest = decoder.decode();
    if (rest) console.log(rest);
  } finally {
    reader.releaseLock();
  }
}

5.2 转换流 #

typescript
/**
 * Decodes a byte stream as UTF-8 and yields each decoded piece of text after
 * passing it through `transformer`.
 *
 * Decoding is done in streaming mode, so a multi-byte character split across
 * two chunks reaches `transformer` intact. Chunks that do not complete any
 * character produce no output and do not invoke `transformer`.
 *
 * @param stream Byte stream to consume; its reader lock is released when the
 *   generator finishes or is closed early.
 * @param transformer Applied to every non-empty decoded text fragment.
 */
async function* transformStream(
  stream: ReadableStream<Uint8Array>,
  transformer: (chunk: string) => string
): AsyncGenerator<string> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // `stream: true` buffers an incomplete trailing byte sequence until
      // the next chunk completes it.
      const text = decoder.decode(value, { stream: true });
      if (text) yield transformer(text);
    }

    // Flush the decoder; only non-empty when the input ended mid-character.
    const rest = decoder.decode();
    if (rest) yield transformer(rest);
  } finally {
    reader.releaseLock();
  }
}

六、实际应用 #

6.1 日志文件处理 #

typescript
/**
 * Streams the file at `path` and yields it one line at a time.
 *
 * Lines are separated by `\n`; a `\r` directly before the separator (CRLF
 * files) is removed. A final line without a trailing newline is yielded too.
 *
 * @param path Path of the text file to read (decoded by `TextDecoderStream`).
 */
async function* readLogLines(path: string): AsyncGenerator<string> {
  const file = await Deno.open(path);
  const reader = file.readable
    .pipeThrough(new TextDecoderStream())
    .getReader();

  // Text received so far that is not yet terminated by a newline.
  let buffer = "";

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += value;
      const lines = buffer.split("\n");
      // The last element is the (possibly empty) unfinished line.
      buffer = lines.pop() ?? "";

      for (const line of lines) {
        yield line.endsWith("\r") ? line.slice(0, -1) : line;
      }
    }

    if (buffer) {
      yield buffer;
    }
  } finally {
    try {
      // Tear down the decoding pipeline when the consumer stops early;
      // this is a no-op once the stream has been read to the end.
      await reader.cancel();
    } catch {
      // The stream had already failed; that error is the one propagating.
    }
    reader.releaseLock();

    try {
      file.close();
    } catch (error) {
      // Deno closes the file itself once `file.readable` has been fully read
      // or cancelled; a second close then reports BadResource, which must
      // not turn a successful read into a failure.
      if (!(error instanceof Deno.errors.BadResource)) throw error;
    }
  }
}

/**
 * Logs every line of the file at `path` that contains the text "ERROR".
 *
 * @param path Path handed to `readLogLines`.
 */
async function processLogs(path: string) {
  const logLines = readLogLines(path);

  for await (const entry of logLines) {
    const isError = entry.includes("ERROR");
    if (!isError) continue;
    console.log(entry);
  }
}

6.2 实时数据处理 #

typescript
/**
 * Watches `path` and yields the file's full text each time a "modify"
 * event is reported for it; events of any other kind are skipped.
 *
 * @param path File to watch and re-read.
 */
async function* watchFile(path: string): AsyncGenerator<string> {
  for await (const { kind } of Deno.watchFs(path)) {
    if (kind !== "modify") continue;
    yield await Deno.readTextFile(path);
  }
}

/**
 * Logs the size of the file at `path` every time `watchFile` reports new
 * content.
 *
 * @param path File to monitor.
 */
async function monitorFile(path: string) {
  const encoder = new TextEncoder();

  for await (const content of watchFile(path)) {
    // The message reports bytes, so measure the UTF-8 encoding;
    // `content.length` would count UTF-16 code units instead.
    const byteLength = encoder.encode(content).length;
    console.log("文件已更新:", byteLength, "字节");
  }
}

七、总结 #

本章学习了:

  • for await...of 语法
  • 异步生成器函数
  • 异步迭代器实现
  • 流处理
  • 实际应用场景

下一章,我们将学习并发控制。

最后更新:2026-03-28