并发控制 #
一、并发概述 #
并发控制是管理多个异步任务执行的重要技术,包括并行执行、限制并发数、任务队列等。
二、Promise静态方法 #
2.1 Promise.all #
并行执行所有任务:全部成功时才成功,并按传入顺序返回结果;只要有一个任务失败,就会立即以该错误拒绝:
typescript
/**
 * Fetches every URL in parallel.
 *
 * All requests are started before any of them is awaited. The returned promise
 * fulfils with the responses in the same order as `urls`, and rejects as soon
 * as any single request rejects.
 *
 * @param urls - Addresses to request.
 * @returns The responses, positionally matching `urls`.
 */
async function fetchAll(urls: string[]): Promise<Response[]> {
  const pending: Promise<Response>[] = [];
  for (const url of urls) {
    pending.push(fetch(url));
  }
  return Promise.all(pending);
}
// Usage example: request both endpoints through fetchAll and wait for every response.
const responses = await fetchAll([
"https://api.example.com/users",
"https://api.example.com/posts"
]);
2.2 Promise.allSettled #
等待所有任务完成,无论成功失败:
typescript
// Start all three requests at once; allSettled waits for every one of them and
// reports each outcome instead of rejecting on the first failure.
const results = await Promise.allSettled([
  fetch("/api/users"),
  fetch("/api/posts"),
  fetch("/api/comments")
]);

// Log every outcome together with its position in the request list.
for (const [i, result] of results.entries()) {
  if (result.status === "rejected") {
    console.log(`请求 ${i} 失败: ${result.reason}`);
    continue;
  }
  console.log(`请求 ${i} 成功`);
}
2.3 Promise.race #
返回最先敲定(settled)的那个 Promise 的结果——无论它是成功还是失败,常用于实现超时:
typescript
/**
 * Fetches `url`, giving up after `timeout` milliseconds.
 *
 * @param url - Address to request.
 * @param timeout - Maximum time to wait, in milliseconds.
 * @returns The response, if it arrives before the deadline.
 * @throws Error with message "Timeout" when the deadline passes first;
 *   otherwise whatever `fetch` itself rejects with.
 */
async function fetchWithTimeout(
  url: string,
  timeout: number
): Promise<Response> {
  const controller = new AbortController();
  let timer: ReturnType<typeof setTimeout> | undefined;

  const expiry = new Promise<never>((_, reject) => {
    timer = setTimeout(() => {
      // Reject first so callers see "Timeout" rather than the AbortError
      // that the cancelled request produces.
      reject(new Error("Timeout"));
      // Cancel the underlying request instead of letting it run on unobserved.
      controller.abort();
    }, timeout);
  });

  try {
    return await Promise.race([
      fetch(url, { signal: controller.signal }),
      expiry
    ]);
  } finally {
    // Without this the timer stays armed after the request settles and keeps
    // the event loop alive for the remainder of `timeout`.
    clearTimeout(timer);
  }
}
2.4 Promise.any #
返回第一个成功的结果,先失败的任务会被忽略;只有全部任务都失败时,才会以 AggregateError 拒绝:
typescript
/**
 * Requests every URL at once and resolves with the first response that
 * arrives successfully.
 *
 * Individual failures are ignored as long as at least one request fulfils;
 * the returned promise rejects only when every request has rejected.
 *
 * @param urls - Candidate addresses, e.g. mirrors of the same resource.
 * @returns The first fulfilled response.
 */
async function fetchFastest(urls: string[]): Promise<Response> {
  return Promise.any(urls.map((url) => fetch(url)));
}
// Usage example: ask three mirrors for the same data and keep whichever
// response fetchFastest resolves with.
const response = await fetchFastest([
"https://mirror1.example.com/data",
"https://mirror2.example.com/data",
"https://mirror3.example.com/data"
]);
三、并发限制 #
3.1 限制并发数 #
typescript
/**
 * Maps `fn` over `items` with at most `limit` calls in flight at any time.
 *
 * Work is done by a fixed number of workers that pull items from one shared
 * iterator, so a new call starts only when a previous one has finished.
 *
 * @param items - Inputs to process.
 * @param limit - Maximum number of concurrent `fn` calls. Values below 1 (or
 *   NaN) are treated as 1; fractional values are rounded down.
 * @param fn - Asynchronous operation applied to each item.
 * @returns The results, positionally matching `items`.
 * @throws The first rejection raised by `fn`. No further items are started
 *   after a failure, although calls already in flight run to completion.
 */
async function limitConcurrency<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>
): Promise<R[]> {
  const results = new Array<R>(items.length);
  const maxInFlight = limit >= 1 ? Math.floor(limit) : 1;
  const workerCount = Math.min(maxInFlight, items.length);

  // A single iterator shared by all workers: each `for...of` step hands the
  // next unclaimed [index, item] pair to whichever worker asks first.
  const pending = items.entries();
  let failed = false;

  const worker = async (): Promise<void> => {
    for (const [index, item] of pending) {
      if (failed) return;
      try {
        results[index] = await fn(item);
      } catch (error) {
        // Tell the other workers to stop claiming new items.
        failed = true;
        throw error;
      }
    }
  };

  await Promise.all(Array.from({ length: workerCount }, worker));
  return results;
}
// Usage example: build 100 endpoint URLs and process them through
// limitConcurrency, passing 5 as the concurrency limit.
const urls = Array.from({ length: 100 }, (_, i) => `https://api.example.com/${i}`);
const results = await limitConcurrency(urls, 5, async (url) => {
const response = await fetch(url);
// Each entry of `results` is the parsed JSON body of one response.
return response.json();
});
3.2 并发池 #
typescript
/**
 * Runs asynchronous tasks with a cap on how many execute at the same time.
 *
 * Callers that arrive while the pool is full wait in FIFO order. When a task
 * finishes, its slot is handed directly to the longest-waiting caller, so a
 * caller arriving at that moment cannot jump the queue.
 */
class ConcurrencyPool {
  /** Number of slots currently occupied (running or just handed over). */
  private running = 0;
  /** Resolvers of callers waiting for a slot, oldest first. */
  private queue: Array<() => void> = [];

  /**
   * @param limit - Maximum number of tasks running at once. Values below 1
   *   (or NaN) are treated as 1, because such a pool could never start a task.
   */
  constructor(private limit: number) {
    if (!(limit >= 1)) {
      this.limit = 1;
    }
  }

  /**
   * Runs `fn` as soon as a slot is available.
   *
   * @param fn - Task to execute.
   * @returns Whatever `fn` resolves with.
   * @throws Whatever `fn` throws or rejects with; the slot is released either way.
   */
  async run<T>(fn: () => Promise<T>): Promise<T> {
    if (this.running >= this.limit) {
      // Pool is full: wait until a finishing task hands its slot over. The
      // slot stays counted in `running` during the hand-over, so there is no
      // need to re-check the limit after waking up.
      await new Promise<void>((resolve) => this.queue.push(resolve));
    } else {
      this.running++;
    }

    try {
      return await fn();
    } finally {
      const next = this.queue.shift();
      if (next) {
        // Transfer this slot to the next waiter; `running` is unchanged.
        next();
      } else {
        this.running--;
      }
    }
  }
}
// Usage example: route every request through a pool constructed with a limit of 3.
// `urls` is the array built in the section 3.1 usage example.
const pool = new ConcurrencyPool(3);
const results = await Promise.all(
urls.map(url => pool.run(() => fetch(url).then(r => r.json())))
);
四、任务队列 #
4.1 简单队列 #
typescript
/**
 * A queue that processes items in batches of up to `concurrency` at a time.
 *
 * Items are processed in insertion order, batch by batch: the next batch
 * starts only after every item of the current batch has settled. A failing
 * item is reported through `onError` and does not stop the queue.
 */
class TaskQueue<T> {
  /** Items waiting to be processed, oldest first. */
  private queue: T[] = [];
  /** True while the drain loop is running, so only one loop exists. */
  private processing = false;

  /**
   * @param processor - Asynchronous handler invoked once per item.
   * @param concurrency - Batch size. Values below 1 (or NaN) are treated as 1,
   *   because an empty batch would never drain the queue.
   * @param onError - Called with the error and the item when `processor`
   *   fails. Defaults to logging with `console.error`. It should not throw.
   */
  constructor(
    private processor: (item: T) => Promise<void>,
    private concurrency: number = 1,
    private onError: (error: unknown, item: T) => void = (error) =>
      console.error(error)
  ) {
    if (!(concurrency >= 1)) {
      this.concurrency = 1;
    }
  }

  /** Enqueues `item` and starts draining if the queue is idle. */
  add(item: T): void {
    this.queue.push(item);
    // Fire-and-forget: item failures are routed to `onError` inside the loop.
    void this.process();
  }

  /** Drains the queue batch by batch until it is empty. */
  private async process(): Promise<void> {
    if (this.processing) return;
    this.processing = true;
    try {
      while (this.queue.length > 0) {
        const batch = this.queue.splice(0, this.concurrency);
        await Promise.all(batch.map((item) => this.runOne(item)));
      }
    } finally {
      // Always release the flag; otherwise an unexpected error would leave
      // the queue permanently stalled.
      this.processing = false;
    }
  }

  /** Processes one item, converting a failure into an `onError` report. */
  private async runOne(item: T): Promise<void> {
    try {
      await this.processor(item);
    } catch (error) {
      this.onError(error, item);
    }
  }
}
4.2 优先级队列 #
typescript
/** An item paired with the priority it was enqueued with. */
interface PriorityTask<T> {
  item: T;
  priority: number;
}

/**
 * A queue that hands out items from highest to lowest priority.
 *
 * Items with equal priority come out in the order they were added.
 */
class PriorityQueue<T> {
  /** Entries kept sorted by descending priority. */
  private tasks: PriorityTask<T>[] = [];

  /**
   * Inserts `item` ahead of the first entry whose priority is strictly lower.
   *
   * @param item - Value to enqueue.
   * @param priority - Larger numbers are served first; defaults to 0.
   */
  add(item: T, priority: number = 0): void {
    const firstLower = this.tasks.findIndex(
      (entry) => entry.priority < priority
    );
    // No lower-priority entry means the new one belongs at the very end.
    const insertAt = firstLower === -1 ? this.tasks.length : firstLower;
    this.tasks.splice(insertAt, 0, { item, priority });
  }

  /** Removes and returns the highest-priority item, or `undefined` when empty. */
  get(): T | undefined {
    const head = this.tasks.shift();
    return head === undefined ? undefined : head.item;
  }

  /** Number of items currently queued. */
  get length(): number {
    return this.tasks.length;
  }
}
五、实际应用 #
5.1 批量请求 #
typescript
/**
 * Fetches `urls` in consecutive batches.
 *
 * All requests of a batch run in parallel; the next batch starts only after
 * the whole current batch has succeeded.
 *
 * @param urls - Addresses to request.
 * @param batchSize - Number of requests per batch. Values below 1 (or NaN)
 *   are treated as 1, because a zero step would never advance through `urls`.
 * @returns The responses, positionally matching `urls`.
 * @throws The first rejection raised by `fetch`; later batches are not started.
 */
async function batchFetch(
  urls: string[],
  batchSize: number = 5
): Promise<Response[]> {
  const step = batchSize >= 1 ? batchSize : 1;
  const results: Response[] = [];
  for (let start = 0; start < urls.length; start += step) {
    const batch = urls.slice(start, start + step);
    const responses = await Promise.all(batch.map((url) => fetch(url)));
    results.push(...responses);
  }
  return results;
}
5.2 带重试的并发请求 #
typescript
/**
 * Fetches `url`, retrying on network errors and non-OK HTTP statuses.
 *
 * The wait between attempts grows linearly: 1000 ms after the first failure,
 * 2000 ms after the second, and so on. There is no wait after the final
 * attempt.
 *
 * @param url - Address to request.
 * @param maxRetries - Total number of attempts (not retries on top of a
 *   first attempt); must be greater than 0.
 * @returns The first response whose `ok` flag is set.
 * @throws RangeError when `maxRetries` is not greater than 0; otherwise the
 *   error of the last failed attempt (`HTTP <status>` for non-OK responses).
 */
async function fetchWithRetry(
  url: string,
  maxRetries: number = 3
): Promise<Response> {
  if (!(maxRetries > 0)) {
    // Without this guard the loop never runs and `undefined` would be thrown.
    throw new RangeError(`maxRetries must be greater than 0, got ${maxRetries}`);
  }

  let lastError: unknown;
  for (let i = 0; i < maxRetries; i++) {
    try {
      const response = await fetch(url);
      if (response.ok) return response;
      throw new Error(`HTTP ${response.status}`);
    } catch (error) {
      lastError = error;
    }
    // Back off only when another attempt will follow.
    if (i + 1 < maxRetries) {
      await new Promise<void>((resolve) => setTimeout(resolve, 1000 * (i + 1)));
    }
  }
  throw lastError instanceof Error ? lastError : new Error(String(lastError));
}
/**
 * Fetches every URL with retry, passing `concurrency` to limitConcurrency as
 * the concurrency limit.
 *
 * Each URL is requested through fetchWithRetry using its default retry count.
 *
 * @param urls - Addresses to request.
 * @param concurrency - Limit forwarded to limitConcurrency; defaults to 5.
 * @returns The responses as returned by limitConcurrency.
 */
async function fetchAllWithRetry(
  urls: string[],
  concurrency: number = 5
): Promise<Response[]> {
  const responses = await limitConcurrency(urls, concurrency, (url) =>
    fetchWithRetry(url)
  );
  return responses;
}
5.3 数据处理管道 #
typescript
/**
 * Sends every item through a chain of asynchronous stages, processing items
 * through limitConcurrency with the given `concurrency`.
 *
 * Within one item the stages run strictly in order, each receiving the
 * previous stage's output. Results are returned in the order of `items`,
 * regardless of which item finishes first.
 *
 * @param items - Inputs fed to the first stage.
 * @param concurrency - Limit forwarded to limitConcurrency.
 * @param stages - Transformations applied in sequence. Each stage may change
 *   the value's type, so the input is typed loosely; the final stage is
 *   expected to produce an `R`.
 * @returns The final value for each item, positionally matching `items`.
 */
async function pipeline<T, R>(
  items: T[],
  concurrency: number,
  // `any` on the input is deliberate: stages are heterogeneous
  // (e.g. string -> Response -> JSON), which a single T cannot express.
  ...stages: Array<(item: any) => Promise<unknown>>
): Promise<R[]> {
  return limitConcurrency<T, R>(items, concurrency, async (item) => {
    let value: unknown = item;
    for (const stage of stages) {
      value = await stage(value);
    }
    // Returning the value (instead of pushing to a shared array) lets
    // limitConcurrency place it at the item's own index.
    return value as R;
  });
}
// Usage example: each URL is fetched, its body parsed as JSON, and the parsed
// data handed to `transform`, with 5 passed as the concurrency argument.
// NOTE(review): `transform` is not defined anywhere in this document —
// presumably the reader supplies it; confirm.
const processed = await pipeline(
urls,
5,
async (url) => fetch(url),
async (response) => response.json(),
async (data) => transform(data)
);
六、总结 #
本章学习了:
- Promise静态方法的使用
- 并发限制的实现
- 任务队列的设计
- 实际应用场景
下一章,我们将学习文件操作。
最后更新:2026-03-28