流处理 #
一、流概述 #
流(Stream)是处理数据的一种方式,允许逐块处理数据,而不是一次性加载全部数据。Deno实现了Web Streams API。
二、ReadableStream #
2.1 创建可读流 #
typescript
const readable = new ReadableStream<string>({
start(controller) {
controller.enqueue("Hello");
controller.enqueue("World");
controller.close();
}
});
const reader = readable.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log(value);
}
2.2 从数组创建 #
typescript
function fromArray<T>(items: T[]): ReadableStream<T> {
return new ReadableStream<T>({
start(controller) {
for (const item of items) {
controller.enqueue(item);
}
controller.close();
}
});
}
const stream = fromArray([1, 2, 3, 4, 5]);
for await (const value of stream) {
console.log(value);
}
2.3 异步生成器 #
typescript
function fromAsyncGenerator<T>(generator: AsyncGenerator<T>): ReadableStream<T> {
return new ReadableStream<T>({
async pull(controller) {
const { done, value } = await generator.next();
if (done) {
controller.close();
} else {
controller.enqueue(value);
}
}
});
}
2.4 从文件创建 #
typescript
const file = await Deno.open("./large-file.txt");
const readable = file.readable;
// 处理流
for await (const chunk of readable) {
console.log(new TextDecoder().decode(chunk));
}
file.close();
三、WritableStream #
3.1 创建可写流 #
typescript
const writable = new WritableStream<string>({
write(chunk) {
console.log("Writing:", chunk);
},
close() {
console.log("Stream closed");
}
});
const writer = writable.getWriter();
await writer.write("Hello");
await writer.write("World");
await writer.close();
3.2 写入文件 #
typescript
const file = await Deno.open("./output.txt", {
write: true,
create: true
});
const writable = new WritableStream<Uint8Array>({
write(chunk) {
return file.write(chunk);
},
close() {
file.close();
}
});
const writer = writable.getWriter();
await writer.write(new TextEncoder().encode("Hello, Deno!"));
await writer.close();
四、TransformStream #
4.1 创建转换流 #
typescript
const transform = new TransformStream<string, string>({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
});
const readable = new ReadableStream<string>({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
}
});
const transformed = readable.pipeThrough(transform);
for await (const value of transformed) {
console.log(value); // HELLO, WORLD
}
4.2 文本解码器 #
typescript
const decoder = new TextDecoderStream();
const readable = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new TextEncoder().encode("Hello"));
controller.enqueue(new TextEncoder().encode("World"));
controller.close();
}
});
const decoded = readable.pipeThrough(decoder);
for await (const value of decoded) {
console.log(value);
}
4.3 JSON解析器 #
typescript
function createJsonParser<T>(): TransformStream<string, T> {
let buffer = "";
return new TransformStream<string, T>({
transform(chunk, controller) {
buffer += chunk;
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (line.trim()) {
controller.enqueue(JSON.parse(line));
}
}
},
flush(controller) {
if (buffer.trim()) {
controller.enqueue(JSON.parse(buffer));
}
}
});
}
五、管道操作 #
5.1 pipeTo #
typescript
const readable = new ReadableStream<string>({
start(controller) {
controller.enqueue("Hello");
controller.enqueue("World");
controller.close();
}
});
const writable = new WritableStream<string>({
write(chunk) {
console.log(chunk);
}
});
await readable.pipeTo(writable);
5.2 pipeThrough #
typescript
const upperCase = new TransformStream<string, string>({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
});
const readable = new ReadableStream<string>({
start(controller) {
controller.enqueue("hello");
controller.close();
}
});
const transformed = readable.pipeThrough(upperCase);
for await (const value of transformed) {
console.log(value); // HELLO
}
5.3 链式管道 #
typescript
const upperCase = new TransformStream<string, string>({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
});
const addExclamation = new TransformStream<string, string>({
transform(chunk, controller) {
controller.enqueue(chunk + "!");
}
});
const readable = new ReadableStream<string>({
start(controller) {
controller.enqueue("hello");
controller.close();
}
});
const result = readable
.pipeThrough(upperCase)
.pipeThrough(addExclamation);
for await (const value of result) {
console.log(value); // HELLO!
}
六、实际应用 #
6.1 文件复制 #
typescript
async function copyFile(src: string, dest: string): Promise<void> {
const source = await Deno.open(src);
const destination = await Deno.open(dest, {
write: true,
create: true
});
await source.readable.pipeTo(destination.writable);
}
await copyFile("./input.txt", "./output.txt");
6.2 HTTP响应流 #
typescript
import { serve } from "https://deno.land/std@0.208.0/http/server.ts";
serve(async (request) => {
const file = await Deno.open("./large-file.txt");
return new Response(file.readable.pipeThrough(new TextDecoderStream()));
});
6.3 日志处理 #
typescript
async function processLogs(inputPath: string, outputPath: string) {
const input = await Deno.open(inputPath);
const output = await Deno.open(outputPath, {
write: true,
create: true
});
const filterErrors = new TransformStream<string, string>({
transform(line, controller) {
if (line.includes("ERROR")) {
controller.enqueue(line);
}
}
});
await input.readable
.pipeThrough(new TextDecoderStream())
.pipeThrough(filterErrors)
.pipeThrough(new TextEncoderStream())
.pipeTo(output.writable);
}
七、总结 #
本章学习了:
- ReadableStream创建和使用
- WritableStream创建和使用
- TransformStream数据转换
- 管道操作
- 实际应用场景
下一章,我们将学习网络请求。
最后更新:2026-03-28