并发控制 #

一、并发概述 #

并发控制是管理多个异步任务执行的重要技术,包括并行执行、限制并发数、任务队列等。

二、Promise静态方法 #

2.1 Promise.all #

并行执行所有任务,全部成功才成功:

typescript
async function fetchAll(urls: string[]): Promise<Response[]> {
  const promises = urls.map(url => fetch(url));
  return Promise.all(promises);
}

const responses = await fetchAll([
  "https://api.example.com/users",
  "https://api.example.com/posts"
]);

2.2 Promise.allSettled #

等待所有任务完成,无论成功失败:

typescript
const results = await Promise.allSettled([
  fetch("/api/users"),
  fetch("/api/posts"),
  fetch("/api/comments")
]);

results.forEach((result, i) => {
  if (result.status === "fulfilled") {
    console.log(`请求 ${i} 成功`);
  } else {
    console.log(`请求 ${i} 失败: ${result.reason}`);
  }
});

2.3 Promise.race #

返回最先完成的结果:

typescript
async function fetchWithTimeout(
  url: string,
  timeout: number
): Promise<Response> {
  return Promise.race([
    fetch(url),
    new Promise<never>((_, reject) => 
      setTimeout(() => reject(new Error("Timeout")), timeout)
    )
  ]);
}

2.4 Promise.any #

返回第一个成功的结果:

typescript
async function fetchFastest(urls: string[]): Promise<Response> {
  const promises = urls.map(url => fetch(url));
  return Promise.any(promises);
}

const response = await fetchFastest([
  "https://mirror1.example.com/data",
  "https://mirror2.example.com/data",
  "https://mirror3.example.com/data"
]);

三、并发限制 #

3.1 限制并发数 #

typescript
async function limitConcurrency<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>
): Promise<R[]> {
  const results: R[] = [];
  const executing: Promise<void>[] = [];
  
  for (const [index, item] of items.entries()) {
    const promise = fn(item).then(result => {
      results[index] = result;
    });
    
    executing.push(promise);
    
    if (executing.length >= limit) {
      await Promise.race(executing);
      executing.splice(
        executing.findIndex(p => p === promise),
        1
      );
    }
  }
  
  await Promise.all(executing);
  return results;
}

// 使用示例
const urls = Array.from({ length: 100 }, (_, i) => `https://api.example.com/${i}`);

const results = await limitConcurrency(urls, 5, async (url) => {
  const response = await fetch(url);
  return response.json();
});

3.2 并发池 #

typescript
class ConcurrencyPool {
  private running = 0;
  private queue: Array<() => void> = [];
  
  constructor(private limit: number) {}
  
  async run<T>(fn: () => Promise<T>): Promise<T> {
    while (this.running >= this.limit) {
      await new Promise<void>(resolve => this.queue.push(resolve));
    }
    
    this.running++;
    
    try {
      return await fn();
    } finally {
      this.running--;
      const next = this.queue.shift();
      if (next) next();
    }
  }
}

// 使用示例
const pool = new ConcurrencyPool(3);

const results = await Promise.all(
  urls.map(url => pool.run(() => fetch(url).then(r => r.json())))
);

四、任务队列 #

4.1 简单队列 #

typescript
class TaskQueue<T> {
  private queue: T[] = [];
  private processing = false;
  
  constructor(
    private processor: (item: T) => Promise<void>,
    private concurrency: number = 1
  ) {}
  
  add(item: T): void {
    this.queue.push(item);
    this.process();
  }
  
  private async process(): Promise<void> {
    if (this.processing) return;
    this.processing = true;
    
    while (this.queue.length > 0) {
      const batch = this.queue.splice(0, this.concurrency);
      await Promise.all(batch.map(item => this.processor(item)));
    }
    
    this.processing = false;
  }
}

4.2 优先级队列 #

typescript
interface PriorityTask<T> {
  item: T;
  priority: number;
}

class PriorityQueue<T> {
  private tasks: PriorityTask<T>[] = [];
  
  add(item: T, priority: number = 0): void {
    const task = { item, priority };
    const index = this.tasks.findIndex(t => t.priority < priority);
    
    if (index === -1) {
      this.tasks.push(task);
    } else {
      this.tasks.splice(index, 0, task);
    }
  }
  
  get(): T | undefined {
    const task = this.tasks.shift();
    return task?.item;
  }
  
  get length(): number {
    return this.tasks.length;
  }
}

五、实际应用 #

5.1 批量请求 #

typescript
async function batchFetch(
  urls: string[],
  batchSize: number = 5
): Promise<Response[]> {
  const results: Response[] = [];
  
  for (let i = 0; i < urls.length; i += batchSize) {
    const batch = urls.slice(i, i + batchSize);
    const responses = await Promise.all(batch.map(url => fetch(url)));
    results.push(...responses);
  }
  
  return results;
}

5.2 带重试的并发请求 #

typescript
async function fetchWithRetry(
  url: string,
  maxRetries: number = 3
): Promise<Response> {
  let lastError: Error | undefined;
  
  for (let i = 0; i < maxRetries; i++) {
    try {
      const response = await fetch(url);
      if (response.ok) return response;
      throw new Error(`HTTP ${response.status}`);
    } catch (error) {
      lastError = error as Error;
      await new Promise(resolve => setTimeout(resolve, 1000 * (i + 1)));
    }
  }
  
  throw lastError;
}

async function fetchAllWithRetry(
  urls: string[],
  concurrency: number = 5
): Promise<Response[]> {
  return limitConcurrency(urls, concurrency, fetchWithRetry);
}

5.3 数据处理管道 #

typescript
async function pipeline<T, R>(
  items: T[],
  concurrency: number,
  ...stages: Array<(item: T) => Promise<T>>
): Promise<R[]> {
  const results: R[] = [];
  
  await limitConcurrency(items, concurrency, async (item) => {
    let value: any = item;
    for (const stage of stages) {
      value = await stage(value);
    }
    results.push(value);
  });
  
  return results;
}

// 使用示例
const processed = await pipeline(
  urls,
  5,
  async (url) => fetch(url),
  async (response) => response.json(),
  async (data) => transform(data)
);

六、总结 #

本章学习了:

  • Promise静态方法的使用
  • 并发限制的实现
  • 任务队列的设计
  • 实际应用场景

下一章,我们将学习文件操作。

最后更新:2026-03-28