流处理 #

一、流概述 #

流(Stream)是一种逐块处理数据的方式:数据到达一块就处理一块,而不必一次性把全部数据加载到内存中。Deno 实现了标准的 Web Streams API(ReadableStream、WritableStream、TransformStream)。

二、ReadableStream #

2.1 创建可读流 #

typescript
// Build a stream that emits two string chunks and then signals end-of-stream.
const readable = new ReadableStream<string>({
  start(controller) {
    for (const word of ["Hello", "World"]) {
      controller.enqueue(word);
    }
    controller.close();
  }
});

// Acquire an exclusive reader and drain the stream one chunk at a time.
const reader = readable.getReader();

for (
  let result = await reader.read();
  !result.done;
  result = await reader.read()
) {
  console.log(result.value);
}

2.2 从数组创建 #

typescript
/**
 * Wraps a synchronous iterable (array, readonly array, Set, generator, ...)
 * in a ReadableStream that emits its items in iteration order and then closes.
 *
 * Items are produced lazily from `pull`, one per request, so the stream
 * honours backpressure instead of copying the whole input into its internal
 * queue up front. As a consequence the input is read while the stream is
 * being consumed, not at construction time.
 *
 * @param items Source of the chunks; plain arrays keep working unchanged.
 * @returns A stream that yields every item of `items` and then closes.
 */
function fromArray<T>(items: Iterable<T>): ReadableStream<T> {
  const iterator = items[Symbol.iterator]();

  return new ReadableStream<T>({
    pull(controller) {
      const next = iterator.next();
      if (next.done) {
        controller.close();
      } else {
        controller.enqueue(next.value);
      }
    },
    cancel() {
      // Give generators a chance to run their `finally` blocks when the
      // consumer stops reading early.
      iterator.return?.();
    }
  });
}

// Turn a fixed list of numbers into a stream, then consume it with async
// iteration; each number is printed in order.
const numbers = [1, 2, 3, 4, 5];
const stream = fromArray(numbers);

for await (const item of stream) {
  console.log(item);
}

2.3 异步生成器 #

typescript
/**
 * Adapts an async generator to a ReadableStream.
 *
 * Each `pull` request advances the generator by exactly one step, so values
 * are only produced as fast as the consumer reads them. When the generator
 * finishes the stream is closed; if the generator throws, the rejection from
 * `pull` errors the stream.
 *
 * @param generator The async generator supplying the chunks.
 * @returns A stream that yields the generator's values in order.
 */
function fromAsyncGenerator<T>(generator: AsyncGenerator<T>): ReadableStream<T> {
  return new ReadableStream<T>({
    async pull(controller) {
      const { done, value } = await generator.next();
      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
    async cancel() {
      // Finish the generator when the consumer cancels so that its `finally`
      // blocks run and any resources it holds are released.
      await generator.return(undefined);
    }
  });
}

2.4 从文件创建 #

typescript
// Open the file for reading; `readable` exposes its contents as a byte stream.
const file = await Deno.open("./large-file.txt");
const readable = file.readable;

// Decode through a single TextDecoderStream rather than a new TextDecoder per
// chunk: chunk boundaries can fall in the middle of a multi-byte UTF-8
// character, and only a stateful decoder reassembles those correctly.
for await (const text of readable.pipeThrough(new TextDecoderStream())) {
  console.log(text);
}

// No explicit file.close() here: the stream closes the underlying file once it
// has been read to the end (or cancelled), and closing it a second time would
// throw Deno.errors.BadResource.

三、WritableStream #

3.1 创建可写流 #

typescript
// A sink that logs every chunk it receives and reports when it is closed.
const writable = new WritableStream<string>({
  write: (chunk) => {
    console.log("Writing:", chunk);
  },
  close: () => {
    console.log("Stream closed");
  }
});

// Write the chunks one after another, waiting for each to be accepted,
// then close the stream.
const writer = writable.getWriter();
for (const text of ["Hello", "World"]) {
  await writer.write(text);
}
await writer.close();

3.2 写入文件 #

typescript
// Open (or create) the target file. `truncate` empties an existing file first
// so that no bytes from older, longer content are left behind after the write.
const file = await Deno.open("./output.txt", {
  write: true,
  create: true,
  truncate: true
});

// A sink that forwards every chunk to the file.
const writable = new WritableStream<Uint8Array>({
  async write(chunk) {
    // file.write() may accept only part of the buffer and reports how many
    // bytes it took, so keep writing until the whole chunk is on disk.
    let offset = 0;
    while (offset < chunk.byteLength) {
      offset += await file.write(chunk.subarray(offset));
    }
  },
  close() {
    file.close();
  },
  abort() {
    // Also release the file handle when the stream is aborted or a write fails.
    file.close();
  }
});

const writer = writable.getWriter();
await writer.write(new TextEncoder().encode("Hello, Deno!"));
await writer.close();

四、TransformStream #

4.1 创建转换流 #

typescript
// Transform stage: upper-cases every chunk that passes through.
const transform = new TransformStream<string, string>({
  transform: (chunk, controller) => {
    const shouted = chunk.toUpperCase();
    controller.enqueue(shouted);
  }
});

// Source: emits two lower-case words and then closes.
const readable = new ReadableStream<string>({
  start(controller) {
    for (const word of ["hello", "world"]) {
      controller.enqueue(word);
    }
    controller.close();
  }
});

// Connect source and transform; the result is the transform's readable side.
const transformed = readable.pipeThrough(transform);

for await (const value of transformed) {
  console.log(value); // logs HELLO, then WORLD
}

4.2 文本解码器 #

typescript
// Built-in transform that turns a byte stream into a stream of strings.
const decoder = new TextDecoderStream();

// Source: two UTF-8 encoded words, emitted as separate byte chunks.
const readable = new ReadableStream<Uint8Array>({
  start(controller) {
    for (const text of ["Hello", "World"]) {
      const bytes = new TextEncoder().encode(text);
      controller.enqueue(bytes);
    }
    controller.close();
  }
});

const decoded = readable.pipeThrough(decoder);

// Print each decoded string chunk.
for await (const value of decoded) {
  console.log(value);
}

4.3 JSON解析器 #

typescript
/**
 * Creates a transform that parses newline-delimited JSON.
 *
 * Incoming string chunks may split a line at any position; text after the last
 * newline is held back until a later chunk completes it, or until the stream
 * is flushed. Lines that are empty or whitespace-only are skipped. A line that
 * is not valid JSON makes `JSON.parse` throw, which errors the stream.
 *
 * @returns A TransformStream taking text chunks and emitting one parsed value
 *          per non-blank line. The values are not validated against `T`.
 */
function createJsonParser<T>(): TransformStream<string, T> {
  // Text received so far that is not yet terminated by a newline.
  let pending = "";

  // Parses and emits one line unless it is blank.
  const emit = (text: string, controller: TransformStreamDefaultController<T>): void => {
    if (text.trim().length > 0) {
      controller.enqueue(JSON.parse(text));
    }
  };

  return new TransformStream<string, T>({
    transform(chunk, controller) {
      pending += chunk;

      let newlineAt = pending.indexOf("\n");
      while (newlineAt !== -1) {
        const line = pending.slice(0, newlineAt);
        pending = pending.slice(newlineAt + 1);
        emit(line, controller);
        newlineAt = pending.indexOf("\n");
      }
    },
    flush(controller) {
      // The final line may lack a trailing newline.
      emit(pending, controller);
    }
  });
}

五、管道操作 #

5.1 pipeTo #

typescript
// Source: emits "Hello" and "World", then closes.
const readable = new ReadableStream<string>({
  start(controller) {
    for (const word of ["Hello", "World"]) {
      controller.enqueue(word);
    }
    controller.close();
  }
});

// Sink: prints every chunk it receives.
const writable = new WritableStream<string>({
  write: (chunk) => {
    console.log(chunk);
  }
});

// pipeTo resolves once the source has been fully written to the sink.
await readable.pipeTo(writable);

5.2 pipeThrough #

typescript
// Transform stage that upper-cases each chunk.
const upperCase = new TransformStream<string, string>({
  transform: (chunk, controller) => {
    controller.enqueue(chunk.toUpperCase());
  }
});

// Source with a single chunk.
const readable = new ReadableStream<string>({
  start: (controller) => {
    controller.enqueue("hello");
    controller.close();
  }
});

// pipeThrough returns the readable side of the transform.
const transformed = readable.pipeThrough(upperCase);

for await (const value of transformed) {
  console.log(value); // logs HELLO
}

5.3 链式管道 #

typescript
// First stage: upper-case each chunk.
const upperCase = new TransformStream<string, string>({
  transform: (chunk, controller) => {
    controller.enqueue(chunk.toUpperCase());
  }
});

// Second stage: append an exclamation mark to each chunk.
const addExclamation = new TransformStream<string, string>({
  transform: (chunk, controller) => {
    controller.enqueue(chunk + "!");
  }
});

// Source with a single chunk.
const readable = new ReadableStream<string>({
  start: (controller) => {
    controller.enqueue("hello");
    controller.close();
  }
});

// Each pipeThrough call returns a new readable stream, so stages can be
// connected one after another.
const shouted = readable.pipeThrough(upperCase);
const result = shouted.pipeThrough(addExclamation);

for await (const value of result) {
  console.log(value); // logs HELLO!
}

六、实际应用 #

6.1 文件复制 #

typescript
/**
 * Copies a file by streaming its contents, so the whole file never has to be
 * held in memory at once.
 *
 * The destination is created if missing and truncated if it already exists;
 * without truncation, copying over a longer file would leave the old file's
 * trailing bytes in place.
 *
 * @param src Path of the file to read.
 * @param dest Path of the file to write.
 * @returns A promise that resolves when all data has been written.
 * @throws Rejects if either file cannot be opened or the transfer fails.
 */
async function copyFile(src: string, dest: string): Promise<void> {
  const source = await Deno.open(src);

  // Left unannotated on purpose: it takes the type of Deno.open's result.
  let destination;
  try {
    destination = await Deno.open(dest, {
      write: true,
      create: true,
      truncate: true
    });
  } catch (error) {
    // Do not leak the already opened source when the destination fails to open.
    source.close();
    throw error;
  }

  // pipeTo closes both streams (and with them both files) when it finishes
  // or fails, so no explicit close() calls are needed after this point.
  await source.readable.pipeTo(destination.writable);
}

// Example: copy ./input.txt to ./output.txt and wait for the copy to finish.
await copyFile("./input.txt", "./output.txt");

6.2 HTTP响应流 #

typescript
import { serve } from "https://deno.land/std@0.208.0/http/server.ts";

// Serve a large file as a streamed response: the file is sent chunk by chunk
// instead of being read into memory first.
serve(async (_request) => {
  const file = await Deno.open("./large-file.txt");

  // A Response body stream must yield Uint8Array chunks. Passing the bytes
  // through TextDecoderStream would turn them into strings, which makes the
  // response fail, so the raw byte stream is handed over as-is and the
  // encoding is declared in the content-type header instead.
  // The file is closed by the stream once the body has been fully sent or the
  // client disconnects.
  return new Response(file.readable, {
    headers: { "content-type": "text/plain; charset=utf-8" }
  });
});

6.3 日志处理 #

typescript
/**
 * Streams a log file and writes only the lines containing "ERROR" to the
 * output file, one per line.
 *
 * The decoded text arrives in arbitrary chunks that can contain many lines or
 * end in the middle of one, so the text is re-split into complete lines before
 * filtering. The output file is created if missing and truncated if it exists.
 *
 * @param inputPath Path of the log file to read.
 * @param outputPath Path of the file that receives the matching lines.
 * @returns A promise that resolves when all matching lines have been written.
 * @throws Rejects if either file cannot be opened or the transfer fails.
 */
async function processLogs(inputPath: string, outputPath: string): Promise<void> {
  const input = await Deno.open(inputPath);

  // Left unannotated on purpose: it takes the type of Deno.open's result.
  let output;
  try {
    output = await Deno.open(outputPath, {
      write: true,
      create: true,
      truncate: true
    });
  } catch (error) {
    // Do not leak the already opened input when the output fails to open.
    input.close();
    throw error;
  }

  // Text after the last newline seen so far; completed by a later chunk.
  let partial = "";

  // Re-chunks the decoded text so that every emitted chunk is one full line
  // (without its trailing "\n").
  const splitLines = new TransformStream<string, string>({
    transform(chunk, controller) {
      const pieces = (partial + chunk).split("\n");
      partial = pieces.pop() ?? "";
      for (const piece of pieces) {
        controller.enqueue(piece);
      }
    },
    flush(controller) {
      // The last line may not end with a newline.
      if (partial) {
        controller.enqueue(partial);
      }
    }
  });

  // Keeps only error lines and restores the newline removed by the splitter.
  const filterErrors = new TransformStream<string, string>({
    transform(line, controller) {
      if (line.includes("ERROR")) {
        controller.enqueue(line + "\n");
      }
    }
  });

  // pipeTo closes both files when the pipeline finishes or fails.
  await input.readable
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(splitLines)
    .pipeThrough(filterErrors)
    .pipeThrough(new TextEncoderStream())
    .pipeTo(output.writable);
}

七、总结 #

本章学习了:

  • ReadableStream创建和使用
  • WritableStream创建和使用
  • TransformStream数据转换
  • 管道操作
  • 实际应用场景

下一章,我们将学习网络请求。

最后更新:2026-03-28