RxJS基础 #
一、RxJS概述 #
RxJS是Reactive Extensions for JavaScript的缩写,是一个用于响应式编程的库。
text
传统编程 响应式编程
┌─────────────────┐ ┌─────────────────┐
│ 变量 = 值 │ │ Observable │
│ 单个值 │ │ 数据流 │
│ 同步 │ │ 异步 │
│ 拉取 │ │ 推送 │
└─────────────────┘ └─────────────────┘
二、核心概念 #
2.1 Observable #
Observable是一个可观察对象,表示一个数据流。
typescript
import { Observable } from 'rxjs';
const observable = new Observable<string>(subscriber => {
subscriber.next('Hello');
subscriber.next('World');
subscriber.complete();
});
observable.subscribe({
next: (value) => console.log(value),
complete: () => console.log('完成')
});
2.2 Observer #
Observer是观察者,用于接收Observable发出的数据。
typescript
const observer = {
next: (value: any) => console.log('值:', value),
error: (err: any) => console.error('错误:', err),
complete: () => console.log('完成')
};
observable.subscribe(observer);
2.3 Subscription #
Subscription是订阅对象,用于取消订阅。
typescript
const subscription = observable.subscribe(value => {
console.log(value);
});
subscription.unsubscribe();
三、创建Observable #
3.1 new Observable #
typescript
const observable = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
return () => {
console.log('清理资源');
};
});
3.2 of #
typescript
import { of } from 'rxjs';
const observable = of(1, 2, 3, 4, 5);
observable.subscribe(value => console.log(value));
3.3 from #
typescript
import { from } from 'rxjs';
// 从数组
from([1, 2, 3]).subscribe(console.log);
// 从Promise
from(fetch('/api/data')).subscribe(console.log);
// 从字符串
from('Hello').subscribe(console.log);
3.4 interval #
typescript
import { interval } from 'rxjs';
const observable = interval(1000);
observable.subscribe(value => console.log(value));
3.5 timer #
typescript
import { timer } from 'rxjs';
timer(2000).subscribe(() => console.log('2秒后执行'));
timer(1000, 500).subscribe(value => console.log(value));
3.6 fromEvent #
typescript
import { fromEvent } from 'rxjs';
fromEvent(document, 'click').subscribe(event => {
console.log('点击事件:', event);
});
fromEvent(document.getElementById('input'), 'input').subscribe(event => {
console.log('输入事件:', event);
});
四、Subject #
4.1 Subject #
Subject既是Observable也是Observer,可以多播。
typescript
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe(value => console.log('观察者A:', value));
subject.subscribe(value => console.log('观察者B:', value));
subject.next(1);
subject.next(2);
4.2 BehaviorSubject #
BehaviorSubject有初始值,会记住最新的值。
typescript
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject<number>(0);
subject.subscribe(value => console.log('观察者A:', value));
subject.next(1);
subject.next(2);
subject.subscribe(value => console.log('观察者B:', value));
4.3 ReplaySubject #
ReplaySubject会重放指定数量的历史值。
typescript
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject<number>(2);
subject.next(1);
subject.next(2);
subject.next(3);
subject.subscribe(value => console.log('新观察者:', value));
4.4 AsyncSubject #
AsyncSubject只在完成时发出最后一个值。
typescript
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject<number>();
subject.subscribe(value => console.log('值:', value));
subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();
五、常用操作符 #
5.1 map #
typescript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
of(1, 2, 3)
.pipe(map(x => x * 2))
.subscribe(console.log);
5.2 filter #
typescript
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';
of(1, 2, 3, 4, 5)
.pipe(filter(x => x % 2 === 0))
.subscribe(console.log);
5.3 tap #
typescript
import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';
of(1, 2, 3)
.pipe(
tap(x => console.log('原始值:', x)),
map(x => x * 2),
tap(x => console.log('处理后:', x))
)
.subscribe(console.log);
5.4 catchError #
typescript
import { of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';
throwError(() => new Error('出错了'))
.pipe(
catchError(error => {
console.error('捕获错误:', error);
return of('默认值');
})
)
.subscribe(console.log);
5.5 retry #
typescript
import { of, throwError } from 'rxjs';
import { retry, catchError } from 'rxjs/operators';
throwError(() => new Error('出错了'))
.pipe(
retry(3),
catchError(error => of('失败'))
)
.subscribe(console.log);
5.6 debounceTime #
typescript
import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';
fromEvent(document.getElementById('search'), 'input')
.pipe(
debounceTime(300),
map((event: any) => event.target.value)
)
.subscribe(value => console.log('搜索:', value));
5.7 distinctUntilChanged #
typescript
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
of(1, 1, 2, 2, 3, 3, 1, 1)
.pipe(distinctUntilChanged())
.subscribe(console.log);
5.8 switchMap #
typescript
import { fromEvent } from 'rxjs';
import { debounceTime, switchMap } from 'rxjs/operators';
fromEvent(document.getElementById('search'), 'input')
.pipe(
debounceTime(300),
switchMap((event: any) =>
fetch(`/api/search?q=${event.target.value}`)
)
)
.subscribe(result => console.log(result));
5.9 mergeMap #
typescript
import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
of(1, 2, 3)
.pipe(
mergeMap(x => of(x * 2))
)
.subscribe(console.log);
5.10 combineLatest #
typescript
import { combineLatest, of } from 'rxjs';
const obs1 = of('a', 'b', 'c');
const obs2 = of(1, 2, 3);
combineLatest([obs1, obs2]).subscribe(console.log);
5.11 forkJoin #
typescript
import { forkJoin, of } from 'rxjs';
forkJoin({
users: of(['user1', 'user2']),
products: of(['product1', 'product2'])
}).subscribe(result => {
console.log(result.users);
console.log(result.products);
});
5.12 take #
typescript
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
interval(1000)
.pipe(take(3))
.subscribe(console.log);
5.13 takeUntil #
typescript
import { interval, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
const destroy$ = new Subject<void>();
interval(1000)
.pipe(takeUntil(destroy$))
.subscribe(console.log);
setTimeout(() => {
destroy$.next();
destroy$.complete();
}, 5000);
六、管道操作 #
6.1 pipe #
typescript
import { of } from 'rxjs';
import { pipe, map, filter } from 'rxjs/operators';
const source = of(1, 2, 3, 4, 5);
source.pipe(
filter(x => x % 2 === 0),
map(x => x * 2),
map(x => `值: ${x}`)
).subscribe(console.log);
6.2 自定义操作符 #
typescript
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
function log<T>(prefix: string) {
return (source: Observable<T>) => {
return source.pipe(
tap(value => console.log(`${prefix}:`, value))
);
};
}
of(1, 2, 3)
.pipe(
log('原始'),
map(x => x * 2),
log('处理后')
)
.subscribe();
七、错误处理 #
7.1 catchError #
typescript
import { of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';
throwError(() => new Error('错误'))
.pipe(
catchError(error => {
return of('恢复值');
})
)
.subscribe(console.log);
7.2 throwError #
typescript
import { throwError } from 'rxjs';
throwError(() => new Error('出错了'))
.subscribe({
next: () => {},
error: (err) => console.error(err)
});
7.3 finalize #
typescript
import { of } from 'rxjs';
import { finalize } from 'rxjs/operators';
of(1, 2, 3)
.pipe(
finalize(() => console.log('清理资源'))
)
.subscribe(console.log);
八、Angular中使用RxJS #
8.1 HTTP请求 #
typescript
import { HttpClient } from '@angular/common/http';
import { catchError, retry } from 'rxjs/operators';
@Injectable({ providedIn: 'root' })
export class UserService {
constructor(private http: HttpClient) {}
getUsers() {
return this.http.get<User[]>('/api/users').pipe(
retry(3),
catchError(error => {
console.error(error);
return [];
})
);
}
}
8.2 组件中订阅 #
typescript
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
@Component({
selector: 'app-user-list',
template: '...'
})
export class UserListComponent implements OnInit, OnDestroy {
private destroy$ = new Subject<void>();
constructor(private userService: UserService) {}
ngOnInit() {
this.userService.getUsers()
.pipe(takeUntil(this.destroy$))
.subscribe(users => {
this.users = users;
});
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
8.3 使用async管道 #
typescript
import { Component } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-user-list',
template: `
<ul>
<li *ngFor="let user of users$ | async">
{{ user.name }}
</li>
</ul>
`
})
export class UserListComponent {
users$: Observable<User[]>;
constructor(private userService: UserService) {
this.users$ = this.userService.getUsers();
}
}
九、总结 #
| 概念 | 说明 |
|---|---|
Observable |
可观察对象,数据流 |
Observer |
观察者,接收数据 |
Subscription |
订阅对象,取消订阅 |
Subject |
既是Observable也是Observer |
BehaviorSubject |
有初始值的Subject |
ReplaySubject |
重放历史值的Subject |
| 操作符 | 处理数据流的函数 |
下一步:NgRx状态管理
最后更新:2026-03-26